You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/12/07 02:30:56 UTC
(seatunnel) branch dev updated: [Feature][Zeta] add new job status `DOING_SAVEPOINT` and `SAVEPOINT_DONE` (#5917)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 87db4d1f8b [Feature][Zeta] add new job status `DOING_SAVEPOINT` and `SAVEPOINT_DONE` (#5917)
87db4d1f8b is described below
commit 87db4d1f8bfc6273a727d5ed77e642a4fbee5e72
Author: Jia Fan <fa...@qq.com>
AuthorDate: Thu Dec 7 10:30:51 2023 +0800
[Feature][Zeta] add new job status `DOING_SAVEPOINT` and `SAVEPOINT_DONE` (#5917)
---
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 2 +-
.../engine/client/SeaTunnelClientTest.java | 2 +-
.../seatunnel/engine/core/job/JobStatus.java | 6 +++++
.../engine/server/CoordinatorService.java | 11 ++++++--
.../server/checkpoint/CheckpointManager.java | 16 ++++++------
.../engine/server/dag/physical/PhysicalPlan.java | 29 +++++++++++++++++-----
.../seatunnel/engine/server/master/JobMaster.java | 7 +++---
.../engine/server/checkpoint/SavePointTest.java | 8 +++++-
.../engine/server/master/JobMetricsTest.java | 2 +-
9 files changed, 60 insertions(+), 23 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 0c5b7d2f6f..fedfa4ce25 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -203,7 +203,7 @@ public class RestApiIT {
.untilAsserted(
() ->
Assertions.assertEquals(
- JobStatus.FINISHED,
+ JobStatus.SAVEPOINT_DONE,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId))));
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index ea76c28b48..0fbbe80572 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -406,7 +406,7 @@ public class SeaTunnelClientTest {
.untilAsserted(
() ->
Assertions.assertEquals(
- "FINISHED", jobClient.getJobStatus(jobId)));
+ "SAVEPOINT_DONE", jobClient.getJobStatus(jobId)));
Thread.sleep(1000);
seaTunnelClient
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
index e9801629b1..ed3cd50bb3 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
@@ -44,6 +44,12 @@ public enum JobStatus {
/** The job has failed with a non-recoverable task failure. */
FAILED(EndState.GLOBALLY),
+ /** The job is currently performing a savepoint. */
+ DOING_SAVEPOINT(EndState.NOT_END),
+
+ /** The job has ended after completing a savepoint. */
+ SAVEPOINT_DONE(EndState.GLOBALLY),
+
/** Job is being cancelled. */
CANCELING(EndState.NOT_END),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index d92f40ce39..98b5f03b3a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -453,8 +453,15 @@ public class CoordinatorService {
logger.warning(throwable);
voidCompletableFuture.completeExceptionally(throwable);
} else {
- JobMaster jobMaster = runningJobMasterMap.get(jobId);
- voidCompletableFuture = jobMaster.savePoint();
+ voidCompletableFuture =
+ new PassiveCompletableFuture<>(
+ CompletableFuture.supplyAsync(
+ () -> {
+ JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
+ runningJobMaster.savePoint().join();
+ return null;
+ },
+ executorService));
}
return new PassiveCompletableFuture<>(voidCompletableFuture);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index cd58da1dd9..941af4b791 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -112,16 +112,16 @@ public class CheckpointManager {
jobId, plan.getPipelineId(), nodeEngine);
try {
idCounter.start();
- PipelineState pipelineState =
- checkpointStorage
- .getLatestCheckpointByJobIdAndPipelineId(
- String.valueOf(jobId),
- String.valueOf(
- plan.getPipelineId()));
- if (pipelineState != null) {
+ PipelineState pipelineState = null;
+ if (isStartWithSavePoint) {
+ pipelineState =
+ checkpointStorage
+ .getLatestCheckpointByJobIdAndPipelineId(
+ String.valueOf(jobId),
+ String.valueOf(
+ plan.getPipelineId()));
long checkpointId = pipelineState.getCheckpointId();
idCounter.setCount(checkpointId + 1);
-
log.info(
"pipeline({}) start with savePoint on checkPointId({})",
plan.getPipelineId(),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 72de3f8658..ba169d6c2e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -45,11 +45,11 @@ public class PhysicalPlan {
private final List<SubPlan> pipelineList;
- private AtomicInteger finishedPipelineNum = new AtomicInteger(0);
+ private final AtomicInteger finishedPipelineNum = new AtomicInteger(0);
- private AtomicInteger canceledPipelineNum = new AtomicInteger(0);
+ private final AtomicInteger canceledPipelineNum = new AtomicInteger(0);
- private AtomicInteger failedPipelineNum = new AtomicInteger(0);
+ private final AtomicInteger failedPipelineNum = new AtomicInteger(0);
private final JobImmutableInformation jobImmutableInformation;
@@ -85,7 +85,7 @@ public class PhysicalPlan {
@NonNull ExecutorService executorService,
@NonNull JobImmutableInformation jobImmutableInformation,
long initializationTimestamp,
- @NonNull IMap runningJobStateIMap,
+ @NonNull IMap<Object, Object> runningJobStateIMap,
@NonNull IMap runningJobStateTimestampsIMap) {
this.jobImmutableInformation = jobImmutableInformation;
this.jobId = jobImmutableInformation.getJobId();
@@ -166,7 +166,11 @@ public class PhysicalPlan {
jobStatus = JobStatus.CANCELED;
updateJobState(jobStatus);
} else {
- jobStatus = JobStatus.FINISHED;
+ if (this.getJobStatus() == JobStatus.DOING_SAVEPOINT) {
+ jobStatus = JobStatus.SAVEPOINT_DONE;
+ } else {
+ jobStatus = JobStatus.FINISHED;
+ }
updateJobState(jobStatus);
}
}
@@ -191,6 +195,17 @@ public class PhysicalPlan {
updateJobState(JobStatus.CANCELING);
}
+ public void savepointJob() {
+ if (getJobStatus().isEndState()) {
+ log.warn(
+ String.format(
+ "%s is in end state %s, can not do savepoint",
+ jobFullName, getJobStatus()));
+ return;
+ }
+ updateJobState(JobStatus.DOING_SAVEPOINT);
+ }
+
public List<SubPlan> getPipelineList() {
return pipelineList;
}
@@ -237,7 +252,7 @@ public class PhysicalPlan {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
- exception -> ExceptionUtil.isOperationNeedRetryException(exception),
+ ExceptionUtil::isOperationNeedRetryException,
Constant.OPERATION_RETRY_SLEEP));
log.info(
String.format(
@@ -300,6 +315,7 @@ public class PhysicalPlan {
updateJobState(JobStatus.RUNNING);
break;
case RUNNING:
+ case DOING_SAVEPOINT:
break;
case FAILING:
case CANCELING:
@@ -308,6 +324,7 @@ public class PhysicalPlan {
break;
case FAILED:
case CANCELED:
+ case SAVEPOINT_DONE:
case FINISHED:
stopJobStateProcess();
jobEndFuture.complete(new JobResult(getJobStatus(), errorBySubPlan.get()));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 9bd40804fb..d9a4a0b94a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -229,7 +229,7 @@ public class JobMaster {
this.checkpointPlanMap = planTuple.f1();
Exception initException = null;
try {
- this.initCheckPointManager();
+ this.initCheckPointManager(restart);
} catch (Exception e) {
initException = e;
}
@@ -242,11 +242,11 @@ public class JobMaster {
}
}
- public void initCheckPointManager() throws CheckpointStorageException {
+ public void initCheckPointManager(boolean restart) throws CheckpointStorageException {
this.checkpointManager =
new CheckpointManager(
jobImmutableInformation.getJobId(),
- jobImmutableInformation.isStartWithSavePoint(),
+ jobImmutableInformation.isStartWithSavePoint() || restart,
nodeEngine,
this,
checkpointPlanMap,
@@ -672,6 +672,7 @@ public class JobMaster {
"Begin do save point for Job %s (%s) ",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId()));
+ physicalPlan.savepointJob();
PassiveCompletableFuture<CompletedCheckpoint>[] passiveCompletableFutures =
checkpointManager.triggerSavePoints();
return CompletableFuture.allOf(passiveCompletableFutures);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
index fedf186ca5..43fc98cd7b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
@@ -75,6 +75,12 @@ public class SavePointTest extends AbstractSeaTunnelServerTest {
// 3 start savePoint
server.getCoordinatorService().savePoint(JOB_ID);
+ await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ JobStatus status = server.getCoordinatorService().getJobStatus(JOB_ID);
+ Assertions.assertEquals(JobStatus.DOING_SAVEPOINT, status);
+ });
// 4 Wait for savePoint to complete
await().atMost(120000, TimeUnit.MILLISECONDS)
@@ -82,7 +88,7 @@ public class SavePointTest extends AbstractSeaTunnelServerTest {
() -> {
Assertions.assertEquals(
server.getCoordinatorService().getJobStatus(JOB_ID),
- JobStatus.FINISHED);
+ JobStatus.SAVEPOINT_DONE);
});
Thread.sleep(1000);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 0446e9bb11..f3da33ba9a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -124,7 +124,7 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
.untilAsserted(
() ->
Assertions.assertEquals(
- JobStatus.FINISHED,
+ JobStatus.SAVEPOINT_DONE,
server.getCoordinatorService().getJobStatus(jobId3)));
// restore job