You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/03/31 06:51:40 UTC

[incubator-seatunnel] branch dev updated: [Hotfix] Fix job error message is not right bug (#4463)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e05ceb798 [Hotfix] Fix job error message is not right bug (#4463)
e05ceb798 is described below

commit e05ceb798d0860acbd1aba1f1bbd0937bfccb2c3
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Mar 31 14:51:33 2023 +0800

    [Hotfix] Fix job error message is not right bug (#4463)
---
 config/log4j2.properties                                      |  3 +++
 .../engine/server/checkpoint/CheckpointCoordinator.java       | 11 ++++++++---
 .../apache/seatunnel/engine/server/dag/physical/SubPlan.java  |  4 +++-
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/config/log4j2.properties b/config/log4j2.properties
index 8b8d47db8..fb1a07c6d 100644
--- a/config/log4j2.properties
+++ b/config/log4j2.properties
@@ -27,6 +27,9 @@ rootLogger.level = INFO
 logger.zeta.name=org.apache.seatunnel.engine
 logger.zeta.level=INFO
 
+logger.debezium.name=io.debezium.connector
+logger.debezium.level=WARN
+
 ############################ log output to console #############################
 #rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
 #rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 42ff998b9..ea3cfd689 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -60,6 +60,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
@@ -127,6 +128,8 @@ public class CheckpointCoordinator {
 
     private CompletableFuture<CheckpointCoordinatorState> checkpointCoordinatorFuture;
 
+    private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
+
     @SneakyThrows
     public CheckpointCoordinator(
             CheckpointManager manager,
@@ -212,15 +215,16 @@ public class CheckpointCoordinator {
     }
 
     private void handleCoordinatorError(CheckpointCloseReason reason, Throwable e) {
+        CheckpointException checkpointException = new CheckpointException(reason, e);
+        errorByPhysicalVertex.compareAndSet(null, ExceptionUtils.getMessage(checkpointException));
+
         if (checkpointCoordinatorFuture.isDone()) {
             return;
         }
-        CheckpointException checkpointException = new CheckpointException(reason, e);
         cleanPendingCheckpoint(reason);
         checkpointCoordinatorFuture.complete(
                 new CheckpointCoordinatorState(
-                        CheckpointCoordinatorStatus.FAILED,
-                        ExceptionUtils.getMessage(checkpointException)));
+                        CheckpointCoordinatorStatus.FAILED, errorByPhysicalVertex.get()));
         checkpointManager.handleCheckpointError(pipelineId);
     }
 
@@ -295,6 +299,7 @@ public class CheckpointCoordinator {
 
     protected void restoreCoordinator(boolean alreadyStarted) {
         LOG.info("received restore CheckpointCoordinator with alreadyStarted= " + alreadyStarted);
+        errorByPhysicalVertex = new AtomicReference<>();
         checkpointCoordinatorFuture = new CompletableFuture<>();
         cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
         shutdown = false;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 1a8700344..d11ad63ee 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -82,7 +82,7 @@ public class SubPlan {
     private final PipelineLocation pipelineLocation;
 
     /** The error throw by physicalVertex, should be set when physicalVertex throw error. */
-    private final AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
+    private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
 
     private final ExecutorService executorService;
 
@@ -140,6 +140,8 @@ public class SubPlan {
     }
 
     public synchronized PassiveCompletableFuture<PipelineExecutionState> initStateFuture() {
+        // reset errorByPhysicalVertex when restore pipeline
+        errorByPhysicalVertex = new AtomicReference<>();
         physicalVertexList.forEach(
                 physicalVertex -> {
                     addPhysicalVertexCallBack(physicalVertex.initStateFuture());