You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/05/30 07:54:58 UTC

[seatunnel] branch dev updated: [Hotfix][Zeta] Fix master active bug (#4855)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1afd67341 [Hotfix][Zeta] Fix master active bug (#4855)
1afd67341 is described below

commit 1afd67341d2ff1ddcfeb75cc4032a9b2fb41371b
Author: Eric <ga...@gmail.com>
AuthorDate: Tue May 30 15:54:52 2023 +0800

    [Hotfix][Zeta] Fix master active bug (#4855)
---
 .../engine/server/CoordinatorService.java          | 33 ++++++++++++++++------
 .../engine/server/dag/physical/PhysicalVertex.java |  8 ++++--
 2 files changed, 31 insertions(+), 10 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index c9b552cd9..d38f2e028 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -129,7 +129,7 @@ public class CoordinatorService {
     /** If this node is a master node */
     private volatile boolean isActive = false;
 
-    private final ExecutorService executorService;
+    private ExecutorService executorService;
 
     private final SeaTunnelServer seaTunnelServer;
 
@@ -279,13 +279,18 @@ public class CoordinatorService {
         }
 
         if (jobStatus.ordinal() < JobStatus.RUNNING.ordinal()) {
-            logger.info(
-                    String.format(
-                            "The restore %s is state %s, cancel job and submit it again.",
-                            jobFullName, jobStatus));
-            jobMaster.cancelJob();
-            jobMaster.getJobMasterCompleteFuture().join();
-            submitJob(jobId, jobInfo.getJobImmutableInformation()).join();
+            CompletableFuture.runAsync(
+                    () -> {
+                        logger.info(
+                                String.format(
+                                        "The restore %s is state %s, cancel job and submit it again.",
+                                        jobFullName, jobStatus));
+                        jobMaster.cancelJob();
+                        jobMaster.getJobMasterCompleteFuture().join();
+                        submitJob(jobId, jobInfo.getJobImmutableInformation()).join();
+                    },
+                    executorService);
+
             return;
         }
 
@@ -346,6 +351,13 @@ public class CoordinatorService {
             if (!isActive && this.seaTunnelServer.isMasterNode()) {
                 logger.info(
                         "This node become a new active master node, begin init coordinator service");
+                if (this.executorService.isShutdown()) {
+                    this.executorService =
+                            Executors.newCachedThreadPool(
+                                    new ThreadFactoryBuilder()
+                                            .setNameFormat("seatunnel-coordinator-service-%d")
+                                            .build());
+                }
                 initCoordinatorService();
                 isActive = true;
             } else if (isActive && !this.seaTunnelServer.isMasterNode()) {
@@ -523,6 +535,11 @@ public class CoordinatorService {
      * TaskGroup's state.
      */
     public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        logger.info(
+                String.format(
+                        "Received task end from execution %s, state %s",
+                        taskExecutionState.getTaskGroupLocation(),
+                        taskExecutionState.getExecutionState()));
         TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
         JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
         if (runningJobMaster == null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index a95e7e8fd..586afbec3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -211,7 +211,9 @@ public class PhysicalVertex {
                 LOGGER.warning(
                         "The node:"
                                 + worker.toString()
-                                + " running the taskGroup no longer exists, return false.");
+                                + " running the taskGroup "
+                                + taskGroupLocation
+                                + " no longer exists, return false.");
                 return false;
             }
             InvocationFuture<Object> invoke =
@@ -226,7 +228,9 @@ public class PhysicalVertex {
                 return (Boolean) invoke.get();
             } catch (InterruptedException | ExecutionException e) {
                 LOGGER.warning(
-                        "Execution of CheckTaskGroupIsExecutingOperation failed, checkTaskGroupIsExecuting return false. ",
+                        "Execution of CheckTaskGroupIsExecutingOperation "
+                                + taskGroupLocation
+                                + " failed, checkTaskGroupIsExecuting return false. ",
                         e);
             }
         }