Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/12 14:23:14 UTC

[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2985: [Improve][Connector-V2][File] Support parse field from file path

EricJoy2048 commented on code in PR #2985:
URL: https://github.com/apache/incubator-seatunnel/pull/2985#discussion_r993525262


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java:
##########
@@ -106,5 +118,68 @@ public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throw
     @Override
     public void setPluginConfig(Config pluginConfig) {
         this.pluginConfig = pluginConfig;
+        if (pluginConfig.hasPath(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)) {
+            isMergePartition = pluginConfig.getBoolean(BaseSourceConfig.PARSE_PARTITION_FROM_PATH);
+        }
+    }
+
+    @Override
+    public SeaTunnelRowType getActualSeaTunnelRowTypeInfo() {
+        return isMergePartition ? seaTunnelRowTypeWithPartition : seaTunnelRowType;
+    }
+
+    protected Map<String, String> parsePartitionsByPath(String path) {
+        LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
+        Arrays.stream(path.split("/", -1))
+                .filter(split -> split.contains("="))
+                .map(split -> split.split("=", -1))
+                .forEach(kv -> partitions.put(kv[0], kv[1]));
+        return partitions;
+    }
+
+    protected SeaTunnelRow mergePartitionFields(String path, SeaTunnelRow seaTunnelRow) {
+        Map<String, String> partitionsMap = parsePartitionsByPath(path);

Review Comment:
   Every row calls `mergePartitionFields`, so we must keep this method's execution time as low as possible.
   `parsePartitionsByPath` does not need to run for every row. It can run once, when the file is opened.
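
   A minimal sketch of the "parse once per file" idea, not the actual SeaTunnel change. The class `PartitionPathCache`, its method `partitionValuesFor`, and the `parseCount` counter are hypothetical names used only for illustration. The parsing mirrors the diff above, except that it splits on the first `=` only, which is an assumption about how values containing `=` should be handled.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: caches the partition values of the file currently
// being read so that the path is parsed once per file, not once per row.
class PartitionPathCache {
    private String cachedPath;
    private Object[] cachedValues = new Object[0];
    // Counts real parses; only here to make the effect observable.
    int parseCount = 0;

    // Same idea as parsePartitionsByPath in the diff: keep "key=value" segments.
    static Map<String, String> parsePartitionsByPath(String path) {
        Map<String, String> partitions = new LinkedHashMap<>();
        Arrays.stream(path.split("/", -1))
                .filter(segment -> segment.contains("="))
                // Assumption: split on the first '=' so the value may contain '='.
                .map(segment -> segment.split("=", 2))
                .forEach(kv -> partitions.put(kv[0], kv[1]));
        return partitions;
    }

    // Returns the partition values for the path, parsing only when the path changes.
    Object[] partitionValuesFor(String path) {
        if (!path.equals(cachedPath)) {
            cachedValues = parsePartitionsByPath(path).values().toArray(new Object[0]);
            cachedPath = path;
            parseCount++;
        }
        return cachedValues;
    }
}
```

   In a read strategy, the equivalent would be to fill such a field when the file is opened and let the per-row merge step read from it.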
   



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java:
##########
@@ -106,5 +118,68 @@ public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throw
     @Override
     public void setPluginConfig(Config pluginConfig) {
         this.pluginConfig = pluginConfig;
+        if (pluginConfig.hasPath(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)) {
+            isMergePartition = pluginConfig.getBoolean(BaseSourceConfig.PARSE_PARTITION_FROM_PATH);
+        }
+    }
+
+    @Override
+    public SeaTunnelRowType getActualSeaTunnelRowTypeInfo() {
+        return isMergePartition ? seaTunnelRowTypeWithPartition : seaTunnelRowType;
+    }
+
+    protected Map<String, String> parsePartitionsByPath(String path) {
+        LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
+        Arrays.stream(path.split("/", -1))
+                .filter(split -> split.contains("="))
+                .map(split -> split.split("=", -1))
+                .forEach(kv -> partitions.put(kv[0], kv[1]));
+        return partitions;
+    }
+
+    protected SeaTunnelRow mergePartitionFields(String path, SeaTunnelRow seaTunnelRow) {
+        Map<String, String> partitionsMap = parsePartitionsByPath(path);
+        // get all values of partition fields
+        Object[] partitions = partitionsMap.values().toArray(new Object[0]);
+        // get all values of origin SeaTunnelRow
+        Object[] fields = seaTunnelRow.getFields();
+        // create a new array to merge partition fields and origin fields
+        Object[] objects = new Object[fields.length + partitions.length];
+        // copy origin values to new array
+        System.arraycopy(fields, 0, objects, 0, fields.length);

Review Comment:
   Every row calls `mergePartitionFields`, so we must reduce the memory usage and array-copy time here.
   I think we can create the `SeaTunnelRow` with a fields array of length `fields.length + partitions.length` from the start, and then write the partition values directly into that array.
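
   A rough sketch of that allocation pattern, using plain arrays rather than the real `SeaTunnelRow` class. The class `PreallocatedRowSketch` and the method `readRow` are hypothetical, and the `String[]` input stands in for whatever a concrete reader produces for one record.

```java
// Hypothetical illustration: allocate the row's array at its final width once,
// fill in the data columns, then append the already-parsed partition values.
// This avoids building a smaller array first and copying it into a larger one.
class PreallocatedRowSketch {
    static Object[] readRow(String[] rawColumns, Object[] partitionValues) {
        // Single allocation sized for data fields plus partition fields.
        Object[] fields = new Object[rawColumns.length + partitionValues.length];
        // Data columns go in the leading slots.
        for (int i = 0; i < rawColumns.length; i++) {
            fields[i] = rawColumns[i];
        }
        // Partition values go in the trailing slots.
        System.arraycopy(partitionValues, 0, fields, rawColumns.length, partitionValues.length);
        return fields;
    }
}
```

   Combined with partition values parsed once per file, the per-row cost is one allocation and one short copy of the partition values.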
   
   Thank you.


