You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/09 14:26:24 UTC
[incubator-seatunnel] branch dev updated: put imap names to constant (#3036)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 996c4741f put imap names to constant (#3036)
996c4741f is described below
commit 996c4741f799e1bb8726f2eb1b35282ab0932505
Author: Eric <ga...@gmail.com>
AuthorDate: Sun Oct 9 22:26:18 2022 +0800
put imap names to constant (#3036)
---
.../main/java/org/apache/seatunnel/engine/common/Constant.java | 10 ++++++++++
.../org/apache/seatunnel/engine/server/CoordinatorService.java | 9 +++++----
.../org/apache/seatunnel/engine/server/SeaTunnelServer.java | 3 ++-
.../engine/server/resourcemanager/AbstractResourceManager.java | 3 ++-
4 files changed, 19 insertions(+), 6 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 6aab140cd..ba02b7f4f 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -39,4 +39,14 @@ public class Constant {
public static final int OPERATION_RETRY_TIME = 5;
public static final int OPERATION_RETRY_SLEEP = 2000;
+
+ public static final String IMAP_RUNNING_JOB_INFO = "runningJobInfo";
+
+ public static final String IMAP_RUNNING_JOB_STATE = "runningJobState";
+
+ public static final String IMAP_STATE_TIMESTAMPS = "stateTimestamps";
+
+ public static final String IMAP_OWNED_SLOT_PROFILES = "ownedSlotProfilesIMap";
+
+ public static final String IMAP_RESOURCE_MANAGER_REGISTER_WORKER = "ResourceManager_RegisterWorker";
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 14b61a91d..5af9d47c9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -139,10 +140,10 @@ public class CoordinatorService {
// 4. If runningJobStateIMap.get(jobId) != null and the value is CANCELING or RUNNING. We need recover the JobMaster
// from runningJobStateIMap and then waiting for it complete.
private void initCoordinatorService() {
- runningJobInfoIMap = nodeEngine.getHazelcastInstance().getMap("runningJobInfo");
- runningJobStateIMap = nodeEngine.getHazelcastInstance().getMap("runningJobState");
- runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap("stateTimestamps");
- ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap("ownedSlotProfilesIMap");
+ runningJobInfoIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
+ runningJobStateIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+ runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
+ ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
List<CompletableFuture<Void>> collect = runningJobInfoIMap.entrySet().stream().map(entry -> {
return CompletableFuture.runAsync(() -> restoreJobFromMasterActiveSwitch(entry.getKey(), entry.getValue()),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index ead141dac..fe7ce55e0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -181,7 +182,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
* @return
*/
public boolean taskIsEnded(@NonNull TaskGroupLocation taskGroupLocation) {
- IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap("runningJobState");
+ IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
if (runningJobState == null) {
return false;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index c10d928db..f4a504b47 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.resourcemanager;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
@@ -55,7 +56,7 @@ public abstract class AbstractResourceManager implements ResourceManager {
private final ExecutionMode mode = ExecutionMode.LOCAL;
public AbstractResourceManager(NodeEngine nodeEngine) {
- this.registerWorker = nodeEngine.getHazelcastInstance().getMap("ResourceManager_RegisterWorker");
+ this.registerWorker = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RESOURCE_MANAGER_REGISTER_WORKER);
this.nodeEngine = nodeEngine;
}