You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/08/30 09:09:48 UTC

[incubator-seatunnel] branch st-engine updated: [engine][Task] add notifyTaskStatus to jobMaster (#2562)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 526727519 [engine][Task] add notifyTaskStatus to jobMaster (#2562)
526727519 is described below

commit 526727519277ef393f827da38047b3e9095027bb
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Tue Aug 30 17:09:43 2022 +0800

    [engine][Task] add notifyTaskStatus to jobMaster (#2562)
---
 .../engine/server/TaskExecutionService.java        | 39 +++++++++++++----
 .../engine/server/dag/physical/PhysicalVertex.java |  2 +-
 .../server/execution/TaskExecutionState.java       |  2 +-
 .../engine/server/execution/TaskGroup.java         |  2 +-
 .../server/execution/TaskGroupDefaultImpl.java     |  2 +-
 .../operation/NotifyTaskStatusOperation.java       | 49 ++++++++++++++++++++++
 .../server/scheduler/PipelineBaseScheduler.java    |  4 +-
 .../engine/server/TaskExecutionServiceTest.java    |  2 +-
 8 files changed, 87 insertions(+), 15 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index f739fb192..03ea4d53e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.engine.server.execution.TaskGroup;
 import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
+import org.apache.seatunnel.engine.server.operation.NotifyTaskStatusOperation;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 
 import com.google.common.collect.Lists;
@@ -44,6 +45,7 @@ import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import com.hazelcast.spi.properties.HazelcastProperties;
 import lombok.NonNull;
 import lombok.SneakyThrows;
@@ -141,20 +143,21 @@ public class TaskExecutionService {
             } else {
                 taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
             }
-            if (executionContexts.containsKey(taskGroup.getTaskGroupInfo())) {
-                throw new RuntimeException(String.format("TaskGroupLocation: %s already exists", taskGroup.getTaskGroupInfo()));
+            if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
+                throw new RuntimeException(String.format("TaskGroupLocation: %s already exists", taskGroup.getTaskGroupLocation()));
             }
             return deployLocalTask(taskGroup, resultFuture);
         } catch (Throwable t) {
             logger.severe(String.format("TaskGroupID : %s  deploy error with Exception: %s",
-                taskGroup != null && taskGroup.getTaskGroupInfo() != null ? taskGroup.getTaskGroupInfo().toString() : "taskGroupInfo is null",
+                taskGroup != null && taskGroup.getTaskGroupLocation() != null ? taskGroup.getTaskGroupLocation().toString() : "taskGroupLocation is null",
                 ExceptionUtils.getMessage(t)));
             resultFuture.complete(
-                new TaskExecutionState(taskGroup != null && taskGroup.getTaskGroupInfo() != null ? taskGroup.getTaskGroupInfo() : null, ExecutionState.FAILED, t));
+                new TaskExecutionState(taskGroup != null && taskGroup.getTaskGroupLocation() != null ? taskGroup.getTaskGroupLocation() : null, ExecutionState.FAILED, t));
         }
         return new PassiveCompletableFuture<>(resultFuture);
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
     public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
         @NonNull TaskGroup taskGroup,
         @NonNull CompletableFuture<TaskExecutionState> resultFuture
@@ -177,12 +180,32 @@ public class TaskExecutionService {
             submitThreadShareTask(executionTracker, byCooperation.get(true));
             submitBlockingTask(executionTracker, byCooperation.get(false));
             taskGroup.setTasksContext(taskExecutionContextMap);
-            executionContexts.put(taskGroup.getTaskGroupInfo(), new TaskGroupContext(taskGroup));
-            cancellationFutures.put(taskGroup.getTaskGroupInfo(), cancellationFuture);
+            executionContexts.put(taskGroup.getTaskGroupLocation(), new TaskGroupContext(taskGroup));
+            cancellationFutures.put(taskGroup.getTaskGroupLocation(), cancellationFuture);
         } catch (Throwable t) {
             logger.severe(ExceptionUtils.getMessage(t));
             resultFuture.completeExceptionally(t);
         }
+        resultFuture.whenComplete((r, s) -> {
+            InvocationFuture<Object> invoke = null;
+            long sleepTime = 1000;
+            do {
+                if (null != invoke) {
+                    logger.warning(String.format("notify the job of the task(%s) status failed, retry in %s millis", taskGroup.getTaskGroupLocation(), sleepTime));
+                    try {
+                        Thread.sleep(sleepTime += 1000);
+                    } catch (InterruptedException e) {
+                        logger.severe(e);
+                        Thread.interrupted();
+                    }
+                }
+                invoke = nodeEngine.getOperationService().createInvocationBuilder(
+                    SeaTunnelServer.SERVICE_NAME,
+                    new NotifyTaskStatusOperation(taskGroup.getTaskGroupLocation(), r),
+                    nodeEngine.getMasterAddress()).invoke();
+                invoke.join();
+            } while (!invoke.isDone());
+        });
         return new PassiveCompletableFuture<>(resultFuture);
     }
 
@@ -190,7 +213,7 @@ public class TaskExecutionService {
      * JobMaster call this method to cancel a task, and then {@link TaskExecutionService} cancel this task and send the
      * {@link TaskExecutionState} to JobMaster.
      *
-     * @param taskGroupLocation TaskGroup.getTaskGroupInfo()
+     * @param taskGroupLocation TaskGroup.getTaskGroupLocation()
      */
     public void cancelTaskGroup(TaskGroupLocation taskGroupLocation) {
         logger.info(String.format("Task (%s) need cancel.", taskGroupLocation));
@@ -383,7 +406,7 @@ public class TaskExecutionService {
 
         void taskDone() {
             if (completionLatch.decrementAndGet() == 0) {
-                TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupInfo();
+                TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupLocation();
                 executionContexts.remove(taskGroupLocation);
                 cancellationFutures.remove(taskGroupLocation);
                 Throwable ex = executionException.get();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 15e7f843c..d091fa330 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -329,7 +329,7 @@ public class PhysicalVertex {
             try {
                 i++;
                 nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
-                        new CancelTaskOperation(taskGroup.getTaskGroupInfo()),
+                        new CancelTaskOperation(taskGroup.getTaskGroupLocation()),
                         currentExecutionAddress)
                     .invoke().get();
                 return;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
index e31fdbcec..e43ecc35a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
@@ -41,7 +41,7 @@ public class TaskExecutionState implements Serializable {
         return throwable;
     }
 
-    public TaskGroupLocation getTaskGroupInfo() {
+    public TaskGroupLocation getTaskGroupLocation() {
         return taskGroupLocation;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index cee3eeaf3..5e2c654eb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -23,7 +23,7 @@ import java.util.Map;
 
 public interface TaskGroup extends Serializable {
 
-    TaskGroupLocation getTaskGroupInfo();
+    TaskGroupLocation getTaskGroupLocation();
 
     void init();
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
index d0d62364e..2aed98610 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
@@ -40,7 +40,7 @@ public class TaskGroupDefaultImpl implements TaskGroup {
     }
 
     @Override
-    public TaskGroupLocation getTaskGroupInfo() {
+    public TaskGroupLocation getTaskGroupLocation() {
         return taskGroupLocation;
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
new file mode 100644
index 000000000..377316c36
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+public class NotifyTaskStatusOperation extends Operation {
+
+    private final TaskGroupLocation taskGroupLocation;
+    private final TaskExecutionState taskExecutionState;
+
+    public NotifyTaskStatusOperation(TaskGroupLocation taskGroupLocation, TaskExecutionState taskExecutionState) {
+        super();
+        this.taskGroupLocation = taskGroupLocation;
+        this.taskExecutionState = taskExecutionState;
+    }
+
+    @Override
+    public void run() throws Exception {
+        SeaTunnelServer service = getService();
+        JobMaster jobMaster = service.getJobMaster(taskGroupLocation.getJobId());
+        //TODO Notify jobMaster of TaskGroup State
+    }
+
+    @Override
+    public Object getResponse() {
+        return super.getResponse();
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 797de9365..bb99792ff 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -100,7 +100,7 @@ public class PipelineBaseScheduler implements JobScheduler {
             // TODO If there is no enough resources for tasks, we need add some wait profile
             if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
                 resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
-                    task.getTaskGroup().getTaskGroupInfo());
+                    task.getTaskGroup().getTaskGroupLocation());
             } else {
                 handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
             }
@@ -115,7 +115,7 @@ public class PipelineBaseScheduler implements JobScheduler {
             return CompletableFuture.supplyAsync(() -> {
                 task.deploy(
                     resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
-                        task.getTaskGroup().getTaskGroupInfo()));
+                        task.getTaskGroup().getTaskGroupLocation()));
                 return null;
             });
         } else {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index d6d371e49..e7afae507 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -93,7 +93,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
         TaskGroupDefaultImpl ts = new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "ts", Lists.newArrayList(testTask1, testTask2));
         CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(ts, new CompletableFuture<>());
 
-        taskExecutionService.cancelTaskGroup(ts.getTaskGroupInfo());
+        taskExecutionService.cancelTaskGroup(ts.getTaskGroupLocation());
 
         await().atMost(sleepTime + 1000, TimeUnit.MILLISECONDS)
             .untilAsserted(() -> assertEquals(CANCELED, completableFuture.get().getExecutionState()));