You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/10/14 09:05:40 UTC

[incubator-seatunnel] branch dev updated: [Engine] [ResourceManager] Add member active check before release resource (#3086)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ad2025c16 [Engine] [ResourceManager] Add member active check before release resource (#3086)
ad2025c16 is described below

commit ad2025c16b1596bcb81a7d6958a0cf42be60257b
Author: Hisoka <fa...@qq.com>
AuthorDate: Fri Oct 14 17:05:34 2022 +0800

    [Engine] [ResourceManager] Add member active check before release resource (#3086)
---
 .../server/resourcemanager/AbstractResourceManager.java    | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index f4a504b47..aac24d732 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -19,12 +19,12 @@ package org.apache.seatunnel.engine.server.resourcemanager;
 
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
 import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.cluster.Member;
@@ -32,7 +32,6 @@ import com.hazelcast.internal.services.MembershipServiceEvent;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import com.hazelcast.spi.impl.NodeEngine;
-import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
 import com.hazelcast.spi.impl.operationservice.Operation;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 
@@ -132,10 +131,7 @@ public abstract class AbstractResourceManager implements ResourceManager {
     }
 
     protected <E> InvocationFuture<E> sendToMember(Operation operation, Address address) {
-        InvocationBuilder invocationBuilder =
-            nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME,
-                operation, address);
-        return invocationBuilder.invoke();
+        return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, address);
     }
 
     @Override
@@ -157,7 +153,11 @@ public abstract class AbstractResourceManager implements ResourceManager {
 
     @Override
     public CompletableFuture<Void> releaseResource(long jobId, SlotProfile profile) {
-        return sendToMember(new ReleaseSlotOperation(jobId, profile), profile.getWorker());
+        if (nodeEngine.getClusterService().getMember(profile.getWorker()) != null) {
+            return sendToMember(new ReleaseSlotOperation(jobId, profile), profile.getWorker());
+        } else {
+            return CompletableFuture.completedFuture(null);
+        }
     }
 
     @Override