Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/18 02:03:58 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add hdfs file json support (#2451)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 84f6b17c1 [Feature][Connector-V2] Add hdfs file json support (#2451)
84f6b17c1 is described below
commit 84f6b17c15ac3f82c010322ce2b74ee74b501ac9
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Thu Aug 18 10:03:53 2022 +0800
[Feature][Connector-V2] Add hdfs file json support (#2451)
* [Feature][Connector-V2] Support json format for hdfs source
* [Feature][Connector-V2] Add json read strategy
* [Feature][Connector-V2] Update hdfs file source doc
---
docs/en/connector-v2/source/HdfsFile.md | 71 ++++++++++++++++++++--
.../connector-file/connector-file-base/pom.xml | 6 ++
.../seatunnel/file/config/BaseSourceConfig.java | 1 +
.../seatunnel/file/config/FileFormat.java | 3 +-
.../file/source/reader/AbstractReadStrategy.java | 9 ++-
.../file/source/reader/JsonReadStrategy.java | 66 ++++++++++++++++++++
.../seatunnel/file/source/reader/ReadStrategy.java | 2 +
.../seatunnel/file/hdfs/source/HdfsFileSource.java | 18 ++++--
8 files changed, 164 insertions(+), 12 deletions(-)
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 6f8d4df83..00bbe5fdd 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -8,11 +8,12 @@ Read data from hdfs file system.
## Options
-| name | type | required | default value |
-|--------------| ------ |----------|---------------|
-| path | string | yes | - |
-| type | string | yes | - |
-| fs.defaultFS | string | yes | - |
+| name | type | required | default value |
+|---------------|--------|----------|---------------|
+| path | string | yes | - |
+| type | string | yes | - |
+| fs.defaultFS | string | yes | - |
+| schema | config | no | - |
### path [string]
@@ -24,13 +25,55 @@ File type, supported as the following file types:
`text` `csv` `parquet` `orc` `json`
+If you assign the file type as `json`, you should also assign the `schema` option to tell the connector how to parse the data into the rows you want.
+
+For example:
+
+the upstream data is the following:
+
+```json
+
+{"code": 200, "data": "get success", "success": true}
+
+```
+
+you should assign the schema as follows:
+
+```hocon
+
+schema {
+ fields {
+ code = int
+ data = string
+ success = boolean
+ }
+}
+
+```
+
+the connector will generate data as follows:
+
+| code | data | success |
+|------|-------------|---------|
+| 200 | get success | true |
+
+If you assign the file type as `parquet` or `orc`, the `schema` option is not required: the connector can find the schema of the upstream data automatically.
+
+If you assign the file type as `text` or `csv`, the `schema` option is not supported yet, but it will be supported in a future release.
+
+For now the connector treats the upstream data as the following:
+
+| lines |
+|-----------------------------------|
+| The content of every line in file |
+
### fs.defaultFS [string]
Hdfs cluster address.
## Example
-```hcon
+```hocon
HdfsFile {
path = "/apps/hive/demo/student"
@@ -38,4 +81,20 @@ HdfsFile {
fs.defaultFS = "hdfs://namenode001"
}
+```
+
+```hocon
+
+HdfsFile {
+ schema {
+ fields {
+ name = string
+ age = int
+ }
+ }
+ path = "/apps/hive/demo/student"
+ type = "json"
+ fs.defaultFS = "hdfs://namenode001"
+}
+
```
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index 13173c14e..03fc5a56d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -50,6 +50,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index 16f1bada9..fcccba986 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -20,4 +20,5 @@ package org.apache.seatunnel.connectors.seatunnel.file.config;
public class BaseSourceConfig {
public static final String FILE_TYPE = "type";
public static final String FILE_PATH = "path";
+ public static final String SCHEMA = "schema";
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 790e7ca64..a356ed0c6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
@@ -52,7 +53,7 @@ public enum FileFormat implements Serializable {
JSON("json") {
@Override
public ReadStrategy getReadStrategy() {
- return new TextReadStrategy();
+ return new JsonReadStrategy();
}
};
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index eba6d4b21..215c4667c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
@@ -33,13 +34,19 @@ import java.util.List;
@Slf4j
public abstract class AbstractReadStrategy implements ReadStrategy {
- HadoopConf hadoopConf;
+ protected HadoopConf hadoopConf;
+ protected SeaTunnelRowType seaTunnelRowType;
@Override
public void init(HadoopConf conf) {
this.hadoopConf = conf;
}
+ @Override
+ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
@Override
public Configuration getConfiguration(HadoopConf hadoopConf) {
Configuration configuration = new Configuration();
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
new file mode 100644
index 000000000..9e5775fb3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+public class JsonReadStrategy extends AbstractReadStrategy {
+ private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+ @Override
+ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ deserializationSchema = new JsonDeserializationSchema(false, false, this.seaTunnelRowType);
+ }
+
+ @Override
+ public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
+ Configuration conf = getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ Path filePath = new Path(path);
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
+ reader.lines().forEach(line -> {
+ try {
+ deserializationSchema.deserialize(line.getBytes(), output);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ @Override
+ public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
+ return this.seaTunnelRowType;
+ }
+}
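For reference, each line handed to `deserializationSchema.deserialize(line.getBytes(), output)` above is a single JSON document that gets mapped onto the declared row fields. A minimal standalone sketch of that per-line behaviour, using Jackson directly purely for illustration (the class name `JsonLineParseSketch` is hypothetical, and the real `JsonDeserializationSchema` may work differently internally):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public class JsonLineParseSketch {
    public static void main(String[] args) throws IOException {
        // One line of upstream data, matching the schema example in HdfsFile.md.
        String upstream = "{\"code\": 200, \"data\": \"get success\", \"success\": true}";
        ObjectMapper mapper = new ObjectMapper();
        try (BufferedReader reader = new BufferedReader(new StringReader(upstream))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Conceptually what happens per line: parse the JSON document
                // and pull out the fields declared in the schema block.
                JsonNode node = mapper.readTree(line);
                int code = node.get("code").asInt();                // schema: code = int
                String data = node.get("data").asText();            // schema: data = string
                boolean success = node.get("success").asBoolean();  // schema: success = boolean
                System.out.printf("row -> code=%d, data=%s, success=%b%n", code, data, success);
            }
        }
    }
}
```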
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index 1114eab65..4f512a7db 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -38,5 +38,7 @@ public interface ReadStrategy extends Serializable {
SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException;
+ void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
+
List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException;
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
index 44cddd879..f0474a2c1 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
@@ -57,10 +58,19 @@ public class HdfsFileSource extends BaseFileSource {
} catch (IOException e) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
}
- try {
- rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
- } catch (FilePluginException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+ // support user-defined schema
+ if (pluginConfig.hasPath(HdfsSourceConfig.SCHEMA)) {
+ Config schemaConfig = pluginConfig.getConfig(HdfsSourceConfig.SCHEMA);
+ rowType = SeatunnelSchema
+ .buildWithConfig(schemaConfig)
+ .getSeaTunnelRowType();
+ readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ } else {
+ try {
+ rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
+ } catch (FilePluginException e) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+ }
}
}
}
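The user-defined schema branch above boils down to walking the HOCON `schema { fields { ... } }` block and turning each field name/type pair into part of a `SeaTunnelRowType`. A rough sketch of that walk, using the plain typesafe-config API only as an assumption for illustration (SeaTunnel shades this library, the class name `SchemaBlockSketch` is hypothetical, and `SeatunnelSchema.buildWithConfig` may differ in detail):

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;

import java.util.Map;

public class SchemaBlockSketch {
    public static void main(String[] args) {
        // The same HOCON shape the HdfsFile source accepts for json files.
        String hocon = "schema { fields { name = string, age = int } }";
        Config fields = ConfigFactory.parseString(hocon)
                .getConfig("schema")
                .getConfig("fields");
        // Walk field name -> declared type, which is roughly the information
        // needed to build a SeaTunnelRowType for the JsonReadStrategy.
        for (Map.Entry<String, ConfigValue> entry : fields.entrySet()) {
            String fieldName = entry.getKey();
            String declaredType = entry.getValue().unwrapped().toString();
            System.out.println(fieldName + " -> " + declaredType);
        }
    }
}
```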