You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/08/30 09:09:48 UTC
[incubator-seatunnel] branch st-engine updated: [engine][Task] add notifyTaskStatus to jobMaster (#2562)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 526727519 [engine][Task] add notifyTaskStatus to jobMaster (#2562)
526727519 is described below
commit 526727519277ef393f827da38047b3e9095027bb
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Tue Aug 30 17:09:43 2022 +0800
[engine][Task] add notifyTaskStatus to jobMaster (#2562)
---
.../engine/server/TaskExecutionService.java | 39 +++++++++++++----
.../engine/server/dag/physical/PhysicalVertex.java | 2 +-
.../server/execution/TaskExecutionState.java | 2 +-
.../engine/server/execution/TaskGroup.java | 2 +-
.../server/execution/TaskGroupDefaultImpl.java | 2 +-
.../operation/NotifyTaskStatusOperation.java | 49 ++++++++++++++++++++++
.../server/scheduler/PipelineBaseScheduler.java | 4 +-
.../engine/server/TaskExecutionServiceTest.java | 2 +-
8 files changed, 87 insertions(+), 15 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index f739fb192..03ea4d53e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
+import org.apache.seatunnel.engine.server.operation.NotifyTaskStatusOperation;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import com.google.common.collect.Lists;
@@ -44,6 +45,7 @@ import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import com.hazelcast.spi.properties.HazelcastProperties;
import lombok.NonNull;
import lombok.SneakyThrows;
@@ -141,20 +143,21 @@ public class TaskExecutionService {
} else {
taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
}
- if (executionContexts.containsKey(taskGroup.getTaskGroupInfo())) {
- throw new RuntimeException(String.format("TaskGroupLocation: %s already exists", taskGroup.getTaskGroupInfo()));
+ if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
+ throw new RuntimeException(String.format("TaskGroupLocation: %s already exists", taskGroup.getTaskGroupLocation()));
}
return deployLocalTask(taskGroup, resultFuture);
} catch (Throwable t) {
logger.severe(String.format("TaskGroupID : %s deploy error with Exception: %s",
- taskGroup != null && taskGroup.getTaskGroupInfo() != null ? taskGroup.getTaskGroupInfo().toString() : "taskGroupInfo is null",
+ taskGroup != null && taskGroup.getTaskGroupLocation() != null ? taskGroup.getTaskGroupLocation().toString() : "taskGroupLocation is null",
ExceptionUtils.getMessage(t)));
resultFuture.complete(
- new TaskExecutionState(taskGroup != null && taskGroup.getTaskGroupInfo() != null ? taskGroup.getTaskGroupInfo() : null, ExecutionState.FAILED, t));
+ new TaskExecutionState(taskGroup != null && taskGroup.getTaskGroupLocation() != null ? taskGroup.getTaskGroupLocation() : null, ExecutionState.FAILED, t));
}
return new PassiveCompletableFuture<>(resultFuture);
}
+ @SuppressWarnings("checkstyle:MagicNumber")
public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
@NonNull TaskGroup taskGroup,
@NonNull CompletableFuture<TaskExecutionState> resultFuture
@@ -177,12 +180,32 @@ public class TaskExecutionService {
submitThreadShareTask(executionTracker, byCooperation.get(true));
submitBlockingTask(executionTracker, byCooperation.get(false));
taskGroup.setTasksContext(taskExecutionContextMap);
- executionContexts.put(taskGroup.getTaskGroupInfo(), new TaskGroupContext(taskGroup));
- cancellationFutures.put(taskGroup.getTaskGroupInfo(), cancellationFuture);
+ executionContexts.put(taskGroup.getTaskGroupLocation(), new TaskGroupContext(taskGroup));
+ cancellationFutures.put(taskGroup.getTaskGroupLocation(), cancellationFuture);
} catch (Throwable t) {
logger.severe(ExceptionUtils.getMessage(t));
resultFuture.completeExceptionally(t);
}
+ resultFuture.whenComplete((r, s) -> {
+ InvocationFuture<Object> invoke = null;
+ long sleepTime = 1000;
+ do {
+ if (null != invoke) {
+ logger.warning(String.format("notify the job of the task(%s) status failed, retry in %s millis", taskGroup.getTaskGroupLocation(), sleepTime));
+ try {
+ Thread.sleep(sleepTime += 1000);
+ } catch (InterruptedException e) {
+ logger.severe(e);
+ Thread.interrupted();
+ }
+ }
+ invoke = nodeEngine.getOperationService().createInvocationBuilder(
+ SeaTunnelServer.SERVICE_NAME,
+ new NotifyTaskStatusOperation(taskGroup.getTaskGroupLocation(), r),
+ nodeEngine.getMasterAddress()).invoke();
+ invoke.join();
+ } while (!invoke.isDone());
+ });
return new PassiveCompletableFuture<>(resultFuture);
}
@@ -190,7 +213,7 @@ public class TaskExecutionService {
* JobMaster calls this method to cancel a task; {@link TaskExecutionService} then cancels the task and sends the
* {@link TaskExecutionState} to JobMaster.
*
- * @param taskGroupLocation TaskGroup.getTaskGroupInfo()
+ * @param taskGroupLocation TaskGroup.getTaskGroupLocation()
*/
public void cancelTaskGroup(TaskGroupLocation taskGroupLocation) {
logger.info(String.format("Task (%s) need cancel.", taskGroupLocation));
@@ -383,7 +406,7 @@ public class TaskExecutionService {
void taskDone() {
if (completionLatch.decrementAndGet() == 0) {
- TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupInfo();
+ TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupLocation();
executionContexts.remove(taskGroupLocation);
cancellationFutures.remove(taskGroupLocation);
Throwable ex = executionException.get();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 15e7f843c..d091fa330 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -329,7 +329,7 @@ public class PhysicalVertex {
try {
i++;
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
- new CancelTaskOperation(taskGroup.getTaskGroupInfo()),
+ new CancelTaskOperation(taskGroup.getTaskGroupLocation()),
currentExecutionAddress)
.invoke().get();
return;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
index e31fdbcec..e43ecc35a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
@@ -41,7 +41,7 @@ public class TaskExecutionState implements Serializable {
return throwable;
}
- public TaskGroupLocation getTaskGroupInfo() {
+ public TaskGroupLocation getTaskGroupLocation() {
return taskGroupLocation;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index cee3eeaf3..5e2c654eb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -23,7 +23,7 @@ import java.util.Map;
public interface TaskGroup extends Serializable {
- TaskGroupLocation getTaskGroupInfo();
+ TaskGroupLocation getTaskGroupLocation();
void init();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
index d0d62364e..2aed98610 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
@@ -40,7 +40,7 @@ public class TaskGroupDefaultImpl implements TaskGroup {
}
@Override
- public TaskGroupLocation getTaskGroupInfo() {
+ public TaskGroupLocation getTaskGroupLocation() {
return taskGroupLocation;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
new file mode 100644
index 000000000..377316c36
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+public class NotifyTaskStatusOperation extends Operation {
+
+ private final TaskGroupLocation taskGroupLocation;
+ private final TaskExecutionState taskExecutionState;
+
+ public NotifyTaskStatusOperation(TaskGroupLocation taskGroupLocation, TaskExecutionState taskExecutionState) {
+ super();
+ this.taskGroupLocation = taskGroupLocation;
+ this.taskExecutionState = taskExecutionState;
+ }
+
+ @Override
+ public void run() throws Exception {
+ SeaTunnelServer service = getService();
+ JobMaster jobMaster = service.getJobMaster(taskGroupLocation.getJobId());
+ //TODO Notify jobMaster of TaskGroup State
+ }
+
+ @Override
+ public Object getResponse() {
+ return super.getResponse();
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 797de9365..bb99792ff 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -100,7 +100,7 @@ public class PipelineBaseScheduler implements JobScheduler {
// TODO If there are not enough resources for tasks, we need to add some wait profile
if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
- task.getTaskGroup().getTaskGroupInfo());
+ task.getTaskGroup().getTaskGroupLocation());
} else {
handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
}
@@ -115,7 +115,7 @@ public class PipelineBaseScheduler implements JobScheduler {
return CompletableFuture.supplyAsync(() -> {
task.deploy(
resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
- task.getTaskGroup().getTaskGroupInfo()));
+ task.getTaskGroup().getTaskGroupLocation()));
return null;
});
} else {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index d6d371e49..e7afae507 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -93,7 +93,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
TaskGroupDefaultImpl ts = new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "ts", Lists.newArrayList(testTask1, testTask2));
CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(ts, new CompletableFuture<>());
- taskExecutionService.cancelTaskGroup(ts.getTaskGroupInfo());
+ taskExecutionService.cancelTaskGroup(ts.getTaskGroupLocation());
await().atMost(sleepTime + 1000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertEquals(CANCELED, completableFuture.get().getExecutionState()));