You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:47 UTC

[dolphinscheduler] 20/29: [Fix-10785] Fix state event handle error will not retry (#10786)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 4fc9bce444b482248a68b0cf2a310c59d848ef95
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Jul 6 14:53:28 2022 +0800

    [Fix-10785] Fix state event handle error will not retry (#10786)
    
    * Fix state event handling errors not being retried
    
    * Use state event handler to deal with the event
    
    (cherry picked from commit 67d14fb7b3d941af613a96a0b9c3a2928e5c201c)
---
 .../master/runner/WorkflowExecuteRunnable.java     | 35 ++++++++++------------
 1 file changed, 16 insertions(+), 19 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 7a15a43d2b..16604662ac 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -281,14 +281,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             try {
                 stateEvent = this.stateEvents.peek();
                 LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
-                                                            stateEvent.getTaskInstanceId());
+                    stateEvent.getTaskInstanceId());
                 // if state handle success then will remove this state, otherwise will retry this state next time.
                 // The state should always handle success except database error.
                 checkProcessInstance(stateEvent);
 
-                StateEventHandler stateEventHandler
-                    = StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
-                    .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
+                StateEventHandler stateEventHandler =
+                    StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
+                        .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
                 if (stateEventHandler.handleStateEvent(this, stateEvent)) {
                     this.stateEvents.remove(stateEvent);
                 }
@@ -298,14 +298,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (StateEventHandleException stateEventHandleException) {
                 logger.error("State event handle error, will retry this event: {}",
-                             stateEvent,
-                             stateEventHandleException);
+                    stateEvent,
+                    stateEventHandleException);
                 ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (Exception e) {
                 // we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue.
                 logger.error("State event handle error, get a unknown exception, will retry this event: {}",
-                             stateEvent,
-                             e);
+                    stateEvent,
+                    e);
                 ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } finally {
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
@@ -350,7 +350,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
             taskProcessor.action(TaskAction.DISPATCH);
             this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(),
-                                                           TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
+                TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
             return true;
         }
         if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
@@ -454,8 +454,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
         if (newTaskInstance == null) {
             logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}",
-                         taskInstance.getTaskCode(),
-                         taskInstance.getId());
+                taskInstance.getTaskCode(),
+                taskInstance.getId());
             return;
         }
         waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
@@ -787,8 +787,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         errorTaskMap.clear();
 
         if (!isNewProcessInstance()) {
-            List<TaskInstance> validTaskInstanceList
-                = processService.findValidTaskListByProcessId(processInstance.getId());
+            List<TaskInstance> validTaskInstanceList =
+                processService.findValidTaskListByProcessId(processInstance.getId());
             for (TaskInstance task : validTaskInstanceList) {
                 if (validTaskMap.containsKey(task.getTaskCode())) {
                     int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
@@ -799,7 +799,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                         continue;
                     }
                     logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}",
-                                task.getTaskCode());
+                        task.getTaskCode());
                 }
 
                 validTaskMap.put(task.getTaskCode(), task.getId());
@@ -1193,10 +1193,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     }
 
     private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
-        Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode,
-                                                                  skipTaskNodeMap,
-                                                                  dag,
-                                                                  getCompleteTaskInstanceMap());
+        Set<String> submitTaskNodeList =
+            DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
         List<TaskInstance> taskInstances = new ArrayList<>();
         for (String taskNode : submitTaskNodeList) {
             TaskNode taskNodeObject = dag.getNode(taskNode);
@@ -1859,7 +1857,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         return waitToRetryTaskInstanceMap;
     }
 
-
     private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
         // get start params from command param
         Map<String, String> startParamMap = new HashMap<>();