You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/12 13:56:04 UTC

[incubator-seatunnel] branch dev updated: [Hotfix][Zeta] Fix savepoint test case error (#4561)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c350c984f [Hotfix][Zeta] Fix savepoint test case error (#4561)
c350c984f is described below

commit c350c984ffdd1e06019e0b596ea6da05f678b8e0
Author: Eric <ga...@gmail.com>
AuthorDate: Wed Apr 12 21:55:54 2023 +0800

    [Hotfix][Zeta] Fix savepoint test case error (#4561)
---
 .../engine/client/SeaTunnelClientTest.java         | 24 +++++++++++++++++++---
 .../src/test/resources/log4j2-test.properties      |  4 ++++
 .../server/checkpoint/CheckpointCloseReason.java   |  3 ++-
 .../server/checkpoint/CheckpointCoordinator.java   | 15 +++++++++++++-
 .../CheckpointBarrierTriggerOperation.java         |  2 +-
 .../seatunnel/engine/server/master/JobMaster.java  |  5 +++++
 6 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index c045067b4..554a1e782 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -21,8 +21,10 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
@@ -259,8 +261,7 @@ public class SeaTunnelClientTest {
     }
 
     @Test
-    public void testSavePointAndRestoreWithSavePoint()
-            throws ExecutionException, InterruptedException {
+    public void testSavePointAndRestoreWithSavePoint() throws Exception {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/streaming_fake_to_console.conf");
         JobConfig jobConfig = new JobConfig();
@@ -276,7 +277,24 @@ public class SeaTunnelClientTest {
                 .untilAsserted(
                         () -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId)));
 
-        CLIENT.savePointJob(jobId);
+        RetryUtils.retryWithException(
+                () -> {
+                    CLIENT.savePointJob(jobId);
+                    return null;
+                },
+                new RetryUtils.RetryMaterial(
+                        Constant.OPERATION_RETRY_TIME,
+                        true,
+                        exception -> {
+                            // If we trigger a savepoint for a job whose initialization has not
+                            // completed yet, the request fails with an error, so this test case
+                            // retries the savepoint until the job is ready.
+                            return exception
+                                    .getCause()
+                                    .getMessage()
+                                    .contains("Task not all ready, savepoint error");
+                        },
+                        Constant.OPERATION_RETRY_SLEEP));
 
         await().atMost(30000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/log4j2-test.properties b/seatunnel-engine/seatunnel-engine-client/src/test/resources/log4j2-test.properties
index 2dc1b8ca5..c5b54e52e 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/log4j2-test.properties
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/log4j2-test.properties
@@ -18,6 +18,10 @@
 
 rootLogger.level = INFO
 
+
+logger.zeta.name=org.apache.seatunnel.engine
+logger.zeta.level=INFO
+
 rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
 rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index a500eb19a..7962ebc75 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -24,7 +24,8 @@ public enum CheckpointCloseReason {
     CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
     CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
     CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error."),
-    AGGREGATE_COMMIT_ERROR("Aggregate commit error.");
+    AGGREGATE_COMMIT_ERROR("Aggregate commit error."),
+    TASK_NOT_ALL_READY_WHEN_SAVEPOINT("Task not all ready, savepoint error");
 
     private final String message;
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index ea3cfd689..64285ffa5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -371,10 +371,23 @@ public class CheckpointCoordinator {
     }
 
     public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
+        LOG.info(String.format("Start save point for Job (%s)", jobId));
+        if (!isAllTaskReady) {
+            CompletableFuture savepointFuture = new CompletableFuture();
+            savepointFuture.completeExceptionally(
+                    new CheckpointException(
+                            CheckpointCloseReason.TASK_NOT_ALL_READY_WHEN_SAVEPOINT));
+            return new PassiveCompletableFuture<>(savepointFuture);
+        }
         CompletableFuture<PendingCheckpoint> savepoint =
                 createPendingCheckpoint(Instant.now().toEpochMilli(), SAVEPOINT_TYPE);
         startTriggerPendingCheckpoint(savepoint);
-        return savepoint.join().getCompletableFuture();
+        PendingCheckpoint savepointPendingCheckpoint = savepoint.join();
+        LOG.info(
+                String.format(
+                        "The save point checkpointId is %s",
+                        savepointPendingCheckpoint.getCheckpointId()));
+        return savepointPendingCheckpoint.getCompletableFuture();
     }
 
     private void startTriggerPendingCheckpoint(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
index 3bb373ce5..cd9d54837 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
@@ -80,7 +80,7 @@ public class CheckpointBarrierTriggerOperation extends TaskOperation {
                                     .getTaskGroup()
                                     .getTask(taskLocation.getTaskID());
                     try {
-                        log.debug("CheckpointBarrierTriggerOperation [{}]" + taskLocation);
+                        log.debug("CheckpointBarrierTriggerOperation [{}]", taskLocation);
                         task.triggerBarrier(barrier);
                     } catch (Exception e) {
                         sneakyThrow(e);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 5bb01702f..d13fcbbd1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -624,6 +624,11 @@ public class JobMaster {
 
     /** Execute savePoint, which will cause the job to end. */
     public CompletableFuture<Void> savePoint() {
+        LOGGER.info(
+                String.format(
+                        "Begin do save point for Job %s (%s) ",
+                        jobImmutableInformation.getJobConfig().getName(),
+                        jobImmutableInformation.getJobId()));
         PassiveCompletableFuture<CompletedCheckpoint>[] passiveCompletableFutures =
                 checkpointManager.triggerSavePoints();
         return CompletableFuture.allOf(passiveCompletableFutures);