You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/08 01:05:17 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-V2] Optimize the code structure (#2380)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7376ec7ab [Improve][Connector-V2] Optimize the code structure (#2380)
7376ec7ab is described below

commit 7376ec7ab1bf02e56d89f051eedb827305736af5
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Mon Aug 8 09:05:11 2022 +0800

    [Improve][Connector-V2] Optimize the code structure (#2380)
    
    1. Optimize list traversal
    2. Optimize map key judge logic
    3. Add type for array list
    4. Make class attribute final
---
 .../file/sink/FileSinkAggregatedCommitter.java         | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
index 3c7c8cf9c..4d68d75ad 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
@@ -34,8 +34,7 @@ import java.util.Set;
 
 public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo> {
     private static final Logger LOGGER = LoggerFactory.getLogger(FileSinkAggregatedCommitter.class);
-
-    private FileSystemCommitter fileSystemCommitter;
+    private final FileSystemCommitter fileSystemCommitter;
 
     public FileSinkAggregatedCommitter(@NonNull FileSystemCommitter fileSystemCommitter) {
         this.fileSystemCommitter = fileSystemCommitter;
@@ -46,8 +45,8 @@ public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<File
         if (aggregatedCommitInfoList == null || aggregatedCommitInfoList.size() == 0) {
             return null;
         }
-        List errorAggregatedCommitInfoList = new ArrayList();
-        aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
+        List<FileAggregatedCommitInfo> errorAggregatedCommitInfoList = new ArrayList<>();
+        aggregatedCommitInfoList.forEach(aggregateCommitInfo -> {
             try {
                 fileSystemCommitter.commitTransaction(aggregateCommitInfo);
             } catch (Exception e) {
@@ -66,12 +65,8 @@ public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<File
         }
         Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
         Map<String, List<String>> partitionDirAndValsMap = new HashMap<>();
-        commitInfos.stream().forEach(commitInfo -> {
-            Map<String, String> needMoveFileMap = aggregateCommitInfo.get(commitInfo.getTransactionDir());
-            if (needMoveFileMap == null) {
-                needMoveFileMap = new HashMap<>();
-                aggregateCommitInfo.put(commitInfo.getTransactionDir(), needMoveFileMap);
-            }
+        commitInfos.forEach(commitInfo -> {
+            Map<String, String> needMoveFileMap = aggregateCommitInfo.computeIfAbsent(commitInfo.getTransactionDir(), k -> new HashMap<>());
             needMoveFileMap.putAll(commitInfo.getNeedMoveFiles());
             Set<Map.Entry<String, List<String>>> entries = commitInfo.getPartitionDirAndValsMap().entrySet();
             if (!CollectionUtils.isEmpty(entries)) {
@@ -86,10 +81,9 @@ public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<File
         if (aggregatedCommitInfoList == null || aggregatedCommitInfoList.size() == 0) {
             return;
         }
-        aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
+        aggregatedCommitInfoList.forEach(aggregateCommitInfo -> {
             try {
                 fileSystemCommitter.abortTransaction(aggregateCommitInfo);
-
             } catch (Exception e) {
                 LOGGER.error("abort aggregateCommitInfo error ", e);
             }