You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/14 10:26:54 UTC

[incubator-seatunnel] branch st-engine updated: [Engine] [Server] Add when the node offline make task failed (#2724)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 4fbe412ec [Engine] [Server] Add when the node offline make task failed (#2724)
4fbe412ec is described below

commit 4fbe412ec9e491475caea2fc576198cbf217d5a9
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Wed Sep 14 18:26:47 2022 +0800

    [Engine] [Server] Add when the node offline make task failed (#2724)
    
    * add when the node offline make task failed
---
 .../src/main/resources/hazelcast.yaml              |  6 ++---
 .../seatunnel/engine/server/SeaTunnelServer.java   | 31 ++++++++++++++++++++++
 .../engine/server/dag/physical/PhysicalVertex.java |  4 +++
 3 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
index bde537011..885326b00 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
@@ -19,10 +19,10 @@ hazelcast:
   cluster-name: seatunnel
   network:
     join:
-      multicast:
+      tcp-ip:
         enabled: true
-        multicast-group: 224.2.2.3
-        multicast-port: 53328
+        member-list:
+          - localhost
     port:
       auto-increment: true
       port-count: 100
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index eecd2f2f1..e1632ae8f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -20,8 +20,11 @@ package org.apache.seatunnel.engine.server;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.master.JobMaster;
@@ -31,6 +34,7 @@ import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
 import org.apache.seatunnel.engine.server.service.slot.SlotService;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.hazelcast.cluster.Address;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.internal.services.ManagedService;
@@ -45,6 +49,7 @@ import com.hazelcast.spi.impl.operationservice.LiveOperations;
 import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
 import lombok.NonNull;
 
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -140,6 +145,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
     @Override
     public void memberRemoved(MembershipServiceEvent event) {
         resourceManager.memberRemoved(event);
+        failedTaskOnMemberRemoved(event);
     }
 
     @Override
@@ -242,6 +248,31 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         return runningJobMaster.getJobStatus();
     }
 
+    public void failedTaskOnMemberRemoved(MembershipServiceEvent event) {
+        Address lostAddress = event.getMember().getAddress();
+        runningJobMasterMap.forEach((aLong, jobMaster) -> {
+            jobMaster.getPhysicalPlan().getPipelineList().forEach(subPlan -> {
+                ArrayList<PhysicalVertex> allVertex = new ArrayList<>();
+                allVertex.addAll(subPlan.getPhysicalVertexList());
+                allVertex.addAll(subPlan.getCoordinatorVertexList());
+                allVertex.forEach(physicalVertex -> {
+                    Address deployAddress = physicalVertex.getCurrentExecutionAddress();
+                    ExecutionState executionState = physicalVertex.getExecutionState().get();
+                    if (null != deployAddress && deployAddress.equals(lostAddress) &&
+                        (executionState.equals(ExecutionState.DEPLOYING) ||
+                            executionState.equals(ExecutionState.RUNNING))) {
+                        TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
+                        physicalVertex.updateTaskExecutionState(
+                            new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED,
+                                new SeaTunnelEngineException(
+                                    String.format("The taskGroup(%s) deployed node(%s) offline", taskGroupLocation,
+                                        lostAddress))));
+                    }
+                });
+            });
+        });
+    }
+
     /**
      * When TaskGroup ends, it is called by {@link TaskExecutionService} to notify JobMaster the TaskGroup's state.
      */
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index b90e21b8a..6838df1f6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -371,6 +371,10 @@ public class PhysicalVertex {
         taskFuture.complete(taskExecutionState);
     }
 
+    public Address getCurrentExecutionAddress() {
+        return currentExecutionAddress;
+    }
+
     public TaskGroupLocation getTaskGroupLocation() {
         return taskGroupLocation;
     }