You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/14 10:26:54 UTC
[incubator-seatunnel] branch st-engine updated: [Engine] [Server] Add when the node offline make task failed (#2724)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 4fbe412ec [Engine] [Server] Add when the node offline make task failed (#2724)
4fbe412ec is described below
commit 4fbe412ec9e491475caea2fc576198cbf217d5a9
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Wed Sep 14 18:26:47 2022 +0800
[Engine] [Server] Add when the node offline make task failed (#2724)
* add when the node offline make task failed
---
.../src/main/resources/hazelcast.yaml | 6 ++---
.../seatunnel/engine/server/SeaTunnelServer.java | 31 ++++++++++++++++++++++
.../engine/server/dag/physical/PhysicalVertex.java | 4 +++
3 files changed, 38 insertions(+), 3 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
index bde537011..885326b00 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
@@ -19,10 +19,10 @@ hazelcast:
cluster-name: seatunnel
network:
join:
- multicast:
+ tcp-ip:
enabled: true
- multicast-group: 224.2.2.3
- multicast-port: 53328
+ member-list:
+ - localhost
port:
auto-increment: true
port-count: 100
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index eecd2f2f1..e1632ae8f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -20,8 +20,11 @@ package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
@@ -31,6 +34,7 @@ import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
import org.apache.seatunnel.engine.server.service.slot.SlotService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.hazelcast.cluster.Address;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.services.ManagedService;
@@ -45,6 +49,7 @@ import com.hazelcast.spi.impl.operationservice.LiveOperations;
import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
import lombok.NonNull;
+import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
@@ -140,6 +145,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
@Override
public void memberRemoved(MembershipServiceEvent event) {
resourceManager.memberRemoved(event);
+ failedTaskOnMemberRemoved(event);
}
@Override
@@ -242,6 +248,31 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
return runningJobMaster.getJobStatus();
}
+ public void failedTaskOnMemberRemoved(MembershipServiceEvent event) {
+ Address lostAddress = event.getMember().getAddress();
+ runningJobMasterMap.forEach((aLong, jobMaster) -> {
+ jobMaster.getPhysicalPlan().getPipelineList().forEach(subPlan -> {
+ ArrayList<PhysicalVertex> allVertex = new ArrayList<>();
+ allVertex.addAll(subPlan.getPhysicalVertexList());
+ allVertex.addAll(subPlan.getCoordinatorVertexList());
+ allVertex.forEach(physicalVertex -> {
+ Address deployAddress = physicalVertex.getCurrentExecutionAddress();
+ ExecutionState executionState = physicalVertex.getExecutionState().get();
+ if (null != deployAddress && deployAddress.equals(lostAddress) &&
+ (executionState.equals(ExecutionState.DEPLOYING) ||
+ executionState.equals(ExecutionState.RUNNING))) {
+ TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
+ physicalVertex.updateTaskExecutionState(
+ new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED,
+ new SeaTunnelEngineException(
+ String.format("The taskGroup(%s) deployed node(%s) offline", taskGroupLocation,
+ lostAddress))));
+ }
+ });
+ });
+ });
+ }
+
/**
* When TaskGroup ends, it is called by {@link TaskExecutionService} to notify JobMaster the TaskGroup's state.
*/
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index b90e21b8a..6838df1f6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -371,6 +371,10 @@ public class PhysicalVertex {
taskFuture.complete(taskExecutionState);
}
+ public Address getCurrentExecutionAddress() {
+ return currentExecutionAddress;
+ }
+
public TaskGroupLocation getTaskGroupLocation() {
return taskGroupLocation;
}