Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/10/15 05:28:48 UTC
[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][File] Support parse field from file path (#2985)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0bc12085c [Improve][Connector-V2][File] Support parse field from file path (#2985)
0bc12085c is described below
commit 0bc12085c248880ef616eb802400e95ff2982b1b
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Sat Oct 15 13:28:41 2022 +0800
[Improve][Connector-V2][File] Support parse field from file path (#2985)
---
docs/en/connector-v2/source/FtpFile.md | 43 +++++++++++-----
docs/en/connector-v2/source/HdfsFile.md | 37 +++++++++----
docs/en/connector-v2/source/LocalFile.md | 35 +++++++++----
docs/en/connector-v2/source/OssFile.md | 43 +++++++++++-----
.../file/hdfs/source/BaseHdfsFileSource.java | 6 ++-
.../seatunnel/file/config/BaseSourceConfig.java | 1 +
.../file/source/reader/AbstractReadStrategy.java | 60 +++++++++++++++++++++-
.../file/source/reader/JsonReadStrategy.java | 19 +++++--
.../file/source/reader/OrcReadStrategy.java | 25 +++++----
.../file/source/reader/ParquetReadStrategy.java | 23 ++++++---
.../seatunnel/file/source/reader/ReadStrategy.java | 2 +
.../file/source/reader/TextReadStrategy.java | 56 +++++++++++++++-----
.../seatunnel/file/ftp/source/FtpFileSource.java | 6 ++-
.../file/local/source/LocalFileSource.java | 6 ++-
.../seatunnel/file/oss/source/OssFileSource.java | 6 ++-
.../format/text/TextDeserializationSchema.java | 19 +++++--
16 files changed, 292 insertions(+), 95 deletions(-)
diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index 0513cc1c5..28508ccc5 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -21,20 +21,21 @@ Read data from ftp file server.
## Options
-| name | type | required | default value |
-|-----------------|---------|----------|---------------------|
-| host | string | yes | - |
-| port | int | yes | - |
-| user | string | yes | - |
-| password | string | yes | - |
-| path | string | yes | - |
-| type | string | yes | - |
-| delimiter | string | no | \001 |
-| date_format | string | no | yyyy-MM-dd |
-| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
-| time_format | string | no | HH:mm:ss |
-| schema | config | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|----------------------------|---------|----------|---------------------|
+| host | string | yes | - |
+| port | int | yes | - |
+| user | string | yes | - |
+| password | string | yes | - |
+| path | string | yes | - |
+| type | string | yes | - |
+| delimiter | string | no | \001 |
+| parse_partition_from_path | boolean | no | true |
+| date_format | string | no | yyyy-MM-dd |
+| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
+| time_format | string | no | HH:mm:ss |
+| schema | config | no | - |
+| common-options | | no | - |
### host [string]
@@ -62,6 +63,20 @@ Field delimiter, used to tell connector how to slice and dice fields when readin
default `\001`, the same as hive's default delimiter
+### parse_partition_from_path [boolean]
+
+Control whether to parse the partition keys and values from the file path
+
+For example, if you read a file from the path `ftp://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`
+
+Every record read from the file will have these two fields added:
+
+| name | age |
+|----------------|-----|
+| tyrantlucifer | 26 |
+
+Tip: **Do not define partition fields in the schema option**
+
### date_format [string]
Date type format, used to tell connector how to convert string to date, supported as the following formats:
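
The rule documented above is simple: every `key=value` segment of the file path becomes one partition column. A minimal, JDK-only sketch of that rule (the class and method names are illustrative, not part of this commit):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Mirrors the key=value path-segment rule documented above.
public class PartitionPathSketch {

    static Map<String, String> parsePartitions(String path) {
        LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
        Arrays.stream(path.split("/", -1))
                .filter(segment -> segment.contains("="))
                .map(segment -> segment.split("=", -1))
                .forEach(kv -> partitions.put(kv[0], kv[1]));
        return partitions;
    }

    public static void main(String[] args) {
        // Prints {name=tyrantlucifer, age=26}, the two columns that get
        // appended to every record read from this path.
        System.out.println(parsePartitions(
                "ftp://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26"));
    }
}

This is the same logic as the parsePartitionsByPath method added to AbstractReadStrategy further down.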
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 842d2ac5d..18bdff2fa 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -26,17 +26,18 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
## Options
-| name | type | required | default value |
-|-----------------|--------|----------|---------------------|
-| path | string | yes | - |
-| type | string | yes | - |
-| fs.defaultFS | string | yes | - |
-| delimiter | string | no | \001 |
-| date_format | string | no | yyyy-MM-dd |
-| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
-| time_format | string | no | HH:mm:ss |
-| schema | config | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|----------------------------|---------|----------|---------------------|
+| path | string | yes | - |
+| type | string | yes | - |
+| fs.defaultFS | string | yes | - |
+| delimiter | string | no | \001 |
+| parse_partition_from_path | boolean | no | true |
+| date_format | string | no | yyyy-MM-dd |
+| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
+| time_format | string | no | HH:mm:ss |
+| schema | config | no | - |
+| common-options | | no | - |
### path [string]
@@ -48,6 +49,20 @@ Field delimiter, used to tell connector how to slice and dice fields when readin
default `\001`, the same as hive's default delimiter
+### parse_partition_from_path [boolean]
+
+Control whether to parse the partition keys and values from the file path
+
+For example, if you read a file from the path `hdfs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`
+
+Every record read from the file will have these two fields added:
+
+| name | age |
+|----------------|-----|
+| tyrantlucifer | 26 |
+
+Tip: **Do not define partition fields in the schema option**
+
### date_format [string]
Date type format, used to tell connector how to convert string to date, supported as the following formats:
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index 2aa8d280c..8dff26312 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -26,16 +26,17 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
## Options
-| name | type | required | default value |
-|-----------------|--------|----------|---------------------|
-| path | string | yes | - |
-| type | string | yes | - |
-| delimiter | string | no | \001 |
-| date_format | string | no | yyyy-MM-dd |
-| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
-| time_format | string | no | HH:mm:ss |
-| schema | config | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|----------------------------|-----------|----------|---------------------|
+| path | string | yes | - |
+| type | string | yes | - |
+| delimiter | string | no | \001 |
+| parse_partition_from_path | boolean | no | true |
+| date_format | string | no | yyyy-MM-dd |
+| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
+| time_format | string | no | HH:mm:ss |
+| schema | config | no | - |
+| common-options | | no | - |
### path [string]
@@ -47,6 +48,20 @@ Field delimiter, used to tell connector how to slice and dice fields when readin
default `\001`, the same as hive's default delimiter
+### parse_partition_from_path [boolean]
+
+Control whether to parse the partition keys and values from the file path
+
+For example, if you read a file from the path `file://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`
+
+Every record read from the file will have these two fields added:
+
+| name | age |
+|----------------|-----|
+| tyrantlucifer | 26 |
+
+Tip: **Do not define partition fields in the schema option**
+
### date_format [string]
Date type format, used to tell connector how to convert string to date, supported as the following formats:
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index 6a0aace48..8aa378ac6 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -29,20 +29,21 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
## Options
-| name | type | required | default value |
-|-----------------|--------|----------|---------------------|
-| path | string | yes | - |
-| type | string | yes | - |
-| bucket | string | yes | - |
-| access_key | string | yes | - |
-| access_secret | string | yes | - |
-| endpoint | string | yes | - |
-| delimiter | string | no | \001 |
-| date_format | string | no | yyyy-MM-dd |
-| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
-| time_format | string | no | HH:mm:ss |
-| schema | config | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|---------------------------|---------|----------|---------------------|
+| path | string | yes | - |
+| type | string | yes | - |
+| bucket | string | yes | - |
+| access_key | string | yes | - |
+| access_secret | string | yes | - |
+| endpoint | string | yes | - |
+| delimiter | string | no | \001 |
+| parse_partition_from_path | boolean | no | true |
+| date_format | string | no | yyyy-MM-dd |
+| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
+| time_format | string | no | HH:mm:ss |
+| schema | config | no | - |
+| common-options | | no | - |
### path [string]
@@ -54,6 +55,20 @@ Field delimiter, used to tell connector how to slice and dice fields when readin
default `\001`, the same as hive's default delimiter
+### parse_partition_from_path [boolean]
+
+Control whether to parse the partition keys and values from the file path
+
+For example, if you read a file from the path `oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`
+
+Every record read from the file will have these two fields added:
+
+| name | age |
+|----------------|-----|
+| tyrantlucifer | 26 |
+
+Tip: **Do not define partition fields in the schema option**
+
### date_format [string]
Date type format, used to tell connector how to convert string to date, supported as the following formats:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 9a986d172..311a6dd04 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
@@ -59,10 +60,11 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
case TEXT:
case JSON:
Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
- rowType = SeaTunnelSchema
+ SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
.buildWithConfig(schemaConfig)
.getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+ rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
break;
case ORC:
case PARQUET:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index 60617de72..ffe6e7018 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -25,4 +25,5 @@ public class BaseSourceConfig {
public static final String DATE_FORMAT = "date_format";
public static final String DATETIME_FORMAT = "datetime_format";
public static final String TIME_FORMAT = "time_format";
+ public static final String PARSE_PARTITION_FROM_PATH = "parse_partition_from_path";
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 55213d219..fdfbc9756 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -22,7 +22,10 @@ import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECOR
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
@@ -37,13 +40,19 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
@Slf4j
public abstract class AbstractReadStrategy implements ReadStrategy {
protected HadoopConf hadoopConf;
protected SeaTunnelRowType seaTunnelRowType;
+ protected SeaTunnelRowType seaTunnelRowTypeWithPartition;
protected Config pluginConfig;
+ protected List<String> fileNames = new ArrayList<>();
+ protected boolean isMergePartition = true;
@Override
public void init(HadoopConf conf) {
@@ -53,6 +62,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
+ this.seaTunnelRowTypeWithPartition = mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
}
@Override
@@ -84,8 +94,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
@Override
public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException {
Configuration configuration = getConfiguration(hadoopConf);
- List<String> fileNames = new ArrayList<>();
FileSystem hdfs = FileSystem.get(configuration);
+ ArrayList<String> fileNames = new ArrayList<>();
Path listFiles = new Path(path);
FileStatus[] stats = hdfs.listStatus(listFiles);
for (FileStatus fileStatus : stats) {
@@ -97,6 +107,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
// filter '_SUCCESS' file
if (!fileStatus.getPath().getName().equals("_SUCCESS")) {
fileNames.add(fileStatus.getPath().toString());
+ this.fileNames.add(fileStatus.getPath().toString());
}
}
}
@@ -106,5 +117,52 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
@Override
public void setPluginConfig(Config pluginConfig) {
this.pluginConfig = pluginConfig;
+ if (pluginConfig.hasPath(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)) {
+ isMergePartition = pluginConfig.getBoolean(BaseSourceConfig.PARSE_PARTITION_FROM_PATH);
+ }
+ }
+
+ @Override
+ public SeaTunnelRowType getActualSeaTunnelRowTypeInfo() {
+ return isMergePartition ? seaTunnelRowTypeWithPartition : seaTunnelRowType;
+ }
+
+ protected Map<String, String> parsePartitionsByPath(String path) {
+ LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
+ Arrays.stream(path.split("/", -1))
+ .filter(split -> split.contains("="))
+ .map(split -> split.split("=", -1))
+ .forEach(kv -> partitions.put(kv[0], kv[1]));
+ return partitions;
+ }
+
+ protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType seaTunnelRowType) {
+ Map<String, String> partitionsMap = parsePartitionsByPath(path);
+ if (partitionsMap.isEmpty()) {
+ return seaTunnelRowType;
+ }
+ // get the names of all partition fields
+ String[] partitionNames = partitionsMap.keySet().toArray(new String[0]);
+ // initialize data type for partition fields
+ SeaTunnelDataType<?>[] partitionTypes = new SeaTunnelDataType<?>[partitionNames.length];
+ Arrays.fill(partitionTypes, BasicType.STRING_TYPE);
+ // get origin field names
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ // get origin data types
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+ // create new array to merge partition fields and origin fields
+ String[] newFieldNames = new String[fieldNames.length + partitionNames.length];
+ // create new array to merge partition fields' data type and origin fields' data type
+ SeaTunnelDataType<?>[] newFieldTypes = new SeaTunnelDataType<?>[fieldTypes.length + partitionTypes.length];
+ // copy origin field names to new array
+ System.arraycopy(fieldNames, 0, newFieldNames, 0, fieldNames.length);
+ // copy partition field names to new array
+ System.arraycopy(partitionNames, 0, newFieldNames, fieldNames.length, partitionNames.length);
+ // copy origin field types to new array
+ System.arraycopy(fieldTypes, 0, newFieldTypes, 0, fieldTypes.length);
+ // copy partition field types to new array
+ System.arraycopy(partitionTypes, 0, newFieldTypes, fieldTypes.length, partitionTypes.length);
+ // return merged row type
+ return new SeaTunnelRowType(newFieldNames, newFieldTypes);
}
}
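
A quick sketch of what mergePartitionTypes does to the schema, assuming a user-defined schema of [id, msg] and the documented example path (JDK-only, field names illustrative):

import java.util.Arrays;

// Reproduces the field-name merge from mergePartitionTypes above.
public class MergePartitionTypesSketch {
    public static void main(String[] args) {
        String[] fieldNames = {"id", "msg"};        // user-defined fields
        String[] partitionNames = {"name", "age"};  // parsed from the path
        String[] newFieldNames = new String[fieldNames.length + partitionNames.length];
        System.arraycopy(fieldNames, 0, newFieldNames, 0, fieldNames.length);
        System.arraycopy(partitionNames, 0, newFieldNames, fieldNames.length, partitionNames.length);
        // Prints [id, msg, name, age]; the data-type array is merged the same
        // way, with BasicType.STRING_TYPE filled in for every partition column.
        System.out.println(Arrays.toString(newFieldNames));
    }
}

Partition columns always come last, which is what lets the readers below write partition values at indexes starting from the original field count.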
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
index 9e5775fb3..e0ff36c14 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -33,6 +33,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
+import java.util.Map;
public class JsonReadStrategy extends AbstractReadStrategy {
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
@@ -40,7 +41,11 @@ public class JsonReadStrategy extends AbstractReadStrategy {
@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
- deserializationSchema = new JsonDeserializationSchema(false, false, this.seaTunnelRowType);
+ if (isMergePartition) {
+ deserializationSchema = new JsonDeserializationSchema(false, false, this.seaTunnelRowTypeWithPartition);
+ } else {
+ deserializationSchema = new JsonDeserializationSchema(false, false, this.seaTunnelRowType);
+ }
}
@Override
@@ -48,10 +53,18 @@ public class JsonReadStrategy extends AbstractReadStrategy {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Path filePath = new Path(path);
+ Map<String, String> partitionsMap = parsePartitionsByPath(path);
try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
reader.lines().forEach(line -> {
try {
- deserializationSchema.deserialize(line.getBytes(), output);
+ SeaTunnelRow seaTunnelRow = deserializationSchema.deserialize(line.getBytes());
+ if (isMergePartition) {
+ int index = seaTunnelRowType.getTotalFields();
+ for (String value : partitionsMap.values()) {
+ seaTunnelRow.setField(index++, value);
+ }
+ }
+ output.collect(seaTunnelRow);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -61,6 +74,6 @@ public class JsonReadStrategy extends AbstractReadStrategy {
@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
- return this.seaTunnelRowType;
+ throw new UnsupportedOperationException("User must define schema for json file type");
}
}
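
The JSON read path above deserializes each line with the partition-aware schema, so the returned row already has empty slots for the partition columns; the loop then writes the partition values starting at the user-defined field count. A JDK-only sketch of that indexing (the values 42 and "hello" are made up):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for a deserialized row: 2 user fields plus 2 empty
// slots reserved for the partition columns.
public class AppendPartitionValuesSketch {
    public static void main(String[] args) {
        Object[] fields = {42, "hello", null, null};
        Map<String, String> partitions = new LinkedHashMap<>();
        partitions.put("name", "tyrantlucifer");
        partitions.put("age", "26");
        int index = 2; // seaTunnelRowType.getTotalFields() for the user schema
        for (String value : partitions.values()) {
            fields[index++] = value;
        }
        System.out.println(Arrays.toString(fields)); // [42, hello, tyrantlucifer, 26]
    }
}

The ORC and Parquet readers below use the same trick, except that they pre-size the fields array and fill its tail with the partition values before the column loop runs.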
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 5b946b769..9b81950de 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -67,8 +67,6 @@ import java.util.Map;
@Slf4j
public class OrcReadStrategy extends AbstractReadStrategy {
-
- private SeaTunnelRowType seaTunnelRowTypeInfo;
private static final long MIN_SIZE = 16 * 1024;
@Override
@@ -78,6 +76,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
}
Configuration configuration = getConfiguration();
Path filePath = new Path(path);
+ Map<String, String> partitionsMap = parsePartitionsByPath(path);
OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(configuration);
try (Reader reader = OrcFile.createReader(filePath, readerOptions)) {
TypeDescription schema = reader.getSchema();
@@ -88,7 +87,16 @@ public class OrcReadStrategy extends AbstractReadStrategy {
int num = 0;
for (int i = 0; i < rowBatch.size; i++) {
int numCols = rowBatch.numCols;
- Object[] fields = new Object[numCols];
+ Object[] fields;
+ if (isMergePartition) {
+ int index = numCols;
+ fields = new Object[numCols + partitionsMap.size()];
+ for (String value : partitionsMap.values()) {
+ fields[index++] = value;
+ }
+ } else {
+ fields = new Object[numCols];
+ }
ColumnVector[] cols = rowBatch.cols;
for (int j = 0; j < numCols; j++) {
if (cols[j] == null) {
@@ -97,7 +105,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
fields[j] = readColumn(cols[j], children.get(j), num);
}
}
- output.collect(new SeaTunnelRow(fields));
+ SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+ output.collect(seaTunnelRow);
num++;
}
}
@@ -106,9 +115,6 @@ public class OrcReadStrategy extends AbstractReadStrategy {
@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
- if (null != seaTunnelRowTypeInfo) {
- return seaTunnelRowTypeInfo;
- }
Configuration configuration = getConfiguration(hadoopConf);
OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(configuration);
Path dstDir = new Path(path);
@@ -120,8 +126,9 @@ public class OrcReadStrategy extends AbstractReadStrategy {
fields[i] = schema.getFieldNames().get(i);
types[i] = orcDataType2SeaTunnelDataType(schema.getChildren().get(i));
}
- seaTunnelRowTypeInfo = new SeaTunnelRowType(fields, types);
- return seaTunnelRowTypeInfo;
+ seaTunnelRowType = new SeaTunnelRowType(fields, types);
+ seaTunnelRowTypeWithPartition = mergePartitionTypes(path, seaTunnelRowType);
+ return getActualSeaTunnelRowTypeInfo();
} catch (IOException e) {
throw new FilePluginException("Create OrcReader Fail", e);
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 951d397b4..442e4cfd7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -64,11 +64,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ParquetReadStrategy extends AbstractReadStrategy {
- private SeaTunnelRowType seaTunnelRowType;
private static final byte[] PARQUET_MAGIC = new byte[]{(byte) 'P', (byte) 'A', (byte) 'R', (byte) '1'};
private static final long NANOS_PER_MILLISECOND = 1000000;
private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1L);
@@ -80,6 +80,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
throw new Exception("please check file type");
}
Path filePath = new Path(path);
+ Map<String, String> partitionsMap = parsePartitionsByPath(path);
HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, getConfiguration());
int fieldsCount = seaTunnelRowType.getTotalFields();
GenericData dataModel = new GenericData();
@@ -92,12 +93,22 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
.withDataModel(dataModel)
.build()) {
while ((record = reader.read()) != null) {
- Object[] fields = new Object[fieldsCount];
+ Object[] fields;
+ if (isMergePartition) {
+ int index = fieldsCount;
+ fields = new Object[fieldsCount + partitionsMap.size()];
+ for (String value : partitionsMap.values()) {
+ fields[index++] = value;
+ }
+ } else {
+ fields = new Object[fieldsCount];
+ }
for (int i = 0; i < fieldsCount; i++) {
Object data = record.get(i);
fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i));
}
- output.collect(new SeaTunnelRow(fields));
+ SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+ output.collect(seaTunnelRow);
}
}
}
@@ -185,9 +196,6 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
- if (seaTunnelRowType != null) {
- return seaTunnelRowType;
- }
Path filePath = new Path(path);
ParquetMetadata metadata;
try {
@@ -210,7 +218,8 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
types[i] = fieldType;
}
seaTunnelRowType = new SeaTunnelRowType(fields, types);
- return seaTunnelRowType;
+ seaTunnelRowTypeWithPartition = mergePartitionTypes(path, seaTunnelRowType);
+ return getActualSeaTunnelRowTypeInfo();
}
private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index ee32bd182..3d1063c51 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -45,4 +45,6 @@ public interface ReadStrategy extends Serializable {
List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException;
void setPluginConfig(Config pluginConfig);
+
+ SeaTunnelRowType getActualSeaTunnelRowTypeInfo();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 3ae05b86a..bd59e477d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -39,6 +39,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
+import java.util.Map;
public class TextReadStrategy extends AbstractReadStrategy {
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
@@ -52,10 +53,18 @@ public class TextReadStrategy extends AbstractReadStrategy {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Path filePath = new Path(path);
+ Map<String, String> partitionsMap = parsePartitionsByPath(path);
try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
reader.lines().forEach(line -> {
try {
- deserializationSchema.deserialize(line.getBytes(), output);
+ SeaTunnelRow seaTunnelRow = deserializationSchema.deserialize(line.getBytes());
+ if (isMergePartition) {
+ int index = seaTunnelRowType.getTotalFields();
+ for (String value : partitionsMap.values()) {
+ seaTunnelRow.setField(index++, value);
+ }
+ }
+ output.collect(seaTunnelRow);
} catch (IOException e) {
String errorMsg = String.format("Deserialize this data [%s] error, please check the origin data", line);
throw new RuntimeException(errorMsg);
@@ -67,11 +76,20 @@ public class TextReadStrategy extends AbstractReadStrategy {
@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) {
SeaTunnelRowType simpleSeaTunnelType = SeaTunnelSchema.buildSimpleTextSchema();
- deserializationSchema = TextDeserializationSchema.builder()
- .seaTunnelRowType(simpleSeaTunnelType)
- .delimiter(String.valueOf('\002'))
- .build();
- return simpleSeaTunnelType;
+ this.seaTunnelRowType = simpleSeaTunnelType;
+ this.seaTunnelRowTypeWithPartition = mergePartitionTypes(fileNames.get(0), simpleSeaTunnelType);
+ if (isMergePartition) {
+ deserializationSchema = TextDeserializationSchema.builder()
+ .seaTunnelRowType(this.seaTunnelRowTypeWithPartition)
+ .delimiter(String.valueOf('\002'))
+ .build();
+ } else {
+ deserializationSchema = TextDeserializationSchema.builder()
+ .seaTunnelRowType(this.seaTunnelRowType)
+ .delimiter(String.valueOf('\002'))
+ .build();
+ }
+ return getActualSeaTunnelRowTypeInfo();
}
@Override
@@ -80,7 +98,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
if (pluginConfig.hasPath(BaseSourceConfig.DELIMITER)) {
fieldDelimiter = pluginConfig.getString(BaseSourceConfig.DELIMITER);
} else {
- FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(BaseSourceConfig.FILE_TYPE));
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(BaseSourceConfig.FILE_TYPE).toUpperCase());
if (fileFormat == FileFormat.CSV) {
fieldDelimiter = ",";
}
@@ -94,12 +112,22 @@ public class TextReadStrategy extends AbstractReadStrategy {
if (pluginConfig.hasPath(BaseSourceConfig.TIME_FORMAT)) {
timeFormat = TimeUtils.Formatter.parse(pluginConfig.getString(BaseSourceConfig.TIME_FORMAT));
}
- deserializationSchema = TextDeserializationSchema.builder()
- .seaTunnelRowType(seaTunnelRowType)
- .delimiter(fieldDelimiter)
- .dateFormatter(dateFormat)
- .dateTimeFormatter(datetimeFormat)
- .timeFormatter(timeFormat)
- .build();
+ if (isMergePartition) {
+ deserializationSchema = TextDeserializationSchema.builder()
+ .seaTunnelRowType(this.seaTunnelRowTypeWithPartition)
+ .delimiter(fieldDelimiter)
+ .dateFormatter(dateFormat)
+ .dateTimeFormatter(datetimeFormat)
+ .timeFormatter(timeFormat)
+ .build();
+ } else {
+ deserializationSchema = TextDeserializationSchema.builder()
+ .seaTunnelRowType(this.seaTunnelRowType)
+ .delimiter(fieldDelimiter)
+ .dateFormatter(dateFormat)
+ .dateTimeFormatter(datetimeFormat)
+ .timeFormatter(timeFormat)
+ .build();
+ }
}
}
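
One small but real fix rides along in TextReadStrategy: FileFormat.valueOf(...) now upper-cases the configured file type. Enum.valueOf is case-sensitive, so a config with type = "csv" would have thrown IllegalArgumentException here. A sketch with a stand-in enum (the real FileFormat lives in connector-file-base; the constants below are taken from the file types handled in this commit):

public class ValueOfCaseSketch {
    enum FileFormat { TEXT, CSV, JSON, ORC, PARQUET }

    public static void main(String[] args) {
        // FileFormat.valueOf("csv") throws IllegalArgumentException;
        // upper-casing first makes the lookup effectively case-insensitive.
        FileFormat format = FileFormat.valueOf("csv".toUpperCase());
        System.out.println(format); // CSV
    }
}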
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index fc6c4d420..ab34a6c2b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.ftp.source;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
@@ -73,10 +74,11 @@ public class FtpFileSource extends BaseFileSource {
case TEXT:
case JSON:
Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
- rowType = SeaTunnelSchema
+ SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
.buildWithConfig(schemaConfig)
.getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+ rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
break;
case ORC:
case PARQUET:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 8cfe9e0df..df04acde4 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.local.source;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
@@ -70,10 +71,11 @@ public class LocalFileSource extends BaseFileSource {
case TEXT:
case JSON:
Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
- rowType = SeaTunnelSchema
+ SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
.buildWithConfig(schemaConfig)
.getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+ rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
break;
case ORC:
case PARQUET:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index a2bd20c0e..a99d0e2ab 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.oss.source;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
@@ -71,10 +72,11 @@ public class OssFileSource extends BaseFileSource {
case TEXT:
case JSON:
Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
- rowType = SeaTunnelSchema
+ SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
.buildWithConfig(schemaConfig)
.getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+ rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
break;
case ORC:
case PARQUET:
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
index ab3576eb9..3080b1a19 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
@@ -59,7 +59,7 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
public SeaTunnelRow deserialize(byte[] message) throws IOException {
String content = new String(message);
Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content, seaTunnelRowType);
- Object[] objects = new Object[splitsMap.size()];
+ Object[] objects = new Object[seaTunnelRowType.getTotalFields()];
for (int i = 0; i < objects.length; i++) {
objects[i] = convert(splitsMap.get(i), seaTunnelRowType.getFieldType(i));
}
@@ -80,12 +80,23 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
if (fieldTypes[i].getSqlType() == SqlType.ROW) {
// row type
int totalFields = ((SeaTunnelRowType) fieldTypes[i]).getTotalFields();
- ArrayList<String> rowSplits = new ArrayList<>(Arrays.asList(splits).subList(cursor, cursor + totalFields));
- splitsMap.put(i, String.join(delimiter, rowSplits));
+ // if current field is empty
+ if (cursor >= splits.length) {
+ splitsMap.put(i, null);
+ } else {
+ ArrayList<String> rowSplits = new ArrayList<>(Arrays.asList(splits).subList(cursor, cursor + totalFields));
+ splitsMap.put(i, String.join(delimiter, rowSplits));
+ }
cursor += totalFields;
} else {
// not row type
- splitsMap.put(i, splits[cursor++]);
+ // if current field is empty
+ if (cursor >= splits.length) {
+ splitsMap.put(i, null);
+ cursor++;
+ } else {
+ splitsMap.put(i, splits[cursor++]);
+ }
}
}
return splitsMap;
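
The TextDeserializationSchema change guards against records that carry fewer values than the schema has fields: the objects array is now sized by the row type's total field count, and any field whose cursor runs past the available splits becomes null instead of raising an ArrayIndexOutOfBoundsException. A JDK-only sketch of the failure mode (delimiter and field count chosen to match the defaults above):

import java.util.Arrays;

public class TrailingFieldSketch {
    public static void main(String[] args) {
        String line = "1\u0001foo";             // \001, hive's default delimiter
        String[] splits = line.split("\u0001"); // length 2: the third field is absent
        Object[] objects = new Object[3];       // sized by getTotalFields(), not splitsMap.size()
        for (int cursor = 0; cursor < objects.length; cursor++) {
            objects[cursor] = cursor < splits.length ? splits[cursor] : null;
        }
        System.out.println(Arrays.toString(objects)); // [1, foo, null]
    }
}

This also matters for the partition feature itself: when the merged schema appends partition columns, the text line never contains them, so the deserializer must tolerate missing trailing fields before TextReadStrategy fills those slots from the path.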