You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/03/31 06:51:40 UTC
[incubator-seatunnel] branch dev updated: [Hotfix]'']Fix job error message is not right bug (#4463)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e05ceb798 [Hotfix]'']Fix job error message is not right bug (#4463)
e05ceb798 is described below
commit e05ceb798d0860acbd1aba1f1bbd0937bfccb2c3
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Mar 31 14:51:33 2023 +0800
[Hotfix]'']Fix job error message is not right bug (#4463)
---
config/log4j2.properties | 3 +++
.../engine/server/checkpoint/CheckpointCoordinator.java | 11 ++++++++---
.../apache/seatunnel/engine/server/dag/physical/SubPlan.java | 4 +++-
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/config/log4j2.properties b/config/log4j2.properties
index 8b8d47db8..fb1a07c6d 100644
--- a/config/log4j2.properties
+++ b/config/log4j2.properties
@@ -27,6 +27,9 @@ rootLogger.level = INFO
logger.zeta.name=org.apache.seatunnel.engine
logger.zeta.level=INFO
+logger.debezium.name=io.debezium.connector
+logger.debezium.level=WARN
+
############################ log output to console #############################
#rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
#rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 42ff998b9..ea3cfd689 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -60,6 +60,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
@@ -127,6 +128,8 @@ public class CheckpointCoordinator {
private CompletableFuture<CheckpointCoordinatorState> checkpointCoordinatorFuture;
+ private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
+
@SneakyThrows
public CheckpointCoordinator(
CheckpointManager manager,
@@ -212,15 +215,16 @@ public class CheckpointCoordinator {
}
private void handleCoordinatorError(CheckpointCloseReason reason, Throwable e) {
+ CheckpointException checkpointException = new CheckpointException(reason, e);
+ errorByPhysicalVertex.compareAndSet(null, ExceptionUtils.getMessage(checkpointException));
+
if (checkpointCoordinatorFuture.isDone()) {
return;
}
- CheckpointException checkpointException = new CheckpointException(reason, e);
cleanPendingCheckpoint(reason);
checkpointCoordinatorFuture.complete(
new CheckpointCoordinatorState(
- CheckpointCoordinatorStatus.FAILED,
- ExceptionUtils.getMessage(checkpointException)));
+ CheckpointCoordinatorStatus.FAILED, errorByPhysicalVertex.get()));
checkpointManager.handleCheckpointError(pipelineId);
}
@@ -295,6 +299,7 @@ public class CheckpointCoordinator {
protected void restoreCoordinator(boolean alreadyStarted) {
LOG.info("received restore CheckpointCoordinator with alreadyStarted= " + alreadyStarted);
+ errorByPhysicalVertex = new AtomicReference<>();
checkpointCoordinatorFuture = new CompletableFuture<>();
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
shutdown = false;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 1a8700344..d11ad63ee 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -82,7 +82,7 @@ public class SubPlan {
private final PipelineLocation pipelineLocation;
/** The error throw by physicalVertex, should be set when physicalVertex throw error. */
- private final AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
+ private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
private final ExecutorService executorService;
@@ -140,6 +140,8 @@ public class SubPlan {
}
public synchronized PassiveCompletableFuture<PipelineExecutionState> initStateFuture() {
+ // reset errorByPhysicalVertex when restore pipeline
+ errorByPhysicalVertex = new AtomicReference<>();
physicalVertexList.forEach(
physicalVertex -> {
addPhysicalVertexCallBack(physicalVertex.initStateFuture());