You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/12 06:21:54 UTC

[incubator-seatunnel] branch dev updated: [hotfix][engine] Fix local can not exit while job complete (#3068)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 556cb35ca [hotfix][engine] Fix local can not exit while job complete (#3068)
556cb35ca is described below

commit 556cb35ca89f95a3d5efbca43b6c150d3cd9b35f
Author: hailin0 <wa...@apache.org>
AuthorDate: Wed Oct 12 14:21:50 2022 +0800

    [hotfix][engine] Fix local can not exit while job complete (#3068)
---
 .../org/apache/seatunnel/engine/server/CoordinatorService.java     | 7 ++++---
 .../java/org/apache/seatunnel/engine/server/SeaTunnelServer.java   | 6 ++++--
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 5af9d47c9..2b4ef575a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -111,15 +111,15 @@ public class CoordinatorService {
     private volatile boolean isActive = false;
 
     private final ExecutorService executorService;
+    private final ScheduledExecutorService monitorService;
 
     @SuppressWarnings("checkstyle:MagicNumber")
     public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull ExecutorService executorService) {
         this.nodeEngine = nodeEngine;
         this.logger = nodeEngine.getLogger(getClass());
         this.executorService = executorService;
-
-        ScheduledExecutorService masterActiveListener = Executors.newSingleThreadScheduledExecutor();
-        masterActiveListener.scheduleAtFixedRate(() -> checkNewActiveMaster(), 0, 100, TimeUnit.MILLISECONDS);
+        this.monitorService = Executors.newSingleThreadScheduledExecutor();
+        monitorService.scheduleAtFixedRate(() -> checkNewActiveMaster(), 0, 100, TimeUnit.MILLISECONDS);
     }
 
     public JobMaster getJobMaster(Long jobId) {
@@ -390,6 +390,7 @@ public class CoordinatorService {
         if (resourceManager != null) {
             resourceManager.close();
         }
+        monitorService.shutdown();
     }
 
     /**
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index fe7ce55e0..90a87a770 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -58,6 +58,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
     private volatile SlotService slotService;
     private TaskExecutionService taskExecutionService;
     private CoordinatorService coordinatorService;
+    private ScheduledExecutorService monitorService;
 
     private final ExecutorService executorService;
 
@@ -100,8 +101,8 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         taskExecutionService.start();
         getSlotService();
         coordinatorService = new CoordinatorService(nodeEngine, executorService);
-        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
-        service.scheduleAtFixedRate(() -> printExecutionInfo(), 0, 60, TimeUnit.SECONDS);
+        monitorService = Executors.newSingleThreadScheduledExecutor();
+        monitorService.scheduleAtFixedRate(() -> printExecutionInfo(), 0, 60, TimeUnit.SECONDS);
     }
 
     @Override
@@ -119,6 +120,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         }
         executorService.shutdown();
         taskExecutionService.shutdown();
+        monitorService.shutdown();
     }
 
     @Override