You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/12 06:21:54 UTC
[incubator-seatunnel] branch dev updated: [hotfix][engine] Fix local can not exit while job complete (#3068)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 556cb35ca [hotfix][engine] Fix local can not exit while job complete (#3068)
556cb35ca is described below
commit 556cb35ca89f95a3d5efbca43b6c150d3cd9b35f
Author: hailin0 <wa...@apache.org>
AuthorDate: Wed Oct 12 14:21:50 2022 +0800
[hotfix][engine] Fix local can not exit while job complete (#3068)
---
.../org/apache/seatunnel/engine/server/CoordinatorService.java | 7 ++++---
.../java/org/apache/seatunnel/engine/server/SeaTunnelServer.java | 6 ++++--
2 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 5af9d47c9..2b4ef575a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -111,15 +111,15 @@ public class CoordinatorService {
private volatile boolean isActive = false;
private final ExecutorService executorService;
+ private final ScheduledExecutorService monitorService;
@SuppressWarnings("checkstyle:MagicNumber")
public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull ExecutorService executorService) {
this.nodeEngine = nodeEngine;
this.logger = nodeEngine.getLogger(getClass());
this.executorService = executorService;
-
- ScheduledExecutorService masterActiveListener = Executors.newSingleThreadScheduledExecutor();
- masterActiveListener.scheduleAtFixedRate(() -> checkNewActiveMaster(), 0, 100, TimeUnit.MILLISECONDS);
+ this.monitorService = Executors.newSingleThreadScheduledExecutor();
+ monitorService.scheduleAtFixedRate(() -> checkNewActiveMaster(), 0, 100, TimeUnit.MILLISECONDS);
}
public JobMaster getJobMaster(Long jobId) {
@@ -390,6 +390,7 @@ public class CoordinatorService {
if (resourceManager != null) {
resourceManager.close();
}
+ monitorService.shutdown();
}
/**
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index fe7ce55e0..90a87a770 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -58,6 +58,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
private volatile SlotService slotService;
private TaskExecutionService taskExecutionService;
private CoordinatorService coordinatorService;
+ private ScheduledExecutorService monitorService;
private final ExecutorService executorService;
@@ -100,8 +101,8 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
taskExecutionService.start();
getSlotService();
coordinatorService = new CoordinatorService(nodeEngine, executorService);
- ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
- service.scheduleAtFixedRate(() -> printExecutionInfo(), 0, 60, TimeUnit.SECONDS);
+ monitorService = Executors.newSingleThreadScheduledExecutor();
+ monitorService.scheduleAtFixedRate(() -> printExecutionInfo(), 0, 60, TimeUnit.SECONDS);
}
@Override
@@ -119,6 +120,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
}
executorService.shutdown();
taskExecutionService.shutdown();
+ monitorService.shutdown();
}
@Override