You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/10/15 05:28:48 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][File] Support parse field from file path (#2985)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0bc12085c [Improve][Connector-V2][File] Support parse field from file path (#2985)
0bc12085c is described below

commit 0bc12085c248880ef616eb802400e95ff2982b1b
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Sat Oct 15 13:28:41 2022 +0800

    [Improve][Connector-V2][File] Support parse field from file path (#2985)
---
 docs/en/connector-v2/source/FtpFile.md             | 43 +++++++++++-----
 docs/en/connector-v2/source/HdfsFile.md            | 37 +++++++++----
 docs/en/connector-v2/source/LocalFile.md           | 35 +++++++++----
 docs/en/connector-v2/source/OssFile.md             | 43 +++++++++++-----
 .../file/hdfs/source/BaseHdfsFileSource.java       |  6 ++-
 .../seatunnel/file/config/BaseSourceConfig.java    |  1 +
 .../file/source/reader/AbstractReadStrategy.java   | 60 +++++++++++++++++++++-
 .../file/source/reader/JsonReadStrategy.java       | 19 +++++--
 .../file/source/reader/OrcReadStrategy.java        | 25 +++++----
 .../file/source/reader/ParquetReadStrategy.java    | 23 ++++++---
 .../seatunnel/file/source/reader/ReadStrategy.java |  2 +
 .../file/source/reader/TextReadStrategy.java       | 56 +++++++++++++++-----
 .../seatunnel/file/ftp/source/FtpFileSource.java   |  6 ++-
 .../file/local/source/LocalFileSource.java         |  6 ++-
 .../seatunnel/file/oss/source/OssFileSource.java   |  6 ++-
 .../format/text/TextDeserializationSchema.java     | 19 +++++--
 16 files changed, 292 insertions(+), 95 deletions(-)

diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index 0513cc1c5..28508ccc5 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -21,20 +21,21 @@ Read data from ftp file server.
 
 ## Options
 
-| name            | type    | required | default value       |
-|-----------------|---------|----------|---------------------|
-| host            | string  | yes      | -                   |
-| port            | int     | yes      | -                   |
-| user            | string  | yes      | -                   |
-| password        | string  | yes      | -                   |
-| path            | string  | yes      | -                   |
-| type            | string  | yes      | -                   |
-| delimiter       | string  | no       | \001                |
-| date_format     | string  | no       | yyyy-MM-dd          |
-| datetime_format | string  | no       | yyyy-MM-dd HH:mm:ss |
-| time_format     | string  | no       | HH:mm:ss            |
-| schema          | config  | no       | -                   |
-| common-options  |         | no       | -                   |
+| name                       | type    | required | default value       |
+|----------------------------|---------|----------|---------------------|
+| host                       | string  | yes      | -                   |
+| port                       | int     | yes      | -                   |
+| user                       | string  | yes      | -                   |
+| password                   | string  | yes      | -                   |
+| path                       | string  | yes      | -                   |
+| type                       | string  | yes      | -                   |
+| delimiter                  | string  | no       | \001                |
+| parse_partition_from_path  | boolean | no       | true                |
+| date_format                | string  | no       | yyyy-MM-dd          |
+| datetime_format            | string  | no       | yyyy-MM-dd HH:mm:ss |
+| time_format                | string  | no       | HH:mm:ss            |
+| schema                     | config  | no       | -                   |
+| common-options             |         | no       | -                   |
 
 ### host [string]
 
@@ -62,6 +63,20 @@ Field delimiter, used to tell connector how to slice and dice fields when readin
 
 default `\001`, the same as hive's default delimiter
 
+### parse_partition_from_path [boolean]
+
+Controls whether the partition keys and values are parsed from the file path.
+
+For example, if you read a file from the path `ftp://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`,
+
+the following two fields will be added to every record read from that file:
+
+| name           | age |
+|----------------|-----|
+| tyrantlucifer  | 26  |
+
+Tips: **Do not define partition fields in schema option**
+
 ### date_format [string]
 
 Date type format, used to tell connector how to convert string to date, supported as the following formats:
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 842d2ac5d..18bdff2fa 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -26,17 +26,18 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 
 ## Options
 
-| name            | type   | required | default value       |
-|-----------------|--------|----------|---------------------|
-| path            | string | yes      | -                   |
-| type            | string | yes      | -                   |
-| fs.defaultFS    | string | yes      | -                   |
-| delimiter       | string | no       | \001                |
-| date_format     | string | no       | yyyy-MM-dd          |
-| datetime_format | string | no       | yyyy-MM-dd HH:mm:ss |
-| time_format     | string | no       | HH:mm:ss            |
-| schema          | config | no       | -                   |
-| common-options  |        | no       | -                   |
+| name                       | type    | required | default value       |
+|----------------------------|---------|----------|---------------------|
+| path                       | string  | yes      | -                   |
+| type                       | string  | yes      | -                   |
+| fs.defaultFS               | string  | yes      | -                   |
+| delimiter                  | string  | no       | \001                |
+| parse_partition_from_path  | boolean | no       | true                |
+| date_format                | string  | no       | yyyy-MM-dd          |
+| datetime_format            | string  | no       | yyyy-MM-dd HH:mm:ss |
+| time_format                | string  | no       | HH:mm:ss            |
+| schema                     | config  | no       | -                   |
+| common-options             |         | no       | -                   |
 
 ### path [string]
 
@@ -48,6 +49,20 @@ Field delimiter, used to tell connector how to slice and dice fields when readin
 
 default `\001`, the same as hive's default delimiter
 
+### parse_partition_from_path [boolean]
+
+Controls whether the partition keys and values are parsed from the file path.
+
+For example, if you read a file from the path `hdfs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`,
+
+the following two fields will be added to every record read from that file:
+
+| name           | age |
+|----------------|-----|
+| tyrantlucifer  | 26  |
+
+Tips: **Do not define partition fields in schema option**
+
 ### date_format [string]
 
 Date type format, used to tell connector how to convert string to date, supported as the following formats:
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index 2aa8d280c..8dff26312 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -26,16 +26,17 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 
 ## Options
 
-| name            | type   | required | default value       |
-|-----------------|--------|----------|---------------------|
-| path            | string | yes      | -                   |
-| type            | string | yes      | -                   |
-| delimiter       | string | no       | \001                |
-| date_format     | string | no       | yyyy-MM-dd          |
-| datetime_format | string | no       | yyyy-MM-dd HH:mm:ss |
-| time_format     | string | no       | HH:mm:ss            |
-| schema          | config | no       | -                   |
-| common-options  |        | no       | -                   |
+| name                       | type      | required | default value       |
+|----------------------------|-----------|----------|---------------------|
+| path                       | string    | yes      | -                   |
+| type                       | string    | yes      | -                   |
+| delimiter                  | string    | no       | \001                |
+| parse_partition_from_path  | boolean   | no       | true                |
+| date_format                | string    | no       | yyyy-MM-dd          |
+| datetime_format            | string    | no       | yyyy-MM-dd HH:mm:ss |
+| time_format                | string    | no       | HH:mm:ss            |
+| schema                     | config    | no       | -                   |
+| common-options             |           | no       | -                   |
 
 ### path [string]
 
@@ -47,6 +48,20 @@ Field delimiter, used to tell connector how to slice and dice fields when readin
 
 default `\001`, the same as hive's default delimiter
 
+### parse_partition_from_path [boolean]
+
+Controls whether the partition keys and values are parsed from the file path.
+
+For example, if you read a file from the path `file://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`,
+
+the following two fields will be added to every record read from that file:
+
+| name           | age |
+|----------------|-----|
+| tyrantlucifer  | 26  |
+
+Tips: **Do not define partition fields in schema option**
+
 ### date_format [string]
 
 Date type format, used to tell connector how to convert string to date, supported as the following formats:
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index 6a0aace48..8aa378ac6 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -29,20 +29,21 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 
 ## Options
 
-| name            | type   | required | default value       |
-|-----------------|--------|----------|---------------------|
-| path            | string | yes      | -                   |
-| type            | string | yes      | -                   |
-| bucket          | string | yes      | -                   |
-| access_key      | string | yes      | -                   |
-| access_secret   | string | yes      | -                   |
-| endpoint        | string | yes      | -                   |
-| delimiter       | string | no       | \001                |
-| date_format     | string | no       | yyyy-MM-dd          |
-| datetime_format | string | no       | yyyy-MM-dd HH:mm:ss |
-| time_format     | string | no       | HH:mm:ss            |
-| schema          | config | no       | -                   |
-| common-options  |        | no       | -                   |
+| name                      | type    | required | default value       |
+|---------------------------|---------|----------|---------------------|
+| path                      | string  | yes      | -                   |
+| type                      | string  | yes      | -                   |
+| bucket                    | string  | yes      | -                   |
+| access_key                | string  | yes      | -                   |
+| access_secret             | string  | yes      | -                   |
+| endpoint                  | string  | yes      | -                   |
+| delimiter                 | string  | no       | \001                |
+| parse_partition_from_path | boolean | no       | true                |
+| date_format               | string  | no       | yyyy-MM-dd          |
+| datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss |
+| time_format               | string  | no       | HH:mm:ss            |
+| schema                    | config  | no       | -                   |
+| common-options            |         | no       | -                   |
 
 ### path [string]
 
@@ -54,6 +55,20 @@ Field delimiter, used to tell connector how to slice and dice fields when readin
 
 default `\001`, the same as hive's default delimiter
 
+### parse_partition_from_path [boolean]
+
+Controls whether the partition keys and values are parsed from the file path.
+
+For example, if you read a file from the path `oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`,
+
+the following two fields will be added to every record read from that file:
+
+| name           | age |
+|----------------|-----|
+| tyrantlucifer  | 26  |
+
+Tips: **Do not define partition fields in schema option**
+
 ### date_format [string]
 
 Date type format, used to tell connector how to convert string to date, supported as the following formats:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 9a986d172..311a6dd04 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -59,10 +60,11 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
                 case TEXT:
                 case JSON:
                     Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
-                    rowType = SeaTunnelSchema
+                    SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
                             .buildWithConfig(schemaConfig)
                             .getSeaTunnelRowType();
-                    readStrategy.setSeaTunnelRowTypeInfo(rowType);
+                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+                    rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
                     break;
                 case ORC:
                 case PARQUET:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index 60617de72..ffe6e7018 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -25,4 +25,5 @@ public class BaseSourceConfig {
     public static final String DATE_FORMAT = "date_format";
     public static final String DATETIME_FORMAT = "datetime_format";
     public static final String TIME_FORMAT = "time_format";
+    public static final String PARSE_PARTITION_FROM_PATH = "parse_partition_from_path";
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 55213d219..fdfbc9756 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -22,7 +22,10 @@ import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECOR
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
 
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
 
@@ -37,13 +40,19 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 @Slf4j
 public abstract class AbstractReadStrategy implements ReadStrategy {
     protected HadoopConf hadoopConf;
     protected SeaTunnelRowType seaTunnelRowType;
+    protected SeaTunnelRowType seaTunnelRowTypeWithPartition;
     protected Config pluginConfig;
+    protected List<String> fileNames = new ArrayList<>();
+    protected boolean isMergePartition = true;
 
     @Override
     public void init(HadoopConf conf) {
@@ -53,6 +62,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
     @Override
     public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
         this.seaTunnelRowType = seaTunnelRowType;
+        this.seaTunnelRowTypeWithPartition = mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
     }
 
     @Override
@@ -84,8 +94,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
     @Override
     public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException {
         Configuration configuration = getConfiguration(hadoopConf);
-        List<String> fileNames = new ArrayList<>();
         FileSystem hdfs = FileSystem.get(configuration);
+        ArrayList<String> fileNames = new ArrayList<>();
         Path listFiles = new Path(path);
         FileStatus[] stats = hdfs.listStatus(listFiles);
         for (FileStatus fileStatus : stats) {
@@ -97,6 +107,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
                 // filter '_SUCCESS' file
                 if (!fileStatus.getPath().getName().equals("_SUCCESS")) {
                     fileNames.add(fileStatus.getPath().toString());
+                    this.fileNames.add(fileStatus.getPath().toString());
                 }
             }
         }
@@ -106,5 +117,52 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
     @Override
     public void setPluginConfig(Config pluginConfig) {
         this.pluginConfig = pluginConfig;
+        if (pluginConfig.hasPath(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)) {
+            isMergePartition = pluginConfig.getBoolean(BaseSourceConfig.PARSE_PARTITION_FROM_PATH);
+        }
+    }
+
+    @Override
+    public SeaTunnelRowType getActualSeaTunnelRowTypeInfo() {
+        return isMergePartition ? seaTunnelRowTypeWithPartition : seaTunnelRowType;
+    }
+
+    protected Map<String, String> parsePartitionsByPath(String path) {
+        LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
+        Arrays.stream(path.split("/", -1))
+                .filter(split -> split.contains("="))
+                .map(split -> split.split("=", -1))
+                .forEach(kv -> partitions.put(kv[0], kv[1]));
+        return partitions;
+    }
+
+    protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType seaTunnelRowType) {
+        Map<String, String> partitionsMap = parsePartitionsByPath(path);
+        if (partitionsMap.isEmpty()) {
+            return seaTunnelRowType;
+        }
+        // get all names of partitions fields
+        String[] partitionNames = partitionsMap.keySet().toArray(new String[0]);
+        // initialize data type for partition fields
+        SeaTunnelDataType<?>[] partitionTypes = new SeaTunnelDataType<?>[partitionNames.length];
+        Arrays.fill(partitionTypes, BasicType.STRING_TYPE);
+        // get origin field names
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        // get origin data types
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        // create new array to merge partition fields and origin fields
+        String[] newFieldNames = new String[fieldNames.length + partitionNames.length];
+        // create new array to merge partition fields' data type and origin fields' data type
+        SeaTunnelDataType<?>[] newFieldTypes = new SeaTunnelDataType<?>[fieldTypes.length + partitionTypes.length];
+        // copy origin field names to new array
+        System.arraycopy(fieldNames, 0, newFieldNames, 0, fieldNames.length);
+        // copy partitions field name to new array
+        System.arraycopy(partitionNames, 0, newFieldNames, fieldNames.length, partitionNames.length);
+        // copy origin field types to new array
+        System.arraycopy(fieldTypes, 0, newFieldTypes, 0, fieldTypes.length);
+        // copy partition field types to new array
+        System.arraycopy(partitionTypes, 0, newFieldTypes, fieldTypes.length, partitionTypes.length);
+        // return merge row type
+        return new SeaTunnelRowType(newFieldNames, newFieldTypes);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
index 9e5775fb3..e0ff36c14 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -33,6 +33,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
 
 public class JsonReadStrategy extends AbstractReadStrategy {
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
@@ -40,7 +41,11 @@ public class JsonReadStrategy extends AbstractReadStrategy {
     @Override
     public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
         super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
-        deserializationSchema = new JsonDeserializationSchema(false, false, this.seaTunnelRowType);
+        if (isMergePartition) {
+            deserializationSchema = new JsonDeserializationSchema(false, false, this.seaTunnelRowTypeWithPartition);
+        } else {
+            deserializationSchema = new JsonDeserializationSchema(false, false, this.seaTunnelRowType);
+        }
     }
 
     @Override
@@ -48,10 +53,18 @@ public class JsonReadStrategy extends AbstractReadStrategy {
         Configuration conf = getConfiguration();
         FileSystem fs = FileSystem.get(conf);
         Path filePath = new Path(path);
+        Map<String, String> partitionsMap = parsePartitionsByPath(path);
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
             reader.lines().forEach(line -> {
                 try {
-                    deserializationSchema.deserialize(line.getBytes(), output);
+                    SeaTunnelRow seaTunnelRow = deserializationSchema.deserialize(line.getBytes());
+                    if (isMergePartition) {
+                        int index = seaTunnelRowType.getTotalFields();
+                        for (String value : partitionsMap.values()) {
+                            seaTunnelRow.setField(index++, value);
+                        }
+                    }
+                    output.collect(seaTunnelRow);
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
@@ -61,6 +74,6 @@ public class JsonReadStrategy extends AbstractReadStrategy {
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
-        return this.seaTunnelRowType;
+        throw new UnsupportedOperationException("User must defined schema for json file type");
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 5b946b769..9b81950de 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -67,8 +67,6 @@ import java.util.Map;
 
 @Slf4j
 public class OrcReadStrategy extends AbstractReadStrategy {
-
-    private SeaTunnelRowType seaTunnelRowTypeInfo;
     private static final long MIN_SIZE = 16 * 1024;
 
     @Override
@@ -78,6 +76,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
         }
         Configuration configuration = getConfiguration();
         Path filePath = new Path(path);
+        Map<String, String> partitionsMap = parsePartitionsByPath(path);
         OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(configuration);
         try (Reader reader = OrcFile.createReader(filePath, readerOptions)) {
             TypeDescription schema = reader.getSchema();
@@ -88,7 +87,16 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                 int num = 0;
                 for (int i = 0; i < rowBatch.size; i++) {
                     int numCols = rowBatch.numCols;
-                    Object[] fields = new Object[numCols];
+                    Object[] fields;
+                    if (isMergePartition) {
+                        int index = numCols;
+                        fields = new Object[numCols + partitionsMap.size()];
+                        for (String value : partitionsMap.values()) {
+                            fields[index++] = value;
+                        }
+                    } else {
+                        fields = new Object[numCols];
+                    }
                     ColumnVector[] cols = rowBatch.cols;
                     for (int j = 0; j < numCols; j++) {
                         if (cols[j] == null) {
@@ -97,7 +105,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                             fields[j] = readColumn(cols[j], children.get(j), num);
                         }
                     }
-                    output.collect(new SeaTunnelRow(fields));
+                    SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+                    output.collect(seaTunnelRow);
                     num++;
                 }
             }
@@ -106,9 +115,6 @@ public class OrcReadStrategy extends AbstractReadStrategy {
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
-        if (null != seaTunnelRowTypeInfo) {
-            return seaTunnelRowTypeInfo;
-        }
         Configuration configuration = getConfiguration(hadoopConf);
         OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(configuration);
         Path dstDir = new Path(path);
@@ -120,8 +126,9 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                 fields[i] = schema.getFieldNames().get(i);
                 types[i] = orcDataType2SeaTunnelDataType(schema.getChildren().get(i));
             }
-            seaTunnelRowTypeInfo = new SeaTunnelRowType(fields, types);
-            return seaTunnelRowTypeInfo;
+            seaTunnelRowType = new SeaTunnelRowType(fields, types);
+            seaTunnelRowTypeWithPartition = mergePartitionTypes(path, seaTunnelRowType);
+            return getActualSeaTunnelRowTypeInfo();
         } catch (IOException e) {
             throw new FilePluginException("Create OrcReader Fail", e);
         }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 951d397b4..442e4cfd7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -64,11 +64,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
 public class ParquetReadStrategy extends AbstractReadStrategy {
-    private SeaTunnelRowType seaTunnelRowType;
     private static final byte[] PARQUET_MAGIC = new byte[]{(byte) 'P', (byte) 'A', (byte) 'R', (byte) '1'};
     private static final long NANOS_PER_MILLISECOND = 1000000;
     private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1L);
@@ -80,6 +80,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
             throw new Exception("please check file type");
         }
         Path filePath = new Path(path);
+        Map<String, String> partitionsMap = parsePartitionsByPath(path);
         HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, getConfiguration());
         int fieldsCount = seaTunnelRowType.getTotalFields();
         GenericData dataModel = new GenericData();
@@ -92,12 +93,22 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
                 .withDataModel(dataModel)
                 .build()) {
             while ((record = reader.read()) != null) {
-                Object[] fields = new Object[fieldsCount];
+                Object[] fields;
+                if (isMergePartition) {
+                    int index = fieldsCount;
+                    fields = new Object[fieldsCount + partitionsMap.size()];
+                    for (String value : partitionsMap.values()) {
+                        fields[index++] = value;
+                    }
+                } else {
+                    fields = new Object[fieldsCount];
+                }
                 for (int i = 0; i < fieldsCount; i++) {
                     Object data = record.get(i);
                     fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i));
                 }
-                output.collect(new SeaTunnelRow(fields));
+                SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+                output.collect(seaTunnelRow);
             }
         }
     }
@@ -185,9 +196,6 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
-        if (seaTunnelRowType != null) {
-            return seaTunnelRowType;
-        }
         Path filePath = new Path(path);
         ParquetMetadata metadata;
         try {
@@ -210,7 +218,8 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
             types[i] = fieldType;
         }
         seaTunnelRowType = new SeaTunnelRowType(fields, types);
-        return seaTunnelRowType;
+        seaTunnelRowTypeWithPartition = mergePartitionTypes(path, seaTunnelRowType);
+        return getActualSeaTunnelRowTypeInfo();
     }
 
     private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index ee32bd182..3d1063c51 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -45,4 +45,6 @@ public interface ReadStrategy extends Serializable {
     List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException;
 
     void setPluginConfig(Config pluginConfig);
+
+    SeaTunnelRowType getActualSeaTunnelRowTypeInfo();
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 3ae05b86a..bd59e477d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -39,6 +39,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
 
 public class TextReadStrategy extends AbstractReadStrategy {
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
@@ -52,10 +53,18 @@ public class TextReadStrategy extends AbstractReadStrategy {
         Configuration conf = getConfiguration();
         FileSystem fs = FileSystem.get(conf);
         Path filePath = new Path(path);
+        Map<String, String> partitionsMap = parsePartitionsByPath(path);
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
             reader.lines().forEach(line -> {
                 try {
-                    deserializationSchema.deserialize(line.getBytes(), output);
+                    SeaTunnelRow seaTunnelRow = deserializationSchema.deserialize(line.getBytes());
+                    if (isMergePartition) {
+                        int index = seaTunnelRowType.getTotalFields();
+                        for (String value : partitionsMap.values()) {
+                            seaTunnelRow.setField(index++, value);
+                        }
+                    }
+                    output.collect(seaTunnelRow);
                 } catch (IOException e) {
                     String errorMsg = String.format("Deserialize this data [%s] error, please check the origin data", line);
                     throw new RuntimeException(errorMsg);
@@ -67,11 +76,20 @@ public class TextReadStrategy extends AbstractReadStrategy {
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) {
         SeaTunnelRowType simpleSeaTunnelType = SeaTunnelSchema.buildSimpleTextSchema();
-        deserializationSchema = TextDeserializationSchema.builder()
-                .seaTunnelRowType(simpleSeaTunnelType)
-                .delimiter(String.valueOf('\002'))
-                .build();
-        return simpleSeaTunnelType;
+        this.seaTunnelRowType = simpleSeaTunnelType;
+        this.seaTunnelRowTypeWithPartition = mergePartitionTypes(fileNames.get(0), simpleSeaTunnelType);
+        if (isMergePartition) {
+            deserializationSchema = TextDeserializationSchema.builder()
+                    .seaTunnelRowType(this.seaTunnelRowTypeWithPartition)
+                    .delimiter(String.valueOf('\002'))
+                    .build();
+        } else {
+            deserializationSchema = TextDeserializationSchema.builder()
+                    .seaTunnelRowType(this.seaTunnelRowType)
+                    .delimiter(String.valueOf('\002'))
+                    .build();
+        }
+        return getActualSeaTunnelRowTypeInfo();
     }
 
     @Override
@@ -80,7 +98,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
         if (pluginConfig.hasPath(BaseSourceConfig.DELIMITER)) {
             fieldDelimiter = pluginConfig.getString(BaseSourceConfig.DELIMITER);
         } else {
-            FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(BaseSourceConfig.FILE_TYPE));
+            FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(BaseSourceConfig.FILE_TYPE).toUpperCase());
             if (fileFormat == FileFormat.CSV) {
                 fieldDelimiter = ",";
             }
@@ -94,12 +112,22 @@ public class TextReadStrategy extends AbstractReadStrategy {
         if (pluginConfig.hasPath(BaseSourceConfig.TIME_FORMAT)) {
             timeFormat = TimeUtils.Formatter.parse(pluginConfig.getString(BaseSourceConfig.TIME_FORMAT));
         }
-        deserializationSchema = TextDeserializationSchema.builder()
-                .seaTunnelRowType(seaTunnelRowType)
-                .delimiter(fieldDelimiter)
-                .dateFormatter(dateFormat)
-                .dateTimeFormatter(datetimeFormat)
-                .timeFormatter(timeFormat)
-                .build();
+        if (isMergePartition) {
+            deserializationSchema = TextDeserializationSchema.builder()
+                    .seaTunnelRowType(this.seaTunnelRowTypeWithPartition)
+                    .delimiter(fieldDelimiter)
+                    .dateFormatter(dateFormat)
+                    .dateTimeFormatter(datetimeFormat)
+                    .timeFormatter(timeFormat)
+                    .build();
+        } else {
+            deserializationSchema = TextDeserializationSchema.builder()
+                    .seaTunnelRowType(this.seaTunnelRowType)
+                    .delimiter(fieldDelimiter)
+                    .dateFormatter(dateFormat)
+                    .dateTimeFormatter(datetimeFormat)
+                    .timeFormatter(timeFormat)
+                    .build();
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index fc6c4d420..ab34a6c2b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.ftp.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -73,10 +74,11 @@ public class FtpFileSource extends BaseFileSource {
                 case TEXT:
                 case JSON:
                     Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
-                    rowType = SeaTunnelSchema
+                    SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
                             .buildWithConfig(schemaConfig)
                             .getSeaTunnelRowType();
-                    readStrategy.setSeaTunnelRowTypeInfo(rowType);
+                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+                    rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
                     break;
                 case ORC:
                 case PARQUET:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 8cfe9e0df..df04acde4 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.local.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -70,10 +71,11 @@ public class LocalFileSource extends BaseFileSource {
                 case TEXT:
                 case JSON:
                     Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
-                    rowType = SeaTunnelSchema
+                    SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
                             .buildWithConfig(schemaConfig)
                             .getSeaTunnelRowType();
-                    readStrategy.setSeaTunnelRowTypeInfo(rowType);
+                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+                    rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
                     break;
                 case ORC:
                 case PARQUET:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index a2bd20c0e..a99d0e2ab 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.oss.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -71,10 +72,11 @@ public class OssFileSource extends BaseFileSource {
                 case TEXT:
                 case JSON:
                     Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
-                    rowType = SeaTunnelSchema
+                    SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
                             .buildWithConfig(schemaConfig)
                             .getSeaTunnelRowType();
-                    readStrategy.setSeaTunnelRowTypeInfo(rowType);
+                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+                    rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
                     break;
                 case ORC:
                 case PARQUET:
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
index ab3576eb9..3080b1a19 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
@@ -59,7 +59,7 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
     public SeaTunnelRow deserialize(byte[] message) throws IOException {
         String content = new String(message);
         Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content, seaTunnelRowType);
-        Object[] objects = new Object[splitsMap.size()];
+        Object[] objects = new Object[seaTunnelRowType.getTotalFields()];
         for (int i = 0; i < objects.length; i++) {
             objects[i] = convert(splitsMap.get(i), seaTunnelRowType.getFieldType(i));
         }
@@ -80,12 +80,23 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
             if (fieldTypes[i].getSqlType() == SqlType.ROW) {
                 // row type
                 int totalFields = ((SeaTunnelRowType) fieldTypes[i]).getTotalFields();
-                ArrayList<String> rowSplits = new ArrayList<>(Arrays.asList(splits).subList(cursor, cursor + totalFields));
-                splitsMap.put(i, String.join(delimiter, rowSplits));
+                // if current field is empty
+                if (cursor >= splits.length) {
+                    splitsMap.put(i, null);
+                } else {
+                    ArrayList<String> rowSplits = new ArrayList<>(Arrays.asList(splits).subList(cursor, cursor + totalFields));
+                    splitsMap.put(i, String.join(delimiter, rowSplits));
+                }
                 cursor += totalFields;
             } else {
                 // not row type
-                splitsMap.put(i, splits[cursor++]);
+                // if current field is empty
+                if (cursor >= splits.length) {
+                    splitsMap.put(i, null);
+                    cursor++;
+                } else {
+                    splitsMap.put(i, splits[cursor++]);
+                }
             }
         }
         return splitsMap;