You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/05 09:34:13 UTC

[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2567: [Feature][ST-Engine] Notice Task State By TaskExecutionService

Hisoka-X commented on code in PR #2567:
URL: https://github.com/apache/incubator-seatunnel/pull/2567#discussion_r962695101


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java:
##########
@@ -154,65 +150,71 @@ public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
     private void deployOnLocal(@NonNull SlotProfile slotProfile) {
         deployInternal(taskGroupImmutableInformation -> {
             SeaTunnelServer server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
-            return new PassiveCompletableFuture<>(server.getSlotService().getSlotContext(slotProfile)
-                .getTaskExecutionService().deployTask(taskGroupImmutableInformation));
+            server.getSlotService().getSlotContext(slotProfile)
+                .getTaskExecutionService().deployTask(taskGroupImmutableInformation);
+            return null;
         });
     }
 
     private void deployOnRemote(@NonNull SlotProfile slotProfile) {
-        deployInternal(taskGroupImmutableInformation -> new PassiveCompletableFuture<>(
-            nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
-                    new DeployTaskOperation(slotProfile,
-                        nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
-                    slotProfile.getWorker())
-                .invoke()));
+        deployInternal(taskGroupImmutableInformation -> {
+            try {
+                nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+                        new DeployTaskOperation(slotProfile,
+                            nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+                        slotProfile.getWorker())
+                    .invoke().get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return null;
+        });
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
     // This method must not throw an exception
     public void deploy(@NonNull SlotProfile slotProfile) {
-        currentExecutionAddress = slotProfile.getWorker();
-        if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
-            deployOnLocal(slotProfile);
-        } else {
-            deployOnRemote(slotProfile);
+        try {
+            currentExecutionAddress = slotProfile.getWorker();
+            if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
+                deployOnLocal(slotProfile);
+            } else {
+                deployOnRemote(slotProfile);
+            }
+        } catch (Throwable th) {
+            failedByException(th);
         }
     }
 
-    private void deployInternal(Function<TaskGroupImmutableInformation, PassiveCompletableFuture<TaskExecutionState>> deployMethod) {
+    private void deployInternal(
+        Function<TaskGroupImmutableInformation, PassiveCompletableFuture<TaskExecutionState>> deployMethod) {

Review Comment:
   Change `Function` to `Consumer`, because the deploy method no longer returns anything.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -189,4 +190,28 @@ public JobStatus getJobStatus() {
     public PhysicalPlan getPhysicalPlan() {
         return physicalPlan;
     }
+
+    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        this.physicalPlan.getPipelineList().forEach(pipeline -> {
+            if (pipeline.getPipelineIndex() != taskExecutionState.getTaskGroupLocation().getPipelineId()) {
+                return;
+            }
+
+            pipeline.getCoordinatorVertexList().forEach(task -> {
+                if (task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation())) {
+                    return;

Review Comment:
   Why `equals` rather than `!equals`? As written, the task whose location matches is skipped and every other task is updated.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -189,4 +190,28 @@ public JobStatus getJobStatus() {
     public PhysicalPlan getPhysicalPlan() {
         return physicalPlan;
     }
+
+    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        this.physicalPlan.getPipelineList().forEach(pipeline -> {
+            if (pipeline.getPipelineIndex() != taskExecutionState.getTaskGroupLocation().getPipelineId()) {
+                return;
+            }
+
+            pipeline.getCoordinatorVertexList().forEach(task -> {
+                if (task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation())) {
+                    return;
+                }
+
+                task.updateTaskExecutionState(taskExecutionState);
+            });
+
+            pipeline.getPhysicalVertexList().forEach(task -> {
+                if (task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation())) {
+                    return;

Review Comment:
   Same as above: why `equals` rather than `!equals`?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -189,4 +190,28 @@ public JobStatus getJobStatus() {
     public PhysicalPlan getPhysicalPlan() {
         return physicalPlan;
     }
+
+    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        this.physicalPlan.getPipelineList().forEach(pipeline -> {
+            if (pipeline.getPipelineIndex() != taskExecutionState.getTaskGroupLocation().getPipelineId()) {

Review Comment:
   It may be better to unify the naming of `pipelineIndex` and `pipelineId`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org