You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/12 13:56:04 UTC
[incubator-seatunnel] branch dev updated: [Hotfix][Zeta] Fix savepoint test case error (#4561)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c350c984f [Hotfix][Zeta] Fix savepoint test case error (#4561)
c350c984f is described below
commit c350c984ffdd1e06019e0b596ea6da05f678b8e0
Author: Eric <ga...@gmail.com>
AuthorDate: Wed Apr 12 21:55:54 2023 +0800
[Hotfix][Zeta] Fix savepoint test case error (#4561)
---
.../engine/client/SeaTunnelClientTest.java | 24 +++++++++++++++++++---
.../src/test/resources/log4j2-test.properties | 4 ++++
.../server/checkpoint/CheckpointCloseReason.java | 3 ++-
.../server/checkpoint/CheckpointCoordinator.java | 15 +++++++++++++-
.../CheckpointBarrierTriggerOperation.java | 2 +-
.../seatunnel/engine/server/master/JobMaster.java | 5 +++++
6 files changed, 47 insertions(+), 6 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index c045067b4..554a1e782 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -21,8 +21,10 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
@@ -259,8 +261,7 @@ public class SeaTunnelClientTest {
}
@Test
- public void testSavePointAndRestoreWithSavePoint()
- throws ExecutionException, InterruptedException {
+ public void testSavePointAndRestoreWithSavePoint() throws Exception {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/streaming_fake_to_console.conf");
JobConfig jobConfig = new JobConfig();
@@ -276,7 +277,24 @@ public class SeaTunnelClientTest {
.untilAsserted(
() -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId)));
- CLIENT.savePointJob(jobId);
+ RetryUtils.retryWithException(
+ () -> {
+ CLIENT.savePointJob(jobId);
+ return null;
+ },
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+ exception -> {
+ // If a savepoint is triggered for a job whose initialization has not
+ // completed yet, the request fails with an error.
+ // In this test case, we retry the savepoint until it is accepted.
+ return exception
+ .getCause()
+ .getMessage()
+ .contains("Task not all ready, savepoint error");
+ },
+ Constant.OPERATION_RETRY_SLEEP));
await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/log4j2-test.properties b/seatunnel-engine/seatunnel-engine-client/src/test/resources/log4j2-test.properties
index 2dc1b8ca5..c5b54e52e 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/log4j2-test.properties
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/log4j2-test.properties
@@ -18,6 +18,10 @@
rootLogger.level = INFO
+
+logger.zeta.name=org.apache.seatunnel.engine
+logger.zeta.level=INFO
+
rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index a500eb19a..7962ebc75 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -24,7 +24,8 @@ public enum CheckpointCloseReason {
CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error."),
- AGGREGATE_COMMIT_ERROR("Aggregate commit error.");
+ AGGREGATE_COMMIT_ERROR("Aggregate commit error."),
+ TASK_NOT_ALL_READY_WHEN_SAVEPOINT("Task not all ready, savepoint error");
private final String message;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index ea3cfd689..64285ffa5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -371,10 +371,23 @@ public class CheckpointCoordinator {
}
public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
+ LOG.info(String.format("Start save point for Job (%s)", jobId));
+ if (!isAllTaskReady) {
+ CompletableFuture savepointFuture = new CompletableFuture();
+ savepointFuture.completeExceptionally(
+ new CheckpointException(
+ CheckpointCloseReason.TASK_NOT_ALL_READY_WHEN_SAVEPOINT));
+ return new PassiveCompletableFuture<>(savepointFuture);
+ }
CompletableFuture<PendingCheckpoint> savepoint =
createPendingCheckpoint(Instant.now().toEpochMilli(), SAVEPOINT_TYPE);
startTriggerPendingCheckpoint(savepoint);
- return savepoint.join().getCompletableFuture();
+ PendingCheckpoint savepointPendingCheckpoint = savepoint.join();
+ LOG.info(
+ String.format(
+ "The save point checkpointId is %s",
+ savepointPendingCheckpoint.getCheckpointId()));
+ return savepointPendingCheckpoint.getCompletableFuture();
}
private void startTriggerPendingCheckpoint(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
index 3bb373ce5..cd9d54837 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
@@ -80,7 +80,7 @@ public class CheckpointBarrierTriggerOperation extends TaskOperation {
.getTaskGroup()
.getTask(taskLocation.getTaskID());
try {
- log.debug("CheckpointBarrierTriggerOperation [{}]" + taskLocation);
+ log.debug("CheckpointBarrierTriggerOperation [{}]", taskLocation);
task.triggerBarrier(barrier);
} catch (Exception e) {
sneakyThrow(e);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 5bb01702f..d13fcbbd1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -624,6 +624,11 @@ public class JobMaster {
/** Execute savePoint, which will cause the job to end. */
public CompletableFuture<Void> savePoint() {
+ LOGGER.info(
+ String.format(
+ "Begin do save point for Job %s (%s) ",
+ jobImmutableInformation.getJobConfig().getName(),
+ jobImmutableInformation.getJobId()));
PassiveCompletableFuture<CompletedCheckpoint>[] passiveCompletableFutures =
checkpointManager.triggerSavePoints();
return CompletableFuture.allOf(passiveCompletableFutures);