You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this rendering.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/14 07:04:01 UTC
[incubator-seatunnel] branch dev updated: [feature][engine][checkpoint] The checkpoint coordinator supports fault-tolerant recovery (#3092)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c8fc31023 [feature][engine][checkpoint] The checkpoint coordinator supports fault-tolerant recovery (#3092)
c8fc31023 is described below
commit c8fc3102378f4ca4d561c533e0560e0a867100ba
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Fri Oct 14 15:03:55 2022 +0800
[feature][engine][checkpoint] The checkpoint coordinator supports fault-tolerant recovery (#3092)
* [feature][engine][checkpoint] The checkpoint coordinator supports fault-tolerant recovery
* [engine][checkpoint] add CheckpointManager Test case
---
.../core/checkpoint/CheckpointIDCounter.java | 6 +-
.../server/checkpoint/CheckpointCoordinator.java | 18 +++--
.../server/checkpoint/CheckpointManager.java | 57 ++++++++++++++--
.../engine/server/checkpoint/CheckpointPlan.java | 6 --
...IDCounter.java => IMapCheckpointIDCounter.java} | 40 +++++++-----
.../checkpoint/StandaloneCheckpointIDCounter.java | 4 +-
.../server/checkpoint/CheckpointManagerTest.java | 76 ++++++++++++++++++++++
.../storage/api/AbstractCheckpointStorage.java | 2 +-
.../checkpoint/storage/api/CheckpointStorage.java | 11 ++++
.../checkpoint/storage/hdfs/HdfsStorage.java | 20 ++++++
.../storage/localfile/LocalFileStorage.java | 28 +++++++-
11 files changed, 224 insertions(+), 44 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
index cbf728380..7bdc228fc 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
@@ -18,14 +18,14 @@
package org.apache.seatunnel.engine.core.checkpoint;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineState;
import java.util.concurrent.CompletableFuture;
/** A checkpoint ID counter. */
public interface CheckpointIDCounter {
- int INITIAL_CHECKPOINT_ID = 1;
+ long INITIAL_CHECKPOINT_ID = 1;
/** Starts the {@link CheckpointIDCounter} service down. */
void start() throws Exception;
@@ -38,7 +38,7 @@ public interface CheckpointIDCounter {
*
* @return The {@code CompletableFuture} holding the result of the shutdown operation.
*/
- CompletableFuture<Void> shutdown(JobStatus jobStatus);
+ CompletableFuture<Void> shutdown(PipelineState jobStatus);
/**
* Atomically increments the current checkpoint ID.
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 6525e1beb..2a1ef88d7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -41,6 +41,8 @@ import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+import lombok.Getter;
+import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +83,7 @@ public class CheckpointCoordinator {
private final CheckpointStorageConfiguration storageConfig;
+ @Getter
private final CheckpointIDCounter checkpointIdCounter;
private final transient Serializer serializer;
@@ -100,7 +103,7 @@ public class CheckpointCoordinator {
private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
- private volatile CompletedCheckpoint latestCompletedCheckpoint;
+ private volatile CompletedCheckpoint latestCompletedCheckpoint = null;
private final CheckpointCoordinatorConfiguration coordinatorConfig;
@@ -116,13 +119,15 @@ public class CheckpointCoordinator {
/** Flag marking the coordinator as shut down (not accepting any messages any more). */
private volatile boolean shutdown;
+ @SneakyThrows
public CheckpointCoordinator(CheckpointManager manager,
CheckpointStorage checkpointStorage,
CheckpointStorageConfiguration storageConfig,
long jobId,
CheckpointPlan plan,
- CheckpointCoordinatorConfiguration coordinatorConfig) {
-
+ CheckpointCoordinatorConfiguration coordinatorConfig,
+ CheckpointIDCounter checkpointIdCounter,
+ PipelineState pipelineState) {
this.checkpointManager = manager;
this.checkpointStorage = checkpointStorage;
this.storageConfig = storageConfig;
@@ -130,7 +135,6 @@ public class CheckpointCoordinator {
this.pipelineId = plan.getPipelineId();
this.plan = plan;
this.coordinatorConfig = coordinatorConfig;
- this.latestCompletedCheckpoint = plan.getRestoredCheckpoint();
this.tolerableFailureCheckpoints = coordinatorConfig.getTolerableFailureCheckpoints();
this.pendingCheckpoints = new ConcurrentHashMap<>();
this.completedCheckpoints = new ArrayDeque<>(storageConfig.getMaxRetainedCheckpoints() + 1);
@@ -144,8 +148,10 @@ public class CheckpointCoordinator {
this.serializer = new ProtoStuffSerializer();
this.pipelineTasks = getPipelineTasks(plan.getPipelineSubtasks());
this.pipelineTaskStatus = new ConcurrentHashMap<>();
- // TODO: IDCounter SPI
- this.checkpointIdCounter = new StandaloneCheckpointIDCounter();
+ this.checkpointIdCounter = checkpointIdCounter;
+ if (pipelineState != null) {
+ this.latestCompletedCheckpoint = serializer.deserialize(pipelineState.getStates(), CompletedCheckpoint.class);
+ }
}
public int getPipelineId() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index b62e1a454..5286c1aae 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -18,13 +18,19 @@
package org.apache.seatunnel.engine.server.checkpoint;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.job.Job;
+import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation;
+import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.Task;
@@ -36,12 +42,14 @@ import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
+import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -78,14 +86,30 @@ public class CheckpointManager {
this.subtaskWithAddresses = new HashMap<>();
this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, storageConfig.getStorage())
.create(new HashMap<>());
+ IMap<Integer, Long> checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
this.coordinatorMap = checkpointPlanMap.values().parallelStream()
- .map(plan -> new CheckpointCoordinator(this,
- checkpointStorage,
- storageConfig,
- jobId,
- plan,
- coordinatorConfig)
- ).collect(Collectors.toMap(CheckpointCoordinator::getPipelineId, Function.identity()));
+ .map(plan -> {
+ // The counter state lives in the job-scoped IMap (not in this object), and start() only uses putIfAbsent,
+ // so a newly created CheckpointManager for the same job resumes from the stored id.
+ IMapCheckpointIDCounter idCounter = new IMapCheckpointIDCounter(plan.getPipelineId(), checkpointIdMap);
+ try {
+ idCounter.start();
+ // Read the distributed counter once so the "has this pipeline run before?" check and the lookup agree.
+ long nextCheckpointId = idCounter.get();
+ // TODO: support savepoint
+ PipelineState pipelineState = null;
+ if (nextCheckpointId != CheckpointIDCounter.INITIAL_CHECKPOINT_ID) {
+ // The counter has moved past its initial value: restore from the checkpoint just before the next id.
+ // NOTE(review): if the counter is advanced when a checkpoint is triggered rather than when it completes,
+ // (next id - 1) may not exist in storage and getCheckpoint throws; consider falling back to the latest stored checkpoint.
+ pipelineState = checkpointStorage.getCheckpoint(String.valueOf(jobId), String.valueOf(plan.getPipelineId()), String.valueOf(nextCheckpointId - 1));
+ }
+ return new CheckpointCoordinator(this,
+ checkpointStorage,
+ storageConfig,
+ jobId,
+ plan,
+ coordinatorConfig,
+ idCounter,
+ pipelineState);
+ } catch (Exception e) {
+ ExceptionUtil.sneakyThrow(e);
+ }
+ // Only reached if ExceptionUtil.sneakyThrow returned normally (presumably it never does); keeps the lambda total.
+ throw new RuntimeException("Never throw here.");
+ }).collect(Collectors.toMap(CheckpointCoordinator::getPipelineId, Function.identity()));
}
/**
@@ -147,6 +171,25 @@ public class CheckpointManager {
}
}
+ /**
+ * Called by the JobMaster.
+ * <br> Forwards a {@link org.apache.seatunnel.engine.core.job.PipelineState} change of a {@link Pipeline} to that
+ * pipeline's {@link CheckpointIDCounter}, so the counter can clean up. The IMap-backed counter removes its stored
+ * id once the pipeline is in an end state.
+ *
+ * @param pipelineId id of the pipeline whose state changed
+ * @param pipelineStatus the new state of the pipeline
+ * @return a future that completes when the counter has handled the state change
+ */
+ public CompletableFuture<Void> listenPipeline(int pipelineId, org.apache.seatunnel.engine.core.job.PipelineState pipelineStatus) {
+ return getCheckpointCoordinator(pipelineId).getCheckpointIdCounter().shutdown(pipelineStatus);
+ }
+
+ /**
+ * Called by the JobMaster.
+ * <br> Handles a {@link JobStatus} change of the {@link Job}. When the job has {@link JobStatus#FINISHED finished},
+ * all of its checkpoint data is deleted from the checkpoint storage; any other status leaves the data in place.
+ *
+ * @param jobStatus the new status of the job
+ * @return an already-completed future
+ */
+ public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+ if (jobStatus == JobStatus.FINISHED) {
+ checkpointStorage.deleteCheckpoint(String.valueOf(jobId));
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
/**
* Called by the JobMaster.
* <br> Returns whether the pipeline has completed; No need to deploy/restore the {@link SubPlan} if the pipeline has been completed;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
index c2f7ab1f5..0dd1c0dc5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
@@ -49,11 +49,6 @@ public class CheckpointPlan {
*/
private final Set<TaskLocation> startingSubtasks;
- /**
- * Restored completed checkpoint.
- */
- private final CompletedCheckpoint restoredCheckpoint;
-
/**
* All actions in this pipeline.
* <br> key: the action id;
@@ -64,7 +59,6 @@ public class CheckpointPlan {
public static final class Builder {
private final Set<TaskLocation> pipelineSubtasks = new HashSet<>();
private final Set<TaskLocation> startingSubtasks = new HashSet<>();
- private CompletedCheckpoint restoredCheckpoint;
private final Map<Long, Integer> pipelineActions = new HashMap<>();
private Builder() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
index e656fca4f..c98a8a69a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
@@ -18,45 +18,49 @@
package org.apache.seatunnel.engine.server.checkpoint;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineState;
+
+import com.hazelcast.map.IMap;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
- public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
+ /**
+ * A {@link CheckpointIDCounter} that stores the next checkpoint id of a single pipeline in a Hazelcast
+ * {@link IMap} keyed by pipeline id, so the value is not tied to the lifetime of this object.
+ */
+ public class IMapCheckpointIDCounter implements CheckpointIDCounter {
+ private final Integer pipelineId;
+ private final IMap<Integer, Long> checkpointIdMap;
- private final AtomicLong checkpointIdCounter = new AtomicLong(INITIAL_CHECKPOINT_ID);
+ /**
+ * @param pipelineId key under which this pipeline's next checkpoint id is stored
+ * @param checkpointIdMap map shared by all pipeline counters of the job
+ */
+ public IMapCheckpointIDCounter(Integer pipelineId,
+ IMap<Integer, Long> checkpointIdMap) {
+ this.pipelineId = pipelineId;
+ this.checkpointIdMap = checkpointIdMap;
+ }
@Override
public void start() throws Exception {
+ // putIfAbsent keeps any id already stored for this pipeline (e.g. by an earlier manager for the same job).
+ checkpointIdMap.putIfAbsent(pipelineId, INITIAL_CHECKPOINT_ID);
}
@Override
- public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+ public CompletableFuture<Void> shutdown(PipelineState pipelineStatus) {
+ // Keep the stored id unless the pipeline has reached an end state.
+ if (pipelineStatus.isEndState()) {
+ checkpointIdMap.remove(pipelineId);
+ }
return CompletableFuture.completedFuture(null);
}
@Override
public long getAndIncrement() throws Exception {
- return checkpointIdCounter.getAndIncrement();
+ // Hold the key lock across the read-modify-write: a bare get followed by put is not atomic,
+ // so two concurrent callers could otherwise be handed the same checkpoint id.
+ checkpointIdMap.lock(pipelineId);
+ try {
+ Long currentId = checkpointIdMap.get(pipelineId);
+ checkpointIdMap.put(pipelineId, currentId + 1);
+ return currentId;
+ } finally {
+ checkpointIdMap.unlock(pipelineId);
+ }
}
@Override
public long get() {
- return checkpointIdCounter.get();
+ // NOTE(review): unboxing throws NullPointerException if start() was not called or shutdown() removed the entry.
+ return checkpointIdMap.get(pipelineId);
}
@Override
- public void setCount(long newCount) {
- checkpointIdCounter.set(newCount);
- }
-
- /**
- * Returns the last checkpoint ID (current - 1).
- *
- * @return Last checkpoint ID.
- */
- public long getLast() {
- return checkpointIdCounter.get() - 1;
+ public void setCount(long newId) throws Exception {
+ checkpointIdMap.put(pipelineId, newId);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
index e656fca4f..d1ad67a87 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.engine.server.checkpoint;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineState;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
@@ -32,7 +32,7 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
}
@Override
- public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+ public CompletableFuture<Void> shutdown(PipelineState pipelineStatus) {
+ // No-op regardless of pipelineStatus: the counter value lives in a local AtomicLong, so there is no shared entry to remove.
return CompletableFuture.completedFuture(null);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
new file mode 100644
index 000000000..1801def2b
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
+import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
+import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
+import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerializer;
+import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+
+import com.hazelcast.map.IMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+@DisabledOnOs(OS.WINDOWS)
+public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
+
+ /**
+ * Simulates recovery. Checkpoint 1 of pipeline 1 is already in storage, and the checkpoint-id IMap says the next id
+ * is 2. A newly created CheckpointManager is then expected to report pipeline 1 as completed (the stored checkpoint
+ * has type COMPLETED_POINT_TYPE). Finally, ending the pipeline must remove its id entry, and finishing the job must
+ * remove the stored checkpoints.
+ */
+ @Test
+ public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException {
+ // Random job id so that repeated runs do not share checkpoint storage paths or the IMap name.
+ long jobId = (long) (Math.random() * 1000000L);
+ CheckpointStorage checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class,
+ CheckpointStorageConfiguration.builder().build().getStorage())
+ .create(new HashMap<>());
+ CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(jobId, 1, 1,
+ Instant.now().toEpochMilli(),
+ CheckpointType.COMPLETED_POINT_TYPE,
+ Instant.now().toEpochMilli(),
+ new HashMap<>(),
+ new HashMap<>());
+ checkpointStorage.storeCheckPoint(PipelineState.builder().jobId(jobId + "").pipelineId(1).checkpointId(1)
+ .states(new ProtoStuffSerializer().serialize(completedCheckpoint)).build());
+ // Same map name CheckpointManager uses ("checkpoint-id-<jobId>"); next id 2 means checkpoint 1 should be restored.
+ IMap<Integer, Long> checkpointIdMap = nodeEngine.getHazelcastInstance().getMap("checkpoint-id-" + jobId);
+ checkpointIdMap.put(1, 2L);
+ Map<Integer, CheckpointPlan> planMap = new HashMap<>();
+ planMap.put(1, CheckpointPlan.builder().pipelineId(1).build());
+ CheckpointManager checkpointManager = new CheckpointManager(
+ jobId,
+ nodeEngine,
+ planMap,
+ CheckpointCoordinatorConfiguration.builder().build(),
+ CheckpointStorageConfiguration.builder().build());
+ Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
+ // A FINISHED pipeline (expected to be an end state) must drop its id entry from the IMap.
+ CompletableFuture<Void> future = checkpointManager.listenPipeline(1, org.apache.seatunnel.engine.core.job.PipelineState.FINISHED);
+ future.join();
+ Assertions.assertNull(checkpointIdMap.get(1));
+ // A FINISHED job must have all of its stored checkpoints deleted.
+ future = checkpointManager.shutdown(JobStatus.FINISHED);
+ future.join();
+ Assertions.assertTrue(checkpointStorage.getAllCheckpoints(jobId + "").isEmpty());
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
index 9dd791d49..3e14ea44d 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
@@ -168,7 +168,7 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
* @return the checkpoint id of the file.
*/
public String getCheckpointIdByFileName(String fileName) {
- return fileName.split(FILE_NAME_SPLIT)[FILE_NAME_CHECKPOINT_ID_INDEX];
+ // Keep only the part before the first '.' (e.g. drop a file extension), so callers can compare it with a bare checkpoint id.
+ return fileName.split(FILE_NAME_SPLIT)[FILE_NAME_CHECKPOINT_ID_INDEX].split("\\.")[0];
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java
index 814df74c1..7478f5362 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java
@@ -95,6 +95,17 @@ public interface CheckpointStorage {
*/
void deleteCheckpoint(String jobId);
+ /**
+ * Get the stored state of a single checkpoint of a pipeline.
+ *
+ * @param jobId job id
+ * @param pipelineId pipeline id
+ * @param checkpointId checkpoint id
+ * @return the stored pipeline state of that checkpoint
+ * @throws CheckpointStorageException if the checkpoint cannot be found or cannot be read
+ */
+ PipelineState getCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException;
+
/**
* Delete the checkpoint data.
*
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
index cd9190e9b..9ac3393e2 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
@@ -206,6 +206,26 @@ public class HdfsStorage extends AbstractCheckpointStorage {
}
}
+ /**
+ * Reads one checkpoint of a pipeline from HDFS.
+ *
+ * @throws CheckpointStorageException if the job has no checkpoint files, no file matches the pipeline/checkpoint id,
+ * or the matching file cannot be read (the read failure is attached as the cause)
+ */
+ @Override
+ public PipelineState getCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException {
+ String path = getStorageParentDirectory() + jobId;
+ List<String> fileNames = getFileNames(path);
+ if (fileNames.isEmpty()) {
+ throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
+ }
+ for (String fileName : fileNames) {
+ if (pipelineId.equals(getPipelineIdByFileName(fileName)) &&
+ checkpointId.equals(getCheckpointIdByFileName(fileName))) {
+ try {
+ return readPipelineState(fileName, jobId);
+ } catch (Exception e) {
+ // Surface the read failure with its cause instead of logging it and then reporting "not found".
+ throw new CheckpointStorageException(String.format("Failed to get checkpoint %s for job %s, pipeline %s", checkpointId, jobId, pipelineId), e);
+ }
+ }
+ }
+ throw new CheckpointStorageException(String.format("No checkpoint found, job(%s), pipeline(%s), checkpoint(%s)", jobId, pipelineId, checkpointId));
+ }
+
@Override
public void deleteCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException {
String path = getStorageParentDirectory() + jobId;
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
index 1396b6a75..bc6f0c7e1 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
@@ -105,9 +105,14 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
@Override
public List<PipelineState> getAllCheckpoints(String jobId) throws CheckpointStorageException {
+ File filePath = new File(getStorageParentDirectory() + jobId);
+ if (!filePath.exists()) {
+ return new ArrayList<>();
+ }
+
Collection<File> fileList;
try {
- fileList = FileUtils.listFiles(new File(getStorageParentDirectory() + jobId), FILE_EXTENSIONS, true);
+ fileList = FileUtils.listFiles(filePath, FILE_EXTENSIONS, true);
} catch (Exception e) {
throw new CheckpointStorageException("Failed to get all checkpoints for job " + jobId, e);
}
@@ -216,6 +221,27 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
}
}
+ /**
+ * Reads one checkpoint of a pipeline from the local file system.
+ *
+ * @throws CheckpointStorageException if the job directory is missing or empty, no file matches the pipeline/checkpoint
+ * id, or the matching file cannot be read (the read failure is attached as the cause)
+ */
+ @Override
+ public PipelineState getCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException {
+ File jobDirectory = new File(getStorageParentDirectory() + jobId);
+ // FileUtils.listFiles fails on a missing directory; treat it like "no checkpoints" (as getAllCheckpoints does).
+ Collection<File> fileList = jobDirectory.exists() ? FileUtils.listFiles(jobDirectory, FILE_EXTENSIONS, false) : new ArrayList<>();
+ if (fileList.isEmpty()) {
+ throw new CheckpointStorageException("No checkpoint found for job " + jobId);
+ }
+ for (File file : fileList) {
+ String fileName = file.getName();
+ if (pipelineId.equals(getPipelineIdByFileName(fileName)) && checkpointId.equals(getCheckpointIdByFileName(fileName))) {
+ try {
+ return deserializeCheckPointData(FileUtils.readFileToByteArray(file));
+ } catch (Exception e) {
+ // Surface the read failure with its cause instead of logging it and then reporting "not found".
+ throw new CheckpointStorageException(String.format("Failed to get checkpoint %s for job %s, pipeline %s", checkpointId, jobId, pipelineId), e);
+ }
+ }
+ }
+ throw new CheckpointStorageException(String.format("No checkpoint found, job(%s), pipeline(%s), checkpoint(%s)", jobId, pipelineId, checkpointId));
+ }
+
@Override
public void deleteCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException {
Collection<File> fileList = FileUtils.listFiles(new File(getStorageParentDirectory() + jobId), FILE_EXTENSIONS, false);