You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/09 14:26:24 UTC

[incubator-seatunnel] branch dev updated: put imap names to constant (#3036)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 996c4741f put imap names to constant (#3036)
996c4741f is described below

commit 996c4741f799e1bb8726f2eb1b35282ab0932505
Author: Eric <ga...@gmail.com>
AuthorDate: Sun Oct 9 22:26:18 2022 +0800

    put imap names to constant (#3036)
---
 .../main/java/org/apache/seatunnel/engine/common/Constant.java | 10 ++++++++++
 .../org/apache/seatunnel/engine/server/CoordinatorService.java |  9 +++++----
 .../org/apache/seatunnel/engine/server/SeaTunnelServer.java    |  3 ++-
 .../engine/server/resourcemanager/AbstractResourceManager.java |  3 ++-
 4 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 6aab140cd..ba02b7f4f 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -39,4 +39,14 @@ public class Constant {
     public static final int OPERATION_RETRY_TIME = 5;
 
     public static final int OPERATION_RETRY_SLEEP = 2000;
+
+    public static final String IMAP_RUNNING_JOB_INFO = "runningJobInfo";
+
+    public static final String IMAP_RUNNING_JOB_STATE = "runningJobState";
+
+    public static final String IMAP_STATE_TIMESTAMPS = "stateTimestamps";
+
+    public static final String IMAP_OWNED_SLOT_PROFILES = "ownedSlotProfilesIMap";
+
+    public static final String IMAP_RESOURCE_MANAGER_REGISTER_WORKER = "ResourceManager_RegisterWorker";
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 14b61a91d..5af9d47c9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.exception.JobException;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -139,10 +140,10 @@ public class CoordinatorService {
     // 4. If runningJobStateIMap.get(jobId) != null and the value is CANCELING or RUNNING, we need to recover the JobMaster
     //    from runningJobStateIMap and then wait for it to complete.
     private void initCoordinatorService() {
-        runningJobInfoIMap = nodeEngine.getHazelcastInstance().getMap("runningJobInfo");
-        runningJobStateIMap = nodeEngine.getHazelcastInstance().getMap("runningJobState");
-        runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap("stateTimestamps");
-        ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap("ownedSlotProfilesIMap");
+        runningJobInfoIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
+        runningJobStateIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+        runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
+        ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
 
         List<CompletableFuture<Void>> collect = runningJobInfoIMap.entrySet().stream().map(entry -> {
             return CompletableFuture.runAsync(() -> restoreJobFromMasterActiveSwitch(entry.getKey(), entry.getValue()),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index ead141dac..fe7ce55e0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server;
 
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -181,7 +182,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
      * @return
      */
     public boolean taskIsEnded(@NonNull TaskGroupLocation taskGroupLocation) {
-        IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap("runningJobState");
+        IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
         if (runningJobState == null) {
             return false;
         }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index c10d928db..f4a504b47 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.resourcemanager;
 
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
@@ -55,7 +56,7 @@ public abstract class AbstractResourceManager implements ResourceManager {
     private final ExecutionMode mode = ExecutionMode.LOCAL;
 
     public AbstractResourceManager(NodeEngine nodeEngine) {
-        this.registerWorker = nodeEngine.getHazelcastInstance().getMap("ResourceManager_RegisterWorker");
+        this.registerWorker = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RESOURCE_MANAGER_REGISTER_WORKER);
         this.nodeEngine = nodeEngine;
     }