You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/10/14 09:05:40 UTC
[incubator-seatunnel] branch dev updated: [Engine] [ResourceManager] Add member active check before release resource (#3086)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ad2025c16 [Engine] [ResourceManager] Add member active check before release resource (#3086)
ad2025c16 is described below
commit ad2025c16b1596bcb81a7d6958a0cf42be60257b
Author: Hisoka <fa...@qq.com>
AuthorDate: Fri Oct 14 17:05:34 2022 +0800
[Engine] [ResourceManager] Add member active check before release resource (#3086)
---
.../server/resourcemanager/AbstractResourceManager.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index f4a504b47..aac24d732 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -19,12 +19,12 @@ package org.apache.seatunnel.engine.server.resourcemanager;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
@@ -32,7 +32,6 @@ import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.NodeEngine;
-import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -132,10 +131,7 @@ public abstract class AbstractResourceManager implements ResourceManager {
}
protected <E> InvocationFuture<E> sendToMember(Operation operation, Address address) {
- InvocationBuilder invocationBuilder =
- nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME,
- operation, address);
- return invocationBuilder.invoke();
+ return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, address);
}
@Override
@@ -157,7 +153,11 @@ public abstract class AbstractResourceManager implements ResourceManager {
@Override
public CompletableFuture<Void> releaseResource(long jobId, SlotProfile profile) {
- return sendToMember(new ReleaseSlotOperation(jobId, profile), profile.getWorker());
+ if (nodeEngine.getClusterService().getMember(profile.getWorker()) != null) {
+ return sendToMember(new ReleaseSlotOperation(jobId, profile), profile.getWorker());
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
}
@Override