Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/08/30 11:02:56 UTC

[GitHub] [incubator-seatunnel] EricJoy2048 opened a new pull request, #2567: [Feature][ST-Engine] Notice Task State By TaskExecutionService

EricJoy2048 opened a new pull request, #2567:
URL: https://github.com/apache/incubator-seatunnel/pull/2567

   Because the JobMaster may terminate and be restarted on a new master node, we cannot use a Future to receive the TaskGroup state.
   
   Instead, it is better to let the TaskExecutionService notify the JobMaster of the TaskGroup state.
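
A minimal sketch of the notification approach described above, using simplified stand-in classes (`TaskState`, `SketchJobMaster`, `SketchServer` and `NotifyDemo` are hypothetical names, not SeaTunnel classes). It runs in a single process, so it only illustrates the idea: the reporter looks up the current JobMaster by job id when the task ends, instead of holding a Future tied to the instance that deployed the task.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for TaskExecutionState: just a job id and a status string.
final class TaskState {
    final long jobId;
    final String status;

    TaskState(long jobId, String status) {
        this.jobId = jobId;
        this.status = status;
    }
}

// Hypothetical stand-in for JobMaster: records the last state it was told about.
final class SketchJobMaster {
    volatile String lastStatus = "RUNNING";

    void updateTaskExecutionState(TaskState state) {
        lastStatus = state.status;
    }
}

// Hypothetical stand-in for the server-side entry point.
final class SketchServer {
    // Resolved at notification time, so a replaced JobMaster is found by job id.
    final Map<Long, SketchJobMaster> runningJobMasterMap = new ConcurrentHashMap<>();

    void updateTaskExecutionState(TaskState state) {
        SketchJobMaster master = runningJobMasterMap.get(state.jobId);
        if (master == null) {
            throw new IllegalStateException(String.format("Job %s not running", state.jobId));
        }
        master.updateTaskExecutionState(state);
    }
}

final class NotifyDemo {
    public static void main(String[] args) {
        SketchServer server = new SketchServer();
        server.runningJobMasterMap.put(1L, new SketchJobMaster());

        // The JobMaster is replaced before the task group ends.
        SketchJobMaster restarted = new SketchJobMaster();
        server.runningJobMasterMap.put(1L, restarted);

        // The task side reports its final state; the new instance receives it.
        server.updateTaskExecutionState(new TaskState(1L, "FINISHED"));
        System.out.println(restarted.lastStatus);
    }
}
```

A Future created by the first JobMaster instance would never be observed by its replacement, which is the problem the description points at.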


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2567: [Feature][ST-Engine] Notice Task State By TaskExecutionService

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2567:
URL: https://github.com/apache/incubator-seatunnel/pull/2567#discussion_r958546670


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -191,4 +194,16 @@ public JobStatus getJobStatus(long jobId) {
         }
         return runningJobMaster.getJobStatus();
     }
+
+    /**
+     * When TaskGroup ends, it is called by {@link TaskExecutionService} to notify JobMaster the TaskGroup's state.
+     */
+    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
+        JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
+        if (runningJobMaster == null) {
+            throw new JobException(String.format("Job %s not running", taskGroupLocation.getJobId()));
+        }
+        runningJobMaster.updateTaskExecutionState(taskExecutionState);

Review Comment:
   I will update it in a new PR.





[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2567: [Feature][ST-Engine] Notice Task State By TaskExecutionService

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2567:
URL: https://github.com/apache/incubator-seatunnel/pull/2567#discussion_r961631912


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java:
##########
@@ -165,12 +166,11 @@ public void deploy(@NonNull Address address) {
 
         try {
             if (ExecutionState.DEPLOYING.equals(executionState.get())) {
-                waitForCompleteByExecutionService = new PassiveCompletableFuture<>(

Review Comment:
   done





[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #2567: [Feature][ST-Engine] Notice Task State By TaskExecutionService

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #2567:
URL: https://github.com/apache/incubator-seatunnel/pull/2567#issuecomment-1236086828

   Please merge from `st-engine` again; a lot of code has changed.




[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2567: [Feature][ST-Engine] Notice Task State By TaskExecutionService

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2567:
URL: https://github.com/apache/incubator-seatunnel/pull/2567#discussion_r962695101


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java:
##########
@@ -154,65 +150,71 @@ public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
     private void deployOnLocal(@NonNull SlotProfile slotProfile) {
         deployInternal(taskGroupImmutableInformation -> {
             SeaTunnelServer server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
-            return new PassiveCompletableFuture<>(server.getSlotService().getSlotContext(slotProfile)
-                .getTaskExecutionService().deployTask(taskGroupImmutableInformation));
+            server.getSlotService().getSlotContext(slotProfile)
+                .getTaskExecutionService().deployTask(taskGroupImmutableInformation);
+            return null;
         });
     }
 
     private void deployOnRemote(@NonNull SlotProfile slotProfile) {
-        deployInternal(taskGroupImmutableInformation -> new PassiveCompletableFuture<>(
-            nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
-                    new DeployTaskOperation(slotProfile,
-                        nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
-                    slotProfile.getWorker())
-                .invoke()));
+        deployInternal(taskGroupImmutableInformation -> {
+            try {
+                nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+                        new DeployTaskOperation(slotProfile,
+                            nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+                        slotProfile.getWorker())
+                    .invoke().get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return null;
+        });
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
     // This method must not throw an exception
     public void deploy(@NonNull SlotProfile slotProfile) {
-        currentExecutionAddress = slotProfile.getWorker();
-        if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
-            deployOnLocal(slotProfile);
-        } else {
-            deployOnRemote(slotProfile);
+        try {
+            currentExecutionAddress = slotProfile.getWorker();
+            if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
+                deployOnLocal(slotProfile);
+            } else {
+                deployOnRemote(slotProfile);
+            }
+        } catch (Throwable th) {
+            failedByException(th);
         }
     }
 
-    private void deployInternal(Function<TaskGroupImmutableInformation, PassiveCompletableFuture<TaskExecutionState>> deployMethod) {
+    private void deployInternal(
+        Function<TaskGroupImmutableInformation, PassiveCompletableFuture<TaskExecutionState>> deployMethod) {

Review Comment:
   Change `Function` to `Consumer`, because the deploy method no longer returns anything.
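
A minimal sketch of the difference this suggestion makes, assuming a simplified deploy callback that takes a `String` in place of `TaskGroupImmutableInformation` (`DeploySketch` and its methods are hypothetical names for illustration only). With `Function`, a callback that has nothing to return must still end in `return null`; with `Consumer`, the signature itself says the callback runs for its side effects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in; the String argument replaces the real task group information.
final class DeploySketch {
    final List<String> deployed = new ArrayList<>();

    // Before: the result is never used, yet every caller has to return something.
    void deployWithFunction(String info, Function<String, Object> deployMethod) {
        deployMethod.apply(info);
    }

    // After: Consumer states that the deploy callback produces no value.
    void deployWithConsumer(String info, Consumer<String> deployMethod) {
        deployMethod.accept(info);
    }

    public static void main(String[] args) {
        DeploySketch sketch = new DeploySketch();
        sketch.deployWithFunction("group-1", info -> {
            sketch.deployed.add(info);
            return null; // forced by the Function signature
        });
        sketch.deployWithConsumer("group-2", sketch.deployed::add);
        System.out.println(sketch.deployed);
    }
}
```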



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -189,4 +190,28 @@ public JobStatus getJobStatus() {
     public PhysicalPlan getPhysicalPlan() {
         return physicalPlan;
     }
+
+    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        this.physicalPlan.getPipelineList().forEach(pipeline -> {
+            if (pipeline.getPipelineIndex() != taskExecutionState.getTaskGroupLocation().getPipelineId()) {
+                return;
+            }
+
+            pipeline.getCoordinatorVertexList().forEach(task -> {
+                if (task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation())) {
+                    return;

Review Comment:
   Why `equals` rather than `!equals`?
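
A minimal sketch of the guard being questioned, assuming the intent is to update only the vertex whose location matches the reported state (`VertexSketch` and `GuardSketch` are hypothetical names, and a `String` replaces `TaskGroupLocation`). In the quoted diff the early `return` fires when the locations are equal, which skips the matching task and updates all the others; the sketch shows the negated form.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a physical vertex: a location and a mutable state.
final class VertexSketch {
    final String location;
    String state = "RUNNING";

    VertexSketch(String location) {
        this.location = location;
    }
}

final class GuardSketch {
    // Updates only the vertex whose location matches; returns how many were updated.
    static int update(List<VertexSketch> vertices, String targetLocation, String newState) {
        int updated = 0;
        for (VertexSketch vertex : vertices) {
            if (!vertex.location.equals(targetLocation)) {
                continue; // not the task group this state report belongs to
            }
            vertex.state = newState;
            updated++;
        }
        return updated;
    }

    public static void main(String[] args) {
        List<VertexSketch> vertices = new ArrayList<>();
        vertices.add(new VertexSketch("job1-pipeline1-group1"));
        vertices.add(new VertexSketch("job1-pipeline1-group2"));
        int updated = update(vertices, "job1-pipeline1-group2", "FINISHED");
        System.out.println(updated + " " + vertices.get(0).state + " " + vertices.get(1).state);
    }
}
```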



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -189,4 +190,28 @@ public JobStatus getJobStatus() {
     public PhysicalPlan getPhysicalPlan() {
         return physicalPlan;
     }
+
+    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        this.physicalPlan.getPipelineList().forEach(pipeline -> {
+            if (pipeline.getPipelineIndex() != taskExecutionState.getTaskGroupLocation().getPipelineId()) {
+                return;
+            }
+
+            pipeline.getCoordinatorVertexList().forEach(task -> {
+                if (task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation())) {
+                    return;
+                }
+
+                task.updateTaskExecutionState(taskExecutionState);
+            });
+
+            pipeline.getPhysicalVertexList().forEach(task -> {
+                if (task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation())) {
+                    return;

Review Comment:
   same as above



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -189,4 +190,28 @@ public JobStatus getJobStatus() {
     public PhysicalPlan getPhysicalPlan() {
         return physicalPlan;
     }
+
+    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        this.physicalPlan.getPipelineList().forEach(pipeline -> {
+            if (pipeline.getPipelineIndex() != taskExecutionState.getTaskGroupLocation().getPipelineId()) {

Review Comment:
   It may be better to unify the naming of `pipelineIndex` and `pipelineId`.





[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #2567: [Feature][ST-Engine] Notice Task State By TaskExecutionService

Posted by GitBox <gi...@apache.org>.
ic4y commented on code in PR #2567:
URL: https://github.com/apache/incubator-seatunnel/pull/2567#discussion_r958448441


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java:
##########
@@ -165,12 +166,11 @@ public void deploy(@NonNull Address address) {
 
         try {
             if (ExecutionState.DEPLOYING.equals(executionState.get())) {
-                waitForCompleteByExecutionService = new PassiveCompletableFuture<>(

Review Comment:
   The `waitForCompleteByExecutionService` object is no longer used.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -191,4 +194,16 @@ public JobStatus getJobStatus(long jobId) {
         }
         return runningJobMaster.getJobStatus();
     }
+
+    /**
+     * When TaskGroup ends, it is called by {@link TaskExecutionService} to notify JobMaster the TaskGroup's state.
+     */
+    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
+        JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
+        if (runningJobMaster == null) {
+            throw new JobException(String.format("Job %s not running", taskGroupLocation.getJobId()));
+        }
+        runningJobMaster.updateTaskExecutionState(taskExecutionState);

Review Comment:
   I suggest abstracting a CoordinationServer to handle JobMaster-related operations.





[GitHub] [incubator-seatunnel] Hisoka-X merged pull request #2567: [Feature][ST-Engine] Notice Task State By TaskExecutionService

Posted by GitBox <gi...@apache.org>.
Hisoka-X merged PR #2567:
URL: https://github.com/apache/incubator-seatunnel/pull/2567

