You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/31 03:18:35 UTC

[incubator-seatunnel] branch dev updated: [Improve][engine] Improve the number of loops of the method to get the last Latest Pipeline name (#3235)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0b73b5fe1 [Improve][engine] Improve the number of loops of the method to get the last Latest Pipeline name (#3235)
0b73b5fe1 is described below

commit 0b73b5fe100a4724f0560c722a163306cef02711
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Mon Oct 31 11:18:30 2022 +0800

    [Improve][engine] Improve the number of loops of the method to get the last Latest Pipeline name (#3235)
    
    * Reduce the number of loops in the method that gets the latest pipeline names
    
    * fix code style
---
 .../storage/api/AbstractCheckpointStorage.java     | 32 ++++++++++++----------
 .../checkpoint/storage/hdfs/HdfsStorage.java       |  2 +-
 .../storage/localfile/LocalFileStorage.java        | 23 ++++++++--------
 3 files changed, 29 insertions(+), 28 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
index 3e14ea44d..eb3593504 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
@@ -29,10 +29,11 @@ import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorag
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -41,6 +42,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 @Slf4j
 public abstract class AbstractCheckpointStorage implements CheckpointStorage {
@@ -110,24 +112,20 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
         }
     }
 
-    public Set<String> getLatestPipelineNames(List<String> fileNames) {
+    public Set<String> getLatestPipelineNames(Collection<String> fileNames) {
         Map<String, String> latestPipelineMap = new HashMap<>();
+        Map<String, Long> latestPipelineVersionMap = new HashMap<>();
         fileNames.forEach(fileName -> {
-            String[] fileNameSegments = fileName.split(FILE_NAME_SPLIT);
+            String[] fileNameSegments = getFileNameSegments(fileName);
             long fileVersion = Long.parseLong(fileNameSegments[FILE_SORT_ID_INDEX]);
             String filePipelineId = fileNameSegments[FILE_NAME_PIPELINE_ID_INDEX];
-            if (latestPipelineMap.containsKey(filePipelineId)) {
-                long oldVersion = Long.parseLong(latestPipelineMap.get(filePipelineId).split(FILE_NAME_SPLIT)[FILE_SORT_ID_INDEX]);
-                if (fileVersion > oldVersion) {
-                    latestPipelineMap.put(filePipelineId, fileName);
-                }
-            } else {
+            Long oldVersion = latestPipelineVersionMap.get(filePipelineId);
+            if (Objects.isNull(oldVersion) || fileVersion > oldVersion) {
+                latestPipelineVersionMap.put(filePipelineId, fileVersion);
                 latestPipelineMap.put(filePipelineId, fileName);
             }
         });
-        Set<String> latestPipelines = new HashSet<>(latestPipelineMap.size());
-        latestPipelineMap.forEach((pipelineId, fileName) -> latestPipelines.add(fileName));
-        return latestPipelines;
+        return latestPipelineMap.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toSet());
     }
 
     /**
@@ -140,7 +138,7 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
         AtomicReference<String> latestFileName = new AtomicReference<>();
         AtomicLong latestVersion = new AtomicLong();
         fileNames.forEach(fileName -> {
-            String[] fileNameSegments = fileName.split(FILE_NAME_SPLIT);
+            String[] fileNameSegments = getFileNameSegments(fileName);
             long fileVersion = Long.parseLong(fileNameSegments[FILE_SORT_ID_INDEX]);
             String filePipelineId = fileNameSegments[FILE_NAME_PIPELINE_ID_INDEX];
             if (pipelineId.equals(filePipelineId) && fileVersion > latestVersion.get()) {
@@ -151,6 +149,10 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
         return latestFileName.get();
     }
 
+    private String[] getFileNameSegments(String fileName) {
+        return fileName.split(FILE_NAME_SPLIT);
+    }
+
     /**
      * get the pipeline id of the file name
      *
@@ -158,7 +160,7 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
      * @return the pipeline id of the file.
      */
     public String getPipelineIdByFileName(String fileName) {
-        return fileName.split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
+        return getFileNameSegments(fileName)[FILE_NAME_PIPELINE_ID_INDEX];
     }
 
     /**
@@ -168,7 +170,7 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
      * @return the checkpoint id of the file.
      */
     public String getCheckpointIdByFileName(String fileName) {
-        return fileName.split(FILE_NAME_SPLIT)[FILE_NAME_CHECKPOINT_ID_INDEX].split("\\.")[0];
+        return getFileNameSegments(fileName)[FILE_NAME_CHECKPOINT_ID_INDEX].split("\\.")[0];
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
index 9ac3393e2..598e5ef8e 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
@@ -184,7 +184,7 @@ public class HdfsStorage extends AbstractCheckpointStorage {
 
         List<PipelineState> pipelineStates = new ArrayList<>();
         fileNames.forEach(file -> {
-            String filePipelineId = file.split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
+            String filePipelineId = getPipelineIdByFileName(file);
             if (pipelineId.equals(filePipelineId)) {
                 try {
                     pipelineStates.add(readPipelineState(file, jobId));
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
index bc6f0c7e1..c00705e7e 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -137,20 +138,17 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
         if (fileList.isEmpty()) {
             throw new CheckpointStorageException("No checkpoint found for job " + jobId);
         }
-        List<String> fileNames = fileList.stream().map(File::getName).collect(Collectors.toList());
-        Set<String> latestPipelines = getLatestPipelineNames(fileNames);
+        Map<String, File> fileMap = fileList.stream().collect(Collectors.toMap(File::getName, Function.identity(), (v1, v2) -> v2));
+        Set<String> latestPipelines = getLatestPipelineNames(fileMap.keySet());
         List<PipelineState> latestPipelineFiles = new ArrayList<>(latestPipelines.size());
-        fileList.forEach(file -> {
-            String fileName = file.getName();
-            if (latestPipelines.contains(fileName)) {
-                try {
-                    byte[] data = FileUtils.readFileToByteArray(file);
-                    latestPipelineFiles.add(deserializeCheckPointData(data));
-                } catch (IOException e) {
-                    log.error("Failed to read checkpoint data from file " + file.getAbsolutePath(), e);
-                }
+        latestPipelines.forEach(fileName -> {
+            File file = fileMap.get(fileName);
+            try {
+                byte[] data = FileUtils.readFileToByteArray(file);
+                latestPipelineFiles.add(deserializeCheckPointData(data));
+            } catch (IOException e) {
+                log.error("Failed to read checkpoint data from file " + file.getAbsolutePath(), e);
             }
-
         });
         if (latestPipelineFiles.isEmpty()) {
             throw new CheckpointStorageException("Failed to read checkpoint data from file");
@@ -195,6 +193,7 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
         if (fileList.isEmpty()) {
             throw new CheckpointStorageException("No checkpoint found for job " + jobId);
         }
+
         List<PipelineState> pipelineStates = new ArrayList<>();
         fileList.forEach(file -> {
             String filePipelineId = getPipelineIdByFileName(file.getName());