You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/08/13 06:19:20 UTC

[dolphinscheduler] branch dev updated: Set master's task running status in `runTask` to avoid the task group acquire failed, but the task status is in running (#11451)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 05589606a2 Set master's task running status in `runTask` to avoid the task group acquire failed, but the task status is in running (#11451)
05589606a2 is described below

commit 05589606a2fe6b7287b146e82f927b1eceaed701
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Aug 13 14:19:13 2022 +0800

    Set master's task running status in `runTask` to avoid the task group acquire failed, but the task status is in running (#11451)
---
 .../master/runner/task/BlockingTaskProcessor.java  |  6 ++--
 .../master/runner/task/ConditionTaskProcessor.java |  6 ++--
 .../master/runner/task/SwitchTaskProcessor.java    | 34 ++++++++++------------
 3 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index c6a1a99d88..96fed73c0d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -114,19 +114,21 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
             return false;
         }
         this.setTaskExecutionLogger();
-        initTaskParameters();
-        logger.info("blocking task start");
+        logger.info("blocking task submit success");
         return true;
     }
 
     @Override
     protected boolean runTask() {
+        logger.info("blocking task starting");
+        initTaskParameters();
         if (conditionResult.equals(DependResult.WAITING)) {
             setConditionResult();
             endTask();
         } else {
             endTask();
         }
+        logger.info("blocking task finished");
         return true;
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 2e4f7bf1ac..ed6ec54baf 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -67,19 +67,21 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
             return false;
         }
         this.setTaskExecutionLogger();
-        initTaskParameters();
-        logger.info("condition task start");
+        logger.info("condition task submit success");
         return true;
     }
 
     @Override
     public boolean runTask() {
+        initTaskParameters();
+        logger.info("condition task start");
         if (conditionResult.equals(DependResult.WAITING)) {
             setConditionResult();
             endTask();
         } else {
             endTask();
         }
+        logger.info("condition task finished");
         return true;
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 82802d8e61..2c9396b244 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -17,8 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
-
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -30,9 +31,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
 import org.apache.dolphinscheduler.server.master.utils.SwitchTaskUtils;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -41,7 +39,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import com.google.auto.service.AutoService;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
 
 /**
  * switch task processor
@@ -65,6 +63,13 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
             return false;
         }
         this.setTaskExecutionLogger();
+        logger.info("switch task submit success");
+        return true;
+    }
+
+    @Override
+    public boolean runTask() {
+        logger.info("switch task starting");
         taskInstance.setLogPath(
                 LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
                         processInstance.getProcessDefinitionVersion(),
@@ -74,21 +79,12 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
         taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
         taskInstance.setStartTime(new Date());
         processService.updateTaskInstance(taskInstance);
-        return true;
-    }
 
-    @Override
-    public boolean runTask() {
-        try {
-            if (!this.taskInstance().getState().isFinished() && setSwitchResult()) {
-                endTaskState();
-            }
-        } catch (Exception e) {
-            logger.error("update work flow {} switch task {} state error:",
-                    this.processInstance.getId(),
-                    this.taskInstance.getId(),
-                    e);
+        if (!this.taskInstance().getState().isFinished()) {
+            setSwitchResult();
         }
+        endTaskState();
+        logger.info("switch task finished");
         return true;
     }