You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/31 03:18:35 UTC
[incubator-seatunnel] branch dev updated: [Improve][engine] Improve the number of loops of the method to get the last Latest Pipeline name (#3235)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0b73b5fe1 [Improve][engine] Improve the number of loops of the method to get the last Latest Pipeline name (#3235)
0b73b5fe1 is described below
commit 0b73b5fe100a4724f0560c722a163306cef02711
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Mon Oct 31 11:18:30 2022 +0800
[Improve][engine] Improve the number of loops of the method to get the last Latest Pipeline name (#3235)
* Reduce the number of loops in the method that gets the latest pipeline names
* fix code style
---
.../storage/api/AbstractCheckpointStorage.java | 32 ++++++++++++----------
.../checkpoint/storage/hdfs/HdfsStorage.java | 2 +-
.../storage/localfile/LocalFileStorage.java | 23 ++++++++--------
3 files changed, 29 insertions(+), 28 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
index 3e14ea44d..eb3593504 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
@@ -29,10 +29,11 @@ import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorag
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -41,6 +42,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
@Slf4j
public abstract class AbstractCheckpointStorage implements CheckpointStorage {
@@ -110,24 +112,20 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
}
}
- public Set<String> getLatestPipelineNames(List<String> fileNames) {
+ public Set<String> getLatestPipelineNames(Collection<String> fileNames) {
Map<String, String> latestPipelineMap = new HashMap<>();
+ Map<String, Long> latestPipelineVersionMap = new HashMap<>();
fileNames.forEach(fileName -> {
- String[] fileNameSegments = fileName.split(FILE_NAME_SPLIT);
+ String[] fileNameSegments = getFileNameSegments(fileName);
long fileVersion = Long.parseLong(fileNameSegments[FILE_SORT_ID_INDEX]);
String filePipelineId = fileNameSegments[FILE_NAME_PIPELINE_ID_INDEX];
- if (latestPipelineMap.containsKey(filePipelineId)) {
- long oldVersion = Long.parseLong(latestPipelineMap.get(filePipelineId).split(FILE_NAME_SPLIT)[FILE_SORT_ID_INDEX]);
- if (fileVersion > oldVersion) {
- latestPipelineMap.put(filePipelineId, fileName);
- }
- } else {
+ Long oldVersion = latestPipelineVersionMap.get(filePipelineId);
+ if (Objects.isNull(oldVersion) || fileVersion > oldVersion) {
+ latestPipelineVersionMap.put(filePipelineId, fileVersion);
latestPipelineMap.put(filePipelineId, fileName);
}
});
- Set<String> latestPipelines = new HashSet<>(latestPipelineMap.size());
- latestPipelineMap.forEach((pipelineId, fileName) -> latestPipelines.add(fileName));
- return latestPipelines;
+ return latestPipelineMap.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toSet());
}
/**
@@ -140,7 +138,7 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
AtomicReference<String> latestFileName = new AtomicReference<>();
AtomicLong latestVersion = new AtomicLong();
fileNames.forEach(fileName -> {
- String[] fileNameSegments = fileName.split(FILE_NAME_SPLIT);
+ String[] fileNameSegments = getFileNameSegments(fileName);
long fileVersion = Long.parseLong(fileNameSegments[FILE_SORT_ID_INDEX]);
String filePipelineId = fileNameSegments[FILE_NAME_PIPELINE_ID_INDEX];
if (pipelineId.equals(filePipelineId) && fileVersion > latestVersion.get()) {
@@ -151,6 +149,10 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
return latestFileName.get();
}
+ private String[] getFileNameSegments(String fileName) {
+ return fileName.split(FILE_NAME_SPLIT);
+ }
+
/**
* get the pipeline id of the file name
*
@@ -158,7 +160,7 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
* @return the pipeline id of the file.
*/
public String getPipelineIdByFileName(String fileName) {
- return fileName.split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
+ return getFileNameSegments(fileName)[FILE_NAME_PIPELINE_ID_INDEX];
}
/**
@@ -168,7 +170,7 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
* @return the checkpoint id of the file.
*/
public String getCheckpointIdByFileName(String fileName) {
- return fileName.split(FILE_NAME_SPLIT)[FILE_NAME_CHECKPOINT_ID_INDEX].split("\\.")[0];
+ return getFileNameSegments(fileName)[FILE_NAME_CHECKPOINT_ID_INDEX].split("\\.")[0];
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
index 9ac3393e2..598e5ef8e 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
@@ -184,7 +184,7 @@ public class HdfsStorage extends AbstractCheckpointStorage {
List<PipelineState> pipelineStates = new ArrayList<>();
fileNames.forEach(file -> {
- String filePipelineId = file.split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
+ String filePipelineId = getPipelineIdByFileName(file);
if (pipelineId.equals(filePipelineId)) {
try {
pipelineStates.add(readPipelineState(file, jobId));
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
index bc6f0c7e1..c00705e7e 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
@@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@@ -137,20 +138,17 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
if (fileList.isEmpty()) {
throw new CheckpointStorageException("No checkpoint found for job " + jobId);
}
- List<String> fileNames = fileList.stream().map(File::getName).collect(Collectors.toList());
- Set<String> latestPipelines = getLatestPipelineNames(fileNames);
+ Map<String, File> fileMap = fileList.stream().collect(Collectors.toMap(File::getName, Function.identity(), (v1, v2) -> v2));
+ Set<String> latestPipelines = getLatestPipelineNames(fileMap.keySet());
List<PipelineState> latestPipelineFiles = new ArrayList<>(latestPipelines.size());
- fileList.forEach(file -> {
- String fileName = file.getName();
- if (latestPipelines.contains(fileName)) {
- try {
- byte[] data = FileUtils.readFileToByteArray(file);
- latestPipelineFiles.add(deserializeCheckPointData(data));
- } catch (IOException e) {
- log.error("Failed to read checkpoint data from file " + file.getAbsolutePath(), e);
- }
+ latestPipelines.forEach(fileName -> {
+ File file = fileMap.get(fileName);
+ try {
+ byte[] data = FileUtils.readFileToByteArray(file);
+ latestPipelineFiles.add(deserializeCheckPointData(data));
+ } catch (IOException e) {
+ log.error("Failed to read checkpoint data from file " + file.getAbsolutePath(), e);
}
-
});
if (latestPipelineFiles.isEmpty()) {
throw new CheckpointStorageException("Failed to read checkpoint data from file");
@@ -195,6 +193,7 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
if (fileList.isEmpty()) {
throw new CheckpointStorageException("No checkpoint found for job " + jobId);
}
+
List<PipelineState> pipelineStates = new ArrayList<>();
fileList.forEach(file -> {
String filePipelineId = getPipelineIdByFileName(file.getName());