You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:51 UTC

[dolphinscheduler] 24/29: Catch exception when check state in StateWheelExecuteThread (#10908)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 527ee472fbc22cee6cd564f2cb6e3ee008939d04
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Jul 13 10:51:20 2022 +0800

    Catch exception when check state in StateWheelExecuteThread (#10908)
    
    * Catch exception when check state
    
    (cherry picked from commit 2a67866718a74a6abc30fb615c0e16978511e3eb)
---
 .../master/runner/StateWheelExecuteThread.java     | 68 ++++++++++++++--------
 .../master/runner/WorkflowExecuteRunnable.java     |  5 +-
 .../src/main/resources/application.yaml            |  2 +-
 .../service/process/ProcessServiceImpl.java        |  2 +-
 .../service/process/ProcessServiceTest.java        |  7 ++-
 .../src/main/resources/application.yaml            |  2 +-
 6 files changed, 57 insertions(+), 29 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 0d1b3a423a..c053cb238b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -134,23 +134,34 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
             return;
         }
         for (Integer processInstanceId : processInstanceTimeoutCheckList) {
-            WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
-            if (workflowExecuteThread == null) {
-                logger.warn("Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list");
-                processInstanceTimeoutCheckList.remove(processInstanceId);
-                continue;
-            }
-            ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
-            if (processInstance == null) {
-                logger.warn("Check workflow timeout failed, the workflowInstance is null");
-                continue;
-            }
-            long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
-            if (timeRemain < 0) {
-                logger.info("Workflow instance timeout, adding timeout event");
-                addProcessTimeoutEvent(processInstance);
-                processInstanceTimeoutCheckList.remove(processInstance.getId());
-                logger.info("Workflow instance timeout, added timeout event");
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
+                WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(
+                    processInstanceId);
+                if (workflowExecuteThread == null) {
+                    logger.warn(
+                        "Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list");
+                    processInstanceTimeoutCheckList.remove(processInstanceId);
+                    continue;
+                }
+                ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
+                if (processInstance == null) {
+                    logger.warn("Check workflow timeout failed, the workflowInstance is null");
+                    continue;
+                }
+                long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(),
+                                                          (long) processInstance.getTimeout()
+                                                              * Constants.SEC_2_MINUTES_TIME_UNIT);
+                if (timeRemain < 0) {
+                    logger.info("Workflow instance timeout, adding timeout event");
+                    addProcessTimeoutEvent(processInstance);
+                    processInstanceTimeoutCheckList.remove(processInstance.getId());
+                    logger.info("Workflow instance timeout, added timeout event");
+                }
+            } catch (Exception ex) {
+                logger.error("Check workflow instance timeout error");
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
             }
         }
     }
@@ -243,20 +254,26 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
                 }
                 Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
                 if (!taskInstanceOptional.isPresent()) {
-                    logger.warn("Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}"
-                        + "will remove this check task", taskCode);
+                    logger.warn(
+                        "Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}"
+                            + "will remove this check task",
+                        taskCode);
                     taskInstanceTimeoutCheckList.remove(taskInstanceKey);
                     continue;
                 }
                 TaskInstance taskInstance = taskInstanceOptional.get();
                 if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
-                    long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
+                    long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(),
+                                                              (long) taskInstance.getTaskDefine().getTimeout()
+                                                                  * Constants.SEC_2_MINUTES_TIME_UNIT);
                     if (timeRemain < 0) {
                         logger.info("Task instance is timeout, adding task timeout event and remove the check");
                         addTaskTimeoutEvent(taskInstance);
                         taskInstanceTimeoutCheckList.remove(taskInstanceKey);
                     }
                 }
+            } catch (Exception ex) {
+                logger.error("Check task timeout error, taskInstanceKey: {}", taskInstanceKey, ex);
             } finally {
                 LoggerUtils.removeWorkflowInstanceIdMDC();
             }
@@ -277,8 +294,9 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
                 WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
 
                 if (workflowExecuteThread == null) {
-                    logger.warn("Task instance retry check failed, can not find workflowExecuteThread from cache manager, "
-                        + "will remove this check task");
+                    logger.warn(
+                        "Task instance retry check failed, can not find workflowExecuteThread from cache manager, "
+                            + "will remove this check task");
                     taskInstanceRetryCheckList.remove(taskInstanceKey);
                     continue;
                 }
@@ -308,13 +326,15 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
                     // reset taskInstance endTime and state
                     // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
                     logger.info("[TaskInstance-{}]The task instance can retry, will retry this task instance",
-                        taskInstance.getId());
+                                taskInstance.getId());
                     taskInstance.setEndTime(null);
                     taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
 
                     addTaskRetryEvent(taskInstance);
                     taskInstanceRetryCheckList.remove(taskInstanceKey);
                 }
+            } catch (Exception ex) {
+                logger.error("Check task retry error, taskInstanceKey: {}", taskInstanceKey, ex);
             } finally {
                 LoggerUtils.removeWorkflowInstanceIdMDC();
             }
@@ -349,6 +369,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
                     continue;
                 }
                 addTaskStateChangeEvent(taskInstance);
+            } catch (Exception ex) {
+                logger.error("Task state check error, taskInstanceKey: {}", taskInstanceKey, ex);
             } finally {
                 LoggerUtils.removeWorkflowInstanceIdMDC();
             }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index a433981aca..0e94b0318d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -552,8 +552,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     }
 
     public Optional<TaskInstance> getActiveTaskInstanceByTaskCode(long taskCode) {
-        if (activeTaskProcessorMaps.containsKey(taskCode)) {
-            return Optional.ofNullable(activeTaskProcessorMaps.get(taskCode).taskInstance());
+        Integer taskInstanceId = validTaskMap.get(taskCode);
+        if (taskInstanceId != null) {
+            return Optional.ofNullable(taskInstanceMap.get(taskInstanceId));
         }
         return Optional.empty();
     }
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml
index a277272026..ff29b19389 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -102,7 +102,7 @@ master:
   task-commit-retry-times: 5
   # master commit task interval
   task-commit-interval: 1s
-  state-wheel-interval: 5
+  state-wheel-interval: 5s
   # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
   max-cpu-load-avg: -1
   # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index b12cd77de2..9cce9152f2 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -909,7 +909,7 @@ public class ProcessServiceImpl implements ProcessService {
                                                        command.getProcessDefinitionVersion());
         if (processDefinition == null) {
             logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
-            return null;
+            throw new IllegalArgumentException("Cannot find the process definition for this workflowInstance");
         }
         Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
         int processInstanceId = command.getProcessInstanceId();
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index a98138dd03..82dc3f4673 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -292,7 +292,12 @@ public class ProcessServiceTest {
                                     + "\":\"111\",\""
                                     + CMD_PARAM_SUB_PROCESS_DEFINE_CODE
                                     + "\":\"222\"}");
-        Assert.assertNull(processService.handleCommand(host, command));
+        try {
+            Assert.assertNull(processService.handleCommand(host, command));
+        } catch (IllegalArgumentException illegalArgumentException) {
+            // assert throw illegalArgumentException here since the definition is null
+            Assert.assertTrue(true);
+        }
 
         int definitionVersion = 1;
         long definitionCode = 123;
diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index cf5ce5ea88..9a940b6c0e 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -120,7 +120,7 @@ master:
   task-commit-retry-times: 5
   # master commit task interval
   task-commit-interval: 1s
-  state-wheel-interval: 5
+  state-wheel-interval: 5s
   # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
   max-cpu-load-avg: -1
   # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G