You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/12/07 02:30:56 UTC

(seatunnel) branch dev updated: [Feature][Zeta] add new job status `DOING_SAVEPOINT` and `SAVEPOINT_DONE` (#5917)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 87db4d1f8b [Feature][Zeta] add new job status `DOING_SAVEPOINT` and `SAVEPOINT_DONE` (#5917)
87db4d1f8b is described below

commit 87db4d1f8bfc6273a727d5ed77e642a4fbee5e72
Author: Jia Fan <fa...@qq.com>
AuthorDate: Thu Dec 7 10:30:51 2023 +0800

    [Feature][Zeta] add new job status `DOING_SAVEPOINT` and `SAVEPOINT_DONE` (#5917)
---
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java |  2 +-
 .../engine/client/SeaTunnelClientTest.java         |  2 +-
 .../seatunnel/engine/core/job/JobStatus.java       |  6 +++++
 .../engine/server/CoordinatorService.java          | 11 ++++++--
 .../server/checkpoint/CheckpointManager.java       | 16 ++++++------
 .../engine/server/dag/physical/PhysicalPlan.java   | 29 +++++++++++++++++-----
 .../seatunnel/engine/server/master/JobMaster.java  |  7 +++---
 .../engine/server/checkpoint/SavePointTest.java    |  8 +++++-
 .../engine/server/master/JobMetricsTest.java       |  2 +-
 9 files changed, 60 insertions(+), 23 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 0c5b7d2f6f..fedfa4ce25 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -203,7 +203,7 @@ public class RestApiIT {
                 .untilAsserted(
                         () ->
                                 Assertions.assertEquals(
-                                        JobStatus.FINISHED,
+                                        JobStatus.SAVEPOINT_DONE,
                                         seaTunnelServer
                                                 .getCoordinatorService()
                                                 .getJobStatus(Long.parseLong(jobId))));
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index ea76c28b48..0fbbe80572 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -406,7 +406,7 @@ public class SeaTunnelClientTest {
                     .untilAsserted(
                             () ->
                                     Assertions.assertEquals(
-                                            "FINISHED", jobClient.getJobStatus(jobId)));
+                                            "SAVEPOINT_DONE", jobClient.getJobStatus(jobId)));
 
             Thread.sleep(1000);
             seaTunnelClient
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
index e9801629b1..ed3cd50bb3 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
@@ -44,6 +44,12 @@ public enum JobStatus {
     /** The job has failed with a non-recoverable task failure. */
     FAILED(EndState.GLOBALLY),
 
+    /** A savepoint is currently being taken for the job. */
+    DOING_SAVEPOINT(EndState.NOT_END),
+
+    /** The job has ended after its savepoint completed. */
+    SAVEPOINT_DONE(EndState.GLOBALLY),
+
     /** Job is being cancelled. */
     CANCELING(EndState.NOT_END),
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index d92f40ce39..98b5f03b3a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -453,8 +453,15 @@ public class CoordinatorService {
             logger.warning(throwable);
             voidCompletableFuture.completeExceptionally(throwable);
         } else {
-            JobMaster jobMaster = runningJobMasterMap.get(jobId);
-            voidCompletableFuture = jobMaster.savePoint();
+            voidCompletableFuture =
+                    new PassiveCompletableFuture<>(
+                            CompletableFuture.supplyAsync(
+                                    () -> {
+                                        JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
+                                        runningJobMaster.savePoint().join();
+                                        return null;
+                                    },
+                                    executorService));
         }
         return new PassiveCompletableFuture<>(voidCompletableFuture);
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index cd58da1dd9..941af4b791 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -112,16 +112,16 @@ public class CheckpointManager {
                                                     jobId, plan.getPipelineId(), nodeEngine);
                                     try {
                                         idCounter.start();
-                                        PipelineState pipelineState =
-                                                checkpointStorage
-                                                        .getLatestCheckpointByJobIdAndPipelineId(
-                                                                String.valueOf(jobId),
-                                                                String.valueOf(
-                                                                        plan.getPipelineId()));
-                                        if (pipelineState != null) {
+                                        PipelineState pipelineState = null;
+                                        if (isStartWithSavePoint) {
+                                            pipelineState =
+                                                    checkpointStorage
+                                                            .getLatestCheckpointByJobIdAndPipelineId(
+                                                                    String.valueOf(jobId),
+                                                                    String.valueOf(
+                                                                            plan.getPipelineId()));
                                             long checkpointId = pipelineState.getCheckpointId();
                                             idCounter.setCount(checkpointId + 1);
-
                                             log.info(
                                                     "pipeline({}) start with savePoint on checkPointId({})",
                                                     plan.getPipelineId(),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 72de3f8658..ba169d6c2e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -45,11 +45,11 @@ public class PhysicalPlan {
 
     private final List<SubPlan> pipelineList;
 
-    private AtomicInteger finishedPipelineNum = new AtomicInteger(0);
+    private final AtomicInteger finishedPipelineNum = new AtomicInteger(0);
 
-    private AtomicInteger canceledPipelineNum = new AtomicInteger(0);
+    private final AtomicInteger canceledPipelineNum = new AtomicInteger(0);
 
-    private AtomicInteger failedPipelineNum = new AtomicInteger(0);
+    private final AtomicInteger failedPipelineNum = new AtomicInteger(0);
 
     private final JobImmutableInformation jobImmutableInformation;
 
@@ -85,7 +85,7 @@ public class PhysicalPlan {
             @NonNull ExecutorService executorService,
             @NonNull JobImmutableInformation jobImmutableInformation,
             long initializationTimestamp,
-            @NonNull IMap runningJobStateIMap,
+            @NonNull IMap<Object, Object> runningJobStateIMap,
             @NonNull IMap runningJobStateTimestampsIMap) {
         this.jobImmutableInformation = jobImmutableInformation;
         this.jobId = jobImmutableInformation.getJobId();
@@ -166,7 +166,11 @@ public class PhysicalPlan {
                                 jobStatus = JobStatus.CANCELED;
                                 updateJobState(jobStatus);
                             } else {
-                                jobStatus = JobStatus.FINISHED;
+                                if (this.getJobStatus() == JobStatus.DOING_SAVEPOINT) {
+                                    jobStatus = JobStatus.SAVEPOINT_DONE;
+                                } else {
+                                    jobStatus = JobStatus.FINISHED;
+                                }
                                 updateJobState(jobStatus);
                             }
                         }
@@ -191,6 +195,17 @@ public class PhysicalPlan {
         updateJobState(JobStatus.CANCELING);
     }
 
+    public void savepointJob() {
+        if (getJobStatus().isEndState()) {
+            log.warn(
+                    String.format(
+                            "%s is in end state %s, can not do savepoint",
+                            jobFullName, getJobStatus()));
+            return;
+        }
+        updateJobState(JobStatus.DOING_SAVEPOINT);
+    }
+
     public List<SubPlan> getPipelineList() {
         return pipelineList;
     }
@@ -237,7 +252,7 @@ public class PhysicalPlan {
                     new RetryUtils.RetryMaterial(
                             Constant.OPERATION_RETRY_TIME,
                             true,
-                            exception -> ExceptionUtil.isOperationNeedRetryException(exception),
+                            ExceptionUtil::isOperationNeedRetryException,
                             Constant.OPERATION_RETRY_SLEEP));
             log.info(
                     String.format(
@@ -300,6 +315,7 @@ public class PhysicalPlan {
                 updateJobState(JobStatus.RUNNING);
                 break;
             case RUNNING:
+            case DOING_SAVEPOINT:
                 break;
             case FAILING:
             case CANCELING:
@@ -308,6 +324,7 @@ public class PhysicalPlan {
                 break;
             case FAILED:
             case CANCELED:
+            case SAVEPOINT_DONE:
             case FINISHED:
                 stopJobStateProcess();
                 jobEndFuture.complete(new JobResult(getJobStatus(), errorBySubPlan.get()));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 9bd40804fb..d9a4a0b94a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -229,7 +229,7 @@ public class JobMaster {
         this.checkpointPlanMap = planTuple.f1();
         Exception initException = null;
         try {
-            this.initCheckPointManager();
+            this.initCheckPointManager(restart);
         } catch (Exception e) {
             initException = e;
         }
@@ -242,11 +242,11 @@ public class JobMaster {
         }
     }
 
-    public void initCheckPointManager() throws CheckpointStorageException {
+    public void initCheckPointManager(boolean restart) throws CheckpointStorageException {
         this.checkpointManager =
                 new CheckpointManager(
                         jobImmutableInformation.getJobId(),
-                        jobImmutableInformation.isStartWithSavePoint(),
+                        jobImmutableInformation.isStartWithSavePoint() || restart,
                         nodeEngine,
                         this,
                         checkpointPlanMap,
@@ -672,6 +672,7 @@ public class JobMaster {
                         "Begin do save point for Job %s (%s) ",
                         jobImmutableInformation.getJobConfig().getName(),
                         jobImmutableInformation.getJobId()));
+        physicalPlan.savepointJob();
         PassiveCompletableFuture<CompletedCheckpoint>[] passiveCompletableFutures =
                 checkpointManager.triggerSavePoints();
         return CompletableFuture.allOf(passiveCompletableFutures);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
index fedf186ca5..43fc98cd7b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
@@ -75,6 +75,12 @@ public class SavePointTest extends AbstractSeaTunnelServerTest {
 
         // 3 start savePoint
         server.getCoordinatorService().savePoint(JOB_ID);
+        await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            JobStatus status = server.getCoordinatorService().getJobStatus(JOB_ID);
+                            Assertions.assertEquals(JobStatus.DOING_SAVEPOINT, status);
+                        });
 
         // 4 Wait for savePoint to complete
         await().atMost(120000, TimeUnit.MILLISECONDS)
@@ -82,7 +88,7 @@ public class SavePointTest extends AbstractSeaTunnelServerTest {
                         () -> {
                             Assertions.assertEquals(
                                     server.getCoordinatorService().getJobStatus(JOB_ID),
-                                    JobStatus.FINISHED);
+                                    JobStatus.SAVEPOINT_DONE);
                         });
 
         Thread.sleep(1000);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 0446e9bb11..f3da33ba9a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -124,7 +124,7 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
                 .untilAsserted(
                         () ->
                                 Assertions.assertEquals(
-                                        JobStatus.FINISHED,
+                                        JobStatus.SAVEPOINT_DONE,
                                         server.getCoordinatorService().getJobStatus(jobId3)));
 
         // restore job