You are viewing a plain-text version of this content. The canonical link for it appeared here as a hyperlink in the original page and was not preserved in this text version.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/05/30 07:54:58 UTC
[seatunnel] branch dev updated: [Hotfix][Zeta] Fix master active bug (#4855)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1afd67341 [Hotfix][Zeta] Fix master active bug (#4855)
1afd67341 is described below
commit 1afd67341d2ff1ddcfeb75cc4032a9b2fb41371b
Author: Eric <ga...@gmail.com>
AuthorDate: Tue May 30 15:54:52 2023 +0800
[Hotfix][Zeta] Fix master active bug (#4855)
---
.../engine/server/CoordinatorService.java | 33 ++++++++++++++++------
.../engine/server/dag/physical/PhysicalVertex.java | 8 ++++--
2 files changed, 31 insertions(+), 10 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index c9b552cd9..d38f2e028 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -129,7 +129,7 @@ public class CoordinatorService {
/** If this node is a master node */
private volatile boolean isActive = false;
- private final ExecutorService executorService;
+ private ExecutorService executorService;
private final SeaTunnelServer seaTunnelServer;
@@ -279,13 +279,18 @@ public class CoordinatorService {
}
if (jobStatus.ordinal() < JobStatus.RUNNING.ordinal()) {
- logger.info(
- String.format(
- "The restore %s is state %s, cancel job and submit it again.",
- jobFullName, jobStatus));
- jobMaster.cancelJob();
- jobMaster.getJobMasterCompleteFuture().join();
- submitJob(jobId, jobInfo.getJobImmutableInformation()).join();
+ CompletableFuture.runAsync(
+ () -> {
+ logger.info(
+ String.format(
+ "The restore %s is state %s, cancel job and submit it again.",
+ jobFullName, jobStatus));
+ jobMaster.cancelJob();
+ jobMaster.getJobMasterCompleteFuture().join();
+ submitJob(jobId, jobInfo.getJobImmutableInformation()).join();
+ },
+ executorService);
+
return;
}
@@ -346,6 +351,13 @@ public class CoordinatorService {
if (!isActive && this.seaTunnelServer.isMasterNode()) {
logger.info(
"This node become a new active master node, begin init coordinator service");
+ if (this.executorService.isShutdown()) {
+ this.executorService =
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setNameFormat("seatunnel-coordinator-service-%d")
+ .build());
+ }
initCoordinatorService();
isActive = true;
} else if (isActive && !this.seaTunnelServer.isMasterNode()) {
@@ -523,6 +535,11 @@ public class CoordinatorService {
* TaskGroup's state.
*/
public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+ logger.info(
+ String.format(
+ "Received task end from execution %s, state %s",
+ taskExecutionState.getTaskGroupLocation(),
+ taskExecutionState.getExecutionState()));
TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
if (runningJobMaster == null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index a95e7e8fd..586afbec3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -211,7 +211,9 @@ public class PhysicalVertex {
LOGGER.warning(
"The node:"
+ worker.toString()
- + " running the taskGroup no longer exists, return false.");
+ + " running the taskGroup "
+ + taskGroupLocation
+ + " no longer exists, return false.");
return false;
}
InvocationFuture<Object> invoke =
@@ -226,7 +228,9 @@ public class PhysicalVertex {
return (Boolean) invoke.get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.warning(
- "Execution of CheckTaskGroupIsExecutingOperation failed, checkTaskGroupIsExecuting return false. ",
+ "Execution of CheckTaskGroupIsExecutingOperation "
+ + taskGroupLocation
+ + " failed, checkTaskGroupIsExecuting return false. ",
e);
}
}