You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/06 06:53:37 UTC

[dolphinscheduler] branch dev updated: [Fix-10785] Fix state event handle error will not retry (#10786)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 67d14fb7b3 [Fix-10785] Fix state event handle error will not retry (#10786)
67d14fb7b3 is described below

commit 67d14fb7b3d941af613a96a0b9c3a2928e5c201c
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Jul 6 14:53:28 2022 +0800

    [Fix-10785] Fix state event handle error will not retry (#10786)
    
    * Fix state event handle error will not retry
    
    * Use state event handler to deal with the event
---
 .../api/service/impl/ExecutorServiceImpl.java      |   3 +-
 .../master/consumer/TaskPriorityQueueConsumer.java |   2 +
 .../master/dispatch/host/assign/Selector.java      |   2 +-
 .../server/master/event}/StateEvent.java           |   3 +-
 .../StateEventHandleError.java}                    |  23 +-
 .../StateEventHandleException.java}                |  22 +-
 .../server/master/event/StateEventHandler.java     |  38 +-
 .../master/event/StateEventHandlerManager.java     |  41 +-
 .../master/event/TaskRetryStateEventHandler.java   |  47 +++
 .../server/master/event/TaskStateEventHandler.java | 115 ++++++
 .../master/event/TaskTimeoutStateEventHandler.java |  64 +++
 .../event/TaskWaitTaskGroupStateHandler.java       |  42 +-
 .../event/WorkflowBlockStateEventHandler.java      |  60 +++
 .../master/event/WorkflowStateEventHandler.java    |  95 +++++
 .../event/WorkflowTimeoutStateEventHandler.java    |  39 ++
 .../master/processor/StateEventProcessor.java      |   5 +-
 .../master/processor/TaskEventProcessor.java       |   2 +-
 .../processor/queue/StateEventResponseService.java |   3 +-
 .../master/processor/queue/TaskEventService.java   |  17 +-
 .../processor/queue/TaskExecuteRunnable.java       |   3 +-
 .../processor/queue/TaskExecuteThreadPool.java     |  21 +-
 .../master/runner/MasterSchedulerService.java      |   2 +-
 .../master/runner/StateWheelExecuteThread.java     |   2 +-
 .../master/runner/WorkflowExecuteRunnable.java     | 438 ++++++---------------
 .../master/runner/WorkflowExecuteThreadPool.java   |   3 +-
 .../server/master/service/FailoverService.java     |   2 +-
 .../server/master/service/FailoverServiceTest.java |   2 +-
 .../remote/processor/NettyRemoteChannel.java       |   7 +-
 .../processor/StateEventCallbackService.java       |  29 +-
 .../queue/PeerTaskInstancePriorityQueue.java       |   3 +-
 .../service/queue/TaskPriorityQueue.java           |   2 +-
 .../service/queue/TaskPriorityQueueImpl.java       |   3 +-
 .../processor/TaskExecuteResponseAckProcessor.java |   8 +-
 33 files changed, 674 insertions(+), 474 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 396dcd3811..0e7d1b5abb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -121,7 +121,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     private ProcessService processService;
 
     @Autowired
-    StateEventCallbackService stateEventCallbackService;
+    private StateEventCallbackService stateEventCallbackService;
 
     @Autowired
     private TaskDefinitionMapper taskDefinitionMapper;
@@ -500,6 +500,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
 
         // determine whether the process is normal
         if (update > 0) {
+            // Send the process instance state change event directly to the target master; delivery is not guaranteed.
             StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
                     processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0
             );
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index e8cf4b2919..f04ae15dd8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -116,7 +116,9 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
     @PostConstruct
     public void init() {
         this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
+        logger.info("Task priority queue consume thread starting");
         super.start();
+        logger.info("Task priority queue consume thread started");
     }
 
     @Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
index 8eed9d991e..49a4053e5e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
@@ -27,7 +27,7 @@ public interface Selector<T> {
 
     /**
      * select
-     * @param source source
+     * @param source the candidates to select from; should not be empty.
      * @return T
      */
     T select(Collection<T> source);
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java
similarity index 91%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java
index 7f4be924dd..68cd582994 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
 
+import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 
 import io.netty.channel.Channel;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleError.java
similarity index 63%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleError.java
index 8eed9d991e..a8dce04f03 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleError.java
@@ -15,20 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-
-import java.util.Collection;
+package org.apache.dolphinscheduler.server.master.event;
 
 /**
- * selector
- * @param <T> T
+ * Thrown when a state event cannot be handled and the failure is not recoverable, e.g. because the event is broken.
+ * When a handler throws this exception, the event is dropped instead of being retried.
  */
-public interface Selector<T> {
+public class StateEventHandleError extends Exception {
+
+    public StateEventHandleError(String message) {
+        super(message);
+    }
 
-    /**
-     * select
-     * @param source source
-     * @return T
-     */
-    T select(Collection<T> source);
+    public StateEventHandleError(String message, Throwable throwable) {
+        super(message, throwable);
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleException.java
similarity index 65%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleException.java
index 8eed9d991e..dc9456a397 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleException.java
@@ -15,20 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-
-import java.util.Collection;
+package org.apache.dolphinscheduler.server.master.event;
 
 /**
- * selector
- * @param <T> T
+ * Thrown when handling a state event fails in a recoverable way; when a handler throws this exception, the event is retried.
  */
-public interface Selector<T> {
+public class StateEventHandleException extends Exception {
+
+    public StateEventHandleException(String message) {
+        super(message);
+    }
 
-    /**
-     * select
-     * @param source source
-     * @return T
-     */
-    T select(Collection<T> source);
+    public StateEventHandleException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
similarity index 51%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
index 7f4be924dd..00808b2e29 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
@@ -15,36 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
 
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 
-import io.netty.channel.Channel;
-import lombok.Data;
-
-/**
- * state event
- */
-@Data
-public class StateEvent {
+public interface StateEventHandler {
 
     /**
-     * origin_pid-origin_task_id-process_instance_id-task_instance_id
+     * Handle the given state event; returns true if the event was handled successfully, otherwise false.
+     *
+     * @param stateEvent the state event to handle.
+     * @throws StateEventHandleException if handling failed but can be recovered; the event will be retried.
+     * @throws StateEventHandleError     if the event cannot be handled (e.g. it is broken); the event is dropped.
      */
-    private String key;
-
-    private StateEventType type;
-
-    private ExecutionStatus executionStatus;
-
-    private int taskInstanceId;
-
-    private long taskCode;
-
-    private int processInstanceId;
-
-    private String context;
-
-    private Channel channel;
+    boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+        throws StateEventHandleException, StateEventHandleError;
 
+    StateEventType getEventType(); // the event type this handler is registered under
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java
similarity index 51%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java
index 7f4be924dd..b436b55890 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java
@@ -15,36 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
 
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
 
-import io.netty.channel.Channel;
-import lombok.Data;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
 
-/**
- * state event
- */
-@Data
-public class StateEvent {
-
-    /**
-     * origin_pid-origin_task_id-process_instance_id-task_instance_id
-     */
-    private String key;
-
-    private StateEventType type;
-
-    private ExecutionStatus executionStatus;
-
-    private int taskInstanceId;
-
-    private long taskCode;
+public class StateEventHandlerManager {
 
-    private int processInstanceId;
+    private static final Map<StateEventType, StateEventHandler> stateEventHandlerMap = new HashMap<>(); // only written by the static initializer below
 
-    private String context;
+    static {
+        ServiceLoader.load(StateEventHandler.class) // handlers registered as StateEventHandler service providers, e.g. via @AutoService
+            .forEach(stateEventHandler -> stateEventHandlerMap.put(stateEventHandler.getEventType(),
+                stateEventHandler)); // NOTE(review): a later handler for the same event type silently replaces an earlier one
+    }
 
-    private Channel channel;
+    public static Optional<StateEventHandler> getStateEventHandler(StateEventType stateEventType) {
+        return Optional.ofNullable(stateEventHandlerMap.get(stateEventType)); // empty when no handler is registered for the type
+    }
 
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java
new file mode 100644
index 0000000000..ee8168856a
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+
+import java.util.Map;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Handles {@link StateEventType#TASK_RETRY} events by resubmitting the task instance that is waiting to retry.
+ */
+@AutoService(StateEventHandler.class)
+public class TaskRetryStateEventHandler implements StateEventHandler {
+
+    /**
+     * Move the task instance waiting to retry for the event's task code to the standby list, submit the standby
+     * tasks and remove the task instance from the wait-to-retry map.
+     *
+     * @throws StateEventHandleError if no task instance is waiting to retry for the event's task code; the event
+     *                               is dropped.
+     */
+    @Override
+    public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+        throws StateEventHandleException, StateEventHandleError {
+        Map<Long, TaskInstance> waitToRetryTaskInstanceMap = workflowExecuteRunnable.getWaitToRetryTaskInstanceMap();
+        TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode());
+        if (taskInstance == null) {
+            // NOTE(review): presumably a stale or duplicate event; there is nothing to resubmit, so drop it
+            // instead of handing null to the standby list.
+            throw new StateEventHandleError(
+                "Cannot find the task instance waiting to retry by task code: " + stateEvent.getTaskCode());
+        }
+        TaskMetrics.incTaskRetry();
+        workflowExecuteRunnable.addTaskToStandByList(taskInstance);
+        workflowExecuteRunnable.submitStandByTask();
+        waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode());
+        return true;
+    }
+
+    @Override
+    public StateEventType getEventType() {
+        return StateEventType.TASK_RETRY;
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
new file mode 100644
index 0000000000..e3ad268f97
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(StateEventHandler.class)
+public class TaskStateEventHandler implements StateEventHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(TaskStateEventHandler.class);
+
+    @Override
+    public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+        throws StateEventHandleException, StateEventHandleError {
+        measureTaskState(stateEvent); // metrics are recorded before the event is validated
+        workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent); // NOTE(review): presumably throws StateEventHandleError for an invalid event - confirm
+
+        Optional<TaskInstance> taskInstanceOptional =
+            workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId());
+
+        TaskInstance task = taskInstanceOptional.orElseThrow(() -> new StateEventHandleError(
+            "Cannot find task instance from taskMap by task instance id: " + stateEvent.getTaskInstanceId()));
+
+        if (task.getState() == null) { // without a task state nothing can be decided: drop the event
+            throw new StateEventHandleError("Task state event handle error due to task state is null");
+        }
+
+        Map<Long, Integer> completeTaskMap = workflowExecuteRunnable.getCompleteTaskMap();
+
+        if (task.getState().typeIsFinished()) { // finished: mark the task finished once, then release its task group if it has one
+            if (completeTaskMap.containsKey(task.getTaskCode()) // this task instance is already recorded as complete
+                && completeTaskMap.get(task.getTaskCode()) == task.getId()) { // NOTE(review): relies on getId() returning a primitive int so the Integer is unboxed; with an Integer this would compare references
+                logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
+                return true;
+            }
+            workflowExecuteRunnable.taskFinished(task);
+            if (task.getTaskGroupId() > 0) {
+                workflowExecuteRunnable.releaseTaskGroup(task);
+            }
+            return true;
+        }
+        Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
+        if (activeTaskProcessMap.containsKey(task.getTaskCode())) { // not finished yet: let the active task processor run
+            ITaskProcessor iTaskProcessor = activeTaskProcessMap.get(task.getTaskCode());
+            iTaskProcessor.action(TaskAction.RUN);
+
+            if (iTaskProcessor.taskInstance().getState().typeIsFinished()) { // the processor finished the task: sync its state before marking it finished
+                if (iTaskProcessor.taskInstance().getState() != task.getState()) {
+                    task.setState(iTaskProcessor.taskInstance().getState());
+                }
+                workflowExecuteRunnable.taskFinished(task);
+            }
+            return true;
+        }
+        throw new StateEventHandleException( // no active task processor for the task yet: recoverable, the event is retried
+            "Task state event handle error, due to the task is not in activeTaskProcessorMaps");
+    }
+
+    @Override
+    public StateEventType getEventType() {
+        return StateEventType.TASK_STATE_CHANGE;
+    }
+
+    private void measureTaskState(StateEvent taskStateEvent) {
+        if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
+            // the event is broken: only the metrics are skipped, handleStateEvent still validates the event
+            logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
+            return;
+        }
+        if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
+            TaskMetrics.incTaskFinish();
+        }
+        switch (taskStateEvent.getExecutionStatus()) {
+            case STOP:
+                TaskMetrics.incTaskStop();
+                break;
+            case SUCCESS:
+                TaskMetrics.incTaskSuccess();
+                break;
+            case FAILURE:
+                TaskMetrics.incTaskFailure();
+                break;
+            default:
+                break;
+        }
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
new file mode 100644
index 0000000000..240f10ff2c
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
+
+import java.util.Map;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(StateEventHandler.class)
+public class TaskTimeoutStateEventHandler implements StateEventHandler {
+    @Override
+    public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+        throws StateEventHandleError {
+        TaskMetrics.incTaskTimeout();
+        workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);
+
+        TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).get();
+
+        if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
+            return true;
+        }
+        TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
+        Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
+        if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
+            || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
+            ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode());
+            taskProcessor.action(TaskAction.TIMEOUT);
+        }
+        if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
+            workflowExecuteRunnable.processTimeout();
+            workflowExecuteRunnable.taskTimeout(taskInstance);
+        }
+        return true;
+    }
+
+    @Override
+    public StateEventType getEventType() {
+        return StateEventType.TASK_TIMEOUT;
+    }
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
similarity index 54%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
index 7f4be924dd..9a3c59a949 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
@@ -15,36 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
 
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 
-import io.netty.channel.Channel;
-import lombok.Data;
+import com.google.auto.service.AutoService;
 
-/**
- * state event
- */
-@Data
-public class StateEvent {
-
-    /**
-     * origin_pid-origin_task_id-process_instance_id-task_instance_id
-     */
-    private String key;
-
-    private StateEventType type;
-
-    private ExecutionStatus executionStatus;
-
-    private int taskInstanceId;
-
-    private long taskCode;
-
-    private int processInstanceId;
-
-    private String context;
-
-    private Channel channel;
+@AutoService(StateEventHandler.class)
+public class TaskWaitTaskGroupStateHandler implements StateEventHandler {
+    @Override
+    public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) {
+        return workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent); // delegates to the workflow runnable and returns its result unchanged
+    }
 
+    @Override
+    public StateEventType getEventType() {
+        return StateEventType.WAIT_TASK_GROUP;
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java
new file mode 100644
index 0000000000..f7349fcbd1
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(StateEventHandler.class)
+public class WorkflowBlockStateEventHandler implements StateEventHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(WorkflowBlockStateEventHandler.class);
+
+    @Override
+    public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+        throws StateEventHandleError {
+        Optional<TaskInstance> taskInstanceOptional =
+            workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId());
+        if (!taskInstanceOptional.isPresent()) {
+            throw new StateEventHandleError("Cannot find taskInstance from taskMap by taskInstanceId: "
+                + stateEvent.getTaskInstanceId());
+        }
+        TaskInstance task = taskInstanceOptional.get();
+
+        BlockingParameters parameters = JSONUtils.parseObject(task.getTaskParams(), BlockingParameters.class);
+        if (parameters != null && parameters.isAlertWhenBlocking()) {
+            workflowExecuteRunnable.processBlock();
+        }
+        return true;
+    }
+
+    @Override
+    public StateEventType getEventType() {
+        return StateEventType.PROCESS_BLOCKED;
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
new file mode 100644
index 0000000000..37d8ceb1da
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Handles {@link StateEventType#PROCESS_STATE_CHANGE} events by applying the reported state to the workflow.
+ */
+@AutoService(StateEventHandler.class)
+public class WorkflowStateEventHandler implements StateEventHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(WorkflowStateEventHandler.class);
+
+    /**
+     * Applies the process state change and then records the process instance metrics.
+     *
+     * <p>Metrics are recorded only after the state change has been applied: an event whose handling throws a
+     * {@link StateEventHandleException} (or an unexpected exception) stays queued and is retried, so measuring
+     * first would count the same transition once per attempt.
+     *
+     * @return always {@code true}, meaning the event has been consumed
+     * @throws StateEventHandleException if applying the state change fails; the event is then retried
+     */
+    @Override
+    public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+        throws StateEventHandleException {
+        applyStateChange(workflowExecuteRunnable, stateEvent);
+        measureProcessState(stateEvent);
+        return true;
+    }
+
+    @Override
+    public StateEventType getEventType() {
+        return StateEventType.PROCESS_STATE_CHANGE;
+    }
+
+    /**
+     * Drives the workflow according to the execution status carried by the event.
+     */
+    private void applyStateChange(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+        throws StateEventHandleException {
+        ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
+        ProcessDefinition processDefinition = processInstance.getProcessDefinition();
+
+        logger.info("process:{} state {} change to {}",
+            processInstance.getId(),
+            processInstance.getState(),
+            stateEvent.getExecutionStatus());
+
+        if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
+            // serial wait execution type needs to wake up the waiting process
+            if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType()
+                .typeIsSerialPriority()) {
+                workflowExecuteRunnable.endProcess();
+                return;
+            }
+            workflowExecuteRunnable.updateProcessInstanceState(stateEvent);
+            return;
+        }
+        if (workflowExecuteRunnable.processComplementData()) {
+            return;
+        }
+        if (stateEvent.getExecutionStatus().typeIsFinished()) {
+            workflowExecuteRunnable.endProcess();
+        }
+        if (processInstance.getState() == ExecutionStatus.READY_STOP) {
+            workflowExecuteRunnable.killAllTasks();
+        }
+    }
+
+    /**
+     * Increments the process instance metrics that match the event's execution status.
+     */
+    private void measureProcessState(StateEvent processStateEvent) {
+        if (processStateEvent.getExecutionStatus().typeIsFinished()) {
+            ProcessInstanceMetrics.incProcessInstanceFinish();
+        }
+        switch (processStateEvent.getExecutionStatus()) {
+            case STOP:
+                ProcessInstanceMetrics.incProcessInstanceStop();
+                break;
+            case SUCCESS:
+                ProcessInstanceMetrics.incProcessInstanceSuccess();
+                break;
+            case FAILURE:
+                ProcessInstanceMetrics.incProcessInstanceFailure();
+                break;
+            default:
+                break;
+        }
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
new file mode 100644
index 0000000000..c2fc873bdc
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Handles {@link StateEventType#PROCESS_TIMEOUT} events by delegating to the workflow's timeout handling.
+ */
+@AutoService(StateEventHandler.class)
+public class WorkflowTimeoutStateEventHandler implements StateEventHandler {
+
+    /**
+     * Runs the workflow's timeout handling and then counts the timeout in the process instance metrics.
+     *
+     * <p>The metric is incremented only after {@code processTimeout()} returns: if it throws, the event stays
+     * queued and is retried, and counting first would record one timeout per attempt.
+     *
+     * @return always {@code true}, meaning the event has been consumed
+     */
+    @Override
+    public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) {
+        workflowExecuteRunnable.processTimeout();
+        ProcessInstanceMetrics.incProcessInstanceTimeout();
+        return true;
+    }
+
+    @Override
+    public StateEventType getEventType() {
+        return StateEventType.PROCESS_TIMEOUT;
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
index b2c47e4112..f90277d0eb 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.processor;
 
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -62,7 +62,8 @@ public class StateEventProcessor implements NettyRequestProcessor {
         }
         stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId());
         stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId());
-        StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;
+        StateEventType
+            type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;
         stateEvent.setType(type);
 
         try {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
index e597f1cae5..42d4ab4db7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.processor;
 
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index 4ca6b9eccb..bdf80bbee9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -98,6 +98,7 @@ public class StateEventResponseService {
      */
     public void addResponse(StateEvent stateEvent) {
         try {
+            // TODO: validate the event before putting it into the queue
             eventQueue.put(stateEvent);
         } catch (InterruptedException e) {
             logger.error("Put state event : {} error", stateEvent, e);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
index bed3b3d9ed..d4b97c09c7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
@@ -63,11 +63,15 @@ public class TaskEventService {
 
     @PostConstruct
     public void start() {
-        this.taskEventThread = new TaskEventThread();
+        this.taskEventThread = new TaskEventDispatchThread();
+        logger.info("TaskEvent dispatch thread starting");
         this.taskEventThread.start();
+        logger.info("TaskEvent dispatch thread started");
 
         this.taskEventHandlerThread = new TaskEventHandlerThread();
+        logger.info("TaskEvent handle thread starting");
         this.taskEventHandlerThread.start();
+        logger.info("TaskEvent handle thread started");
     }
 
     @PreDestroy
@@ -94,14 +98,14 @@ public class TaskEventService {
      * @param taskEvent taskEvent
      */
     public void addEvent(TaskEvent taskEvent) {
-        taskExecuteThreadPool.submitTaskEvent(taskEvent);
+        eventQueue.add(taskEvent);
     }
 
     /**
-     * task worker thread
+     * Dispatch event to target task runnable.
      */
-    class TaskEventThread extends BaseDaemonThread {
-        protected TaskEventThread() {
+    class TaskEventDispatchThread extends BaseDaemonThread {
+        protected TaskEventDispatchThread() {
             super("TaskEventLoopThread");
         }
 
@@ -109,7 +113,7 @@ public class TaskEventService {
         public void run() {
             while (Stopper.isRunning()) {
                 try {
-                    // if not task , blocking here
+                    // blocks here until a task event is available
                     TaskEvent taskEvent = eventQueue.take();
                     taskExecuteThreadPool.submitTaskEvent(taskEvent);
                 } catch (InterruptedException e) {
@@ -141,6 +145,7 @@ public class TaskEventService {
                     TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
+                    logger.warn("TaskEvent handle thread interrupted, will return this loop");
                     break;
                 } catch (Exception e) {
                     logger.error("event handler thread error", e);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index 9529259611..f2a6d6873e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -18,7 +18,7 @@
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
 import org.apache.dolphinscheduler.common.enums.Event;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -193,6 +193,7 @@ public class TaskExecuteRunnable implements Runnable {
                 }
             }
             // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
+            // send ack to worker
             TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
             channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
         } catch (Exception e) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index 68e4511c42..ae578d4c03 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -82,18 +82,13 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
             logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent);
             return;
         }
-        if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
-            TaskExecuteRunnable taskExecuteThread = new TaskExecuteRunnable(
-                    taskEvent.getProcessInstanceId(),
-                    processService, workflowExecuteThreadPool,
-                    processInstanceExecCacheManager,
-                    dataQualityResultOperator);
-            taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread);
-        }
-        TaskExecuteRunnable taskExecuteRunnable= taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
-        if (taskExecuteRunnable != null) {
-            taskExecuteRunnable.addEvent(taskEvent);
-        }
+        TaskExecuteRunnable taskExecuteRunnable = taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(),
+            (processInstanceId) -> new TaskExecuteRunnable(processInstanceId,
+                processService,
+                workflowExecuteThreadPool,
+                processInstanceExecCacheManager,
+                dataQualityResultOperator));
+        taskExecuteRunnable.addEvent(taskEvent);
     }
 
     public void eventHandler() {
@@ -103,7 +98,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
     }
 
     public void executeEvent(TaskExecuteRunnable taskExecuteThread) {
-        if (taskExecuteThread.eventSize() == 0) {
+        if (taskExecuteThread.isEmpty()) {
             return;
         }
         if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 334df5baa3..e88d84312c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.SlotCheckState;
-import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -36,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
 import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.apache.commons.collections4.CollectionUtils;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index a56a7d8c5a..c70fb1f1d4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -18,7 +18,7 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 7fd23ba42c..fdec38e65e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -35,16 +35,14 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
-import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -63,20 +61,23 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
 import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
-import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandleException;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandler;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandlerManager;
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.corn.CronUtils;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
 
@@ -283,16 +284,37 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         if (!isStart) {
             return;
         }
+        StateEvent stateEvent = null;
         while (!this.stateEvents.isEmpty()) {
             try {
-                StateEvent stateEvent = this.stateEvents.peek();
-                LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
-                if (stateEventHandler(stateEvent)) {
+                stateEvent = this.stateEvents.peek();
+                LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
+                    stateEvent.getTaskInstanceId());
+                // Remove the event once its handler succeeds; otherwise keep it queued and retry it next time.
+                // Handling is expected to succeed unless a database error occurs.
+                checkProcessInstance(stateEvent);
+
+                StateEventHandler stateEventHandler =
+                    StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
+                        .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
+                if (stateEventHandler.handleStateEvent(this, stateEvent)) {
                     this.stateEvents.remove(stateEvent);
                 }
+            } catch (StateEventHandleError stateEventHandleError) {
+                logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
+                this.stateEvents.remove(stateEvent);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+            } catch (StateEventHandleException stateEventHandleException) {
+                logger.error("State event handle error, will retry this event: {}",
+                    stateEvent,
+                    stateEventHandleException);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (Exception e) {
                 // we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue.
-                logger.error("state handle error:", e);
+                logger.error("State event handle error, get a unknown exception, will retry this event: {}",
+                    stateEvent,
+                    e);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } finally {
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
@@ -330,58 +352,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         return this.processInstance;
     }
 
-    private boolean stateEventHandler(StateEvent stateEvent) {
-        logger.info("process event: {}", stateEvent);
-
-        if (!checkProcessInstance(stateEvent)) {
-            return false;
-        }
-
-        boolean result = false;
-        switch (stateEvent.getType()) {
-            case PROCESS_STATE_CHANGE:
-                measureProcessState(stateEvent);
-                result = processStateChangeHandler(stateEvent);
-                break;
-            case TASK_STATE_CHANGE:
-                measureTaskState(stateEvent);
-                result = taskStateChangeHandler(stateEvent);
-                break;
-            case PROCESS_TIMEOUT:
-                ProcessInstanceMetrics.incProcessInstanceTimeout();
-                result = processTimeout();
-                break;
-            case TASK_TIMEOUT:
-                TaskMetrics.incTaskTimeout();
-                result = taskTimeout(stateEvent);
-                break;
-            case WAIT_TASK_GROUP:
-                result = checkForceStartAndWakeUp(stateEvent);
-                break;
-            case TASK_RETRY:
-                TaskMetrics.incTaskRetry();
-                result = taskRetryEventHandler(stateEvent);
-                break;
-            case PROCESS_BLOCKED:
-                result = processBlockHandler(stateEvent);
-                break;
-            default:
-                break;
-        }
-
-        if (result) {
-            this.stateEvents.remove(stateEvent);
-        }
-        return result;
-    }
-
-    private boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
+    public boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
         TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
         if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
             TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
             ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
             taskProcessor.action(TaskAction.DISPATCH);
-            this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
+            this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(),
+                TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
             return true;
         }
         if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
@@ -396,76 +374,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         return false;
     }
 
-    private boolean taskTimeout(StateEvent stateEvent) {
-        if (!checkTaskInstanceByStateEvent(stateEvent)) {
-            return true;
-        }
-
-        TaskInstance taskInstance = taskInstanceMap.get(stateEvent.getTaskInstanceId());
-        if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
-            return true;
-        }
-        TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
-        if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
-            ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
-            taskProcessor.action(TaskAction.TIMEOUT);
-        }
-        if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
-            ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
-            processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser);
-        }
-        return true;
-    }
-
-    private boolean processTimeout() {
+    public void processTimeout() {
         ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
         this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, projectUser);
-        return true;
     }
 
-    private boolean taskStateChangeHandler(StateEvent stateEvent) {
-        if (!checkTaskInstanceByStateEvent(stateEvent)) {
-            return true;
-        }
-
-        Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId());
-        TaskInstance task = taskInstanceOptional.orElseThrow(
-                () -> new RuntimeException("Cannot find task instance by task instance id: " + stateEvent.getTaskInstanceId()));
-
-        if (task.getState() == null) {
-            logger.error("task state is null, state handler error: {}", stateEvent);
-            return true;
-        }
-
-        if (task.getState().typeIsFinished()) {
-            if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
-                logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
-                return true;
-            }
-            taskFinished(task);
-            if (task.getTaskGroupId() > 0) {
-                releaseTaskGroup(task);
-            }
-            return true;
-        }
-        if (activeTaskProcessorMaps.containsKey(task.getTaskCode())) {
-            ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(task.getTaskCode());
-            iTaskProcessor.action(TaskAction.RUN);
-
-            if (iTaskProcessor.taskInstance().getState().typeIsFinished()) {
-                if (iTaskProcessor.taskInstance().getState() != task.getState()) {
-                    task.setState(iTaskProcessor.taskInstance().getState());
-                }
-                taskFinished(task);
-            }
-            return true;
-        }
-        logger.error("state handler error: {}", stateEvent);
-
-        return true;
+    public void taskTimeout(TaskInstance taskInstance) {
+        ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+        processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser);
     }
 
-    private void taskFinished(TaskInstance taskInstance) {
+    public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException {
         logger.info("TaskInstance finished task code:{} state:{} ",
             taskInstance.getTaskCode(),
             taskInstance.getState());
@@ -512,7 +431,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      *
      * @param taskInstance
      */
-    private void releaseTaskGroup(TaskInstance taskInstance) {
+    public void releaseTaskGroup(TaskInstance taskInstance) {
         if (taskInstance.getTaskGroupId() > 0) {
             TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
             if (nextTaskInstance != null) {
@@ -536,13 +455,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      *
      * @param taskInstance
      */
-    private void retryTaskInstance(TaskInstance taskInstance) {
+    private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
         if (!taskInstance.taskCanRetry()) {
             return;
         }
         TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
         if (newTaskInstance == null) {
-            logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
+            logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}",
+                taskInstance.getTaskCode(),
+                taskInstance.getId());
             return;
         }
         waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
@@ -563,20 +484,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
     }
 
-    /**
-     * handle task retry event
-     *
-     * @param stateEvent
-     * @return
-     */
-    private boolean taskRetryEventHandler(StateEvent stateEvent) {
-        TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode());
-        addTaskToStandByList(taskInstance);
-        submitStandByTask();
-        waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode());
-        return true;
-    }
-
     /**
      * update process instance
      */
@@ -610,45 +517,23 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     /**
      * check process instance by state event
      */
-    public boolean checkProcessInstance(StateEvent stateEvent) {
+    public void checkProcessInstance(StateEvent stateEvent) throws StateEventHandleError {
         if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
-            logger.error("mismatch process instance id: {}, state event:{}",
-                    this.processInstance.getId(),
-                    stateEvent);
-            return false;
+            throw new StateEventHandleError("The event doesn't contains process instance id");
         }
-        return true;
     }
 
     /**
      * check if task instance exist by state event
      */
-    public boolean checkTaskInstanceByStateEvent(StateEvent stateEvent) {
+    public void checkTaskInstanceByStateEvent(StateEvent stateEvent) throws StateEventHandleError {
         if (stateEvent.getTaskInstanceId() == 0) {
-            logger.error("task instance id null, state event:{}", stateEvent);
-            return false;
+            throw new StateEventHandleError("The taskInstanceId is 0");
         }
 
         if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) {
-            logger.error("mismatch task instance id, event:{}", stateEvent);
-            return false;
+            throw new StateEventHandleError("Cannot find the taskInstance from taskInstanceMap");
         }
-        return true;
-    }
-
-    /**
-     * check if task instance exist by task code
-     */
-    public boolean checkTaskInstanceByCode(long taskCode) {
-        if (taskInstanceMap.isEmpty()) {
-            return false;
-        }
-        for (TaskInstance taskInstance : taskInstanceMap.values()) {
-            if (taskInstance.getTaskCode() == taskCode) {
-                return true;
-            }
-        }
-        return false;
     }
 
     /**
@@ -697,58 +582,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         return Optional.empty();
     }
 
-    private boolean processStateChangeHandler(StateEvent stateEvent) {
-        try {
-            logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
-
-            if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
-                // serial wait execution type needs to wake up the waiting process
-                if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) {
-                    endProcess();
-                    return true;
-                }
-                this.updateProcessInstanceState(stateEvent);
-                return true;
-            }
-            if (processComplementData()) {
-                return true;
-            }
-            if (stateEvent.getExecutionStatus().typeIsFinished()) {
-                endProcess();
-            }
-            if (processInstance.getState() == ExecutionStatus.READY_STOP) {
-                killAllTasks();
-            }
-            return true;
-        } catch (Exception e) {
-            logger.error("process state change error:", e);
-        }
-        return true;
-    }
-
-    private boolean processBlockHandler(StateEvent stateEvent) {
-        try {
-            Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId());
-            TaskInstance task = taskInstanceOptional.orElseThrow(
-                    () -> new RuntimeException("Cannot find taskInstance by taskInstanceId:" + stateEvent.getTaskInstanceId()));
-            if (!checkTaskInstanceByStateEvent(stateEvent)) {
-                logger.error("task {} is not a blocking task", task.getTaskCode());
-                return false;
-            }
-
-            BlockingParameters parameters = JSONUtils.parseObject(task.getTaskParams(), BlockingParameters.class);
-            if (parameters.isAlertWhenBlocking()) {
-                ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
-                processAlertManager.sendProcessBlockingAlert(processInstance, projectUser);
-                logger.info("processInstance {} block alert send successful!", processInstance.getId());
-            }
-        } catch (Exception e) {
-            logger.error("sending blocking message error:", e);
-        }
-        return true;
+    public void processBlock() {
+        ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+        processAlertManager.sendProcessBlockingAlert(processInstance, projectUser);
+        logger.info("processInstance {} block alert send successful!", processInstance.getId());
     }
 
-    private boolean processComplementData() throws Exception {
+    public boolean processComplementData() {
         if (!needComplementProcess()) {
             return false;
         }
@@ -946,7 +786,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     /**
      * init task queue
      */
-    private void initTaskQueue() {
+    private void initTaskQueue() throws StateEventHandleException {
 
         taskFailedSubmit = false;
         activeTaskProcessorMaps.clear();
@@ -955,7 +795,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         errorTaskMap.clear();
 
         if (!isNewProcessInstance()) {
-            List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
+            List<TaskInstance> validTaskInstanceList =
+                processService.findValidTaskListByProcessId(processInstance.getId());
             for (TaskInstance task : validTaskInstanceList) {
                 if (validTaskMap.containsKey(task.getTaskCode())) {
                     int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
@@ -965,7 +806,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                         processService.updateTaskInstance(task);
                         continue;
                     }
-                    logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", task.getTaskCode());
+                    logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}",
+                        task.getTaskCode());
                 }
 
                 validTaskMap.put(task.getTaskCode(), task.getId());
@@ -1113,6 +955,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             Host host = new Host(taskInstance.getHost());
             nettyExecutorManager.doExecute(host, hostUpdateCommand.convert2Command());
         } catch (Exception e) {
+            // TODO: decide whether this exception should be propagated instead of only being logged.
             logger.error("notify process host update", e);
         }
     }
@@ -1366,8 +1209,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         return validTaskInstanceList;
     }
 
-    private void submitPostNode(String parentNodeCode) {
-        Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
+    private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
+        Set<String> submitTaskNodeList =
+            DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
         List<TaskInstance> taskInstances = new ArrayList<>();
         for (String taskNode : submitTaskNodeList) {
             TaskNode taskNodeObject = dag.getNode(taskNode);
@@ -1710,34 +1554,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             return true;
         }
 
-        try {
-            Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
-            Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
-            return processInstance.getScheduleTime().equals(endTime);
-        } catch (Exception e) {
-            logger.error("complement end failed ", e);
-            return false;
-        }
+        Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
+        Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+        return processInstance.getScheduleTime().equals(endTime);
     }
 
     /**
      * updateProcessInstance process instance state
      * after each batch of tasks is executed, the status of the process instance is updated
      */
-    private void updateProcessInstanceState() {
+    private void updateProcessInstanceState() throws StateEventHandleException {
         ExecutionStatus state = getProcessInstanceState(processInstance);
         if (processInstance.getState() != state) {
-            logger.info(
-                    "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
-                    processInstance.getId(), processInstance.getName(),
-                    processInstance.getState(), state,
-                    processInstance.getCommandType());
-
-            processInstance.setState(state);
-            if (state.typeIsFinished()) {
-                processInstance.setEndTime(new Date());
-            }
-            processService.updateProcessInstance(processInstance);
+            updateWorkflowInstanceStatesToDB(state);
 
             StateEvent stateEvent = new StateEvent();
             stateEvent.setExecutionStatus(processInstance.getState());
@@ -1752,20 +1581,33 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     /**
      * stateEvent's execution status as process instance state
      */
-    private void updateProcessInstanceState(StateEvent stateEvent) {
+    public void updateProcessInstanceState(StateEvent stateEvent) throws StateEventHandleException {
         ExecutionStatus state = stateEvent.getExecutionStatus();
-        if (processInstance.getState() != state) {
-            logger.info(
-                    "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
-                    processInstance.getId(), processInstance.getName(),
-                    processInstance.getState(), state,
-                    processInstance.getCommandType());
-
-            processInstance.setState(state);
-            if (state.typeIsFinished()) {
+        updateWorkflowInstanceStatesToDB(state);
+    }
+
+    private void updateWorkflowInstanceStatesToDB(ExecutionStatus newStates) throws StateEventHandleException {
+        ExecutionStatus originStates = processInstance.getState();
+        if (originStates != newStates) {
+            logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+                processInstance.getId(),
+                processInstance.getName(),
+                originStates,
+                newStates,
+                processInstance.getCommandType());
+
+            processInstance.setState(newStates);
+            if (newStates.typeIsFinished()) {
                 processInstance.setEndTime(new Date());
             }
-            processService.updateProcessInstance(processInstance);
+            try {
+                processService.updateProcessInstance(processInstance);
+            } catch (Exception ex) {
+                // roll back the in-memory state; NOTE(review): endTime is reset to null, not to its previous value
+                processInstance.setState(originStates);
+                processInstance.setEndTime(null);
+                throw new StateEventHandleException("Update process instance status to DB error", ex);
+            }
         }
     }
 
@@ -1784,19 +1626,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      *
      * @param taskInstance task instance
      */
-    private void addTaskToStandByList(TaskInstance taskInstance) {
-        try {
-            if (readyToSubmitTaskQueue.contains(taskInstance)) {
-                logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode());
-                return;
-            }
-            logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
-                    taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
-            TaskMetrics.incTaskSubmit();
-            readyToSubmitTaskQueue.put(taskInstance);
-        } catch (Exception e) {
-            logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
+    public void addTaskToStandByList(TaskInstance taskInstance) {
+        if (readyToSubmitTaskQueue.contains(taskInstance)) {
+            logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode());
+            return;
         }
+        logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
+            taskInstance.getName(),
+            taskInstance.getId(),
+            taskInstance.getTaskCode());
+        TaskMetrics.incTaskSubmit();
+        readyToSubmitTaskQueue.put(taskInstance);
     }
 
     /**
@@ -1805,13 +1645,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      * @param taskInstance task instance
      */
     private void removeTaskFromStandbyList(TaskInstance taskInstance) {
-        try {
-            readyToSubmitTaskQueue.remove(taskInstance);
-        } catch (Exception e) {
-            logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}",
-                    taskInstance.getId(),
-                    taskInstance.getName(), e);
-        }
+        readyToSubmitTaskQueue.remove(taskInstance);
     }
 
     /**
@@ -1831,9 +1665,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     /**
      * close the on going tasks
      */
-    private void killAllTasks() {
-        logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
-                activeTaskProcessorMaps.size());
+    public void killAllTasks() {
+        logger.info("kill called on process instance id: {}, num: {}",
+            processInstance.getId(),
+            activeTaskProcessorMaps.size());
 
         if (readyToSubmitTaskQueue.size() > 0) {
             readyToSubmitTaskQueue.clear();
@@ -1868,7 +1703,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     /**
      * handling the list of tasks to be submitted
      */
-    private void submitStandByTask() {
+    public void submitStandByTask() throws StateEventHandleException {
         int length = readyToSubmitTaskQueue.size();
         for (int i = 0; i < length; i++) {
             TaskInstance task = readyToSubmitTaskQueue.peek();
@@ -2027,6 +1862,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             throw new Exception("resubmit error, taskProcessor is null, task code: " + taskCode);
         }
     }
+
+    public Map<Long, Integer> getCompleteTaskMap() {
+        return completeTaskMap;
+    }
+
+    public Map<Long, ITaskProcessor> getActiveTaskProcessMap() {
+        return activeTaskProcessorMaps;
+    }
+
+    public Map<Long, TaskInstance> getWaitToRetryTaskInstanceMap() {
+        return waitToRetryTaskInstanceMap;
+    }
+
     private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
         // get start params from command param
         Map<String, String> startParamMap = new HashMap<>();
@@ -2061,46 +1909,4 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
     }
 
-    private void measureProcessState(StateEvent processStateEvent) {
-        if (processStateEvent.getExecutionStatus().typeIsFinished()) {
-            ProcessInstanceMetrics.incProcessInstanceFinish();
-        }
-        switch (processStateEvent.getExecutionStatus()) {
-            case STOP:
-                ProcessInstanceMetrics.incProcessInstanceStop();
-                break;
-            case SUCCESS:
-                ProcessInstanceMetrics.incProcessInstanceSuccess();
-                break;
-            case FAILURE:
-                ProcessInstanceMetrics.incProcessInstanceFailure();
-                break;
-            default:
-                break;
-        }
-    }
-
-    private void measureTaskState(StateEvent taskStateEvent) {
-        if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
-            // the event is broken
-            logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
-            return;
-        }
-        if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
-            TaskMetrics.incTaskFinish();
-        }
-        switch (taskStateEvent.getExecutionStatus()) {
-            case STOP:
-                TaskMetrics.incTaskStop();
-                break;
-            case SUCCESS:
-                TaskMetrics.incTaskSuccess();
-                break;
-            case FAILURE:
-                TaskMetrics.incTaskFailure();
-                break;
-            default:
-                break;
-        }
-    }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 4085e99051..ce66318967 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -18,7 +18,7 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
@@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.Map;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index b377ceedbf..2d4bdfe5da 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master.service;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index fc9bcca3aa..98bf514730 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -26,7 +26,7 @@ import static org.mockito.Mockito.doNothing;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java
index 247e4066f8..0d623e398a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java
@@ -17,12 +17,13 @@
 
 package org.apache.dolphinscheduler.remote.processor;
 
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.Host;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+
 /**
  *  callback channel
  */
@@ -72,7 +73,7 @@ public class NettyRemoteChannel {
         return this.channel.isActive();
     }
 
-    public ChannelFuture writeAndFlush(Command command){
+    public ChannelFuture writeAndFlush(Command command) {
         return this.channel.writeAndFlush(command);
     }
 
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
index fb2b411523..be564261fb 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.utils.Host;
 
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
@@ -71,19 +72,19 @@ public class StateEventCallbackService {
      * @param host
      * @return callback channel
      */
-    private NettyRemoteChannel newRemoteChannel(Host host) {
+    private Optional<NettyRemoteChannel> newRemoteChannel(Host host) {
         Channel newChannel;
         NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(host.getAddress());
         if (nettyRemoteChannel != null) {
             if (nettyRemoteChannel.isActive()) {
-                return nettyRemoteChannel;
+                return Optional.of(nettyRemoteChannel);
             }
         }
         newChannel = nettyRemotingClient.getChannel(host);
         if (newChannel != null) {
-            return newRemoteChannel(newChannel, host.getAddress());
+            return Optional.of(newRemoteChannel(newChannel, host.getAddress()));
         }
-        return null;
+        return Optional.empty();
     }
 
     public long pause(int ntries) {
@@ -110,24 +111,26 @@ public class StateEventCallbackService {
     }
 
     /**
-     * send result
+     * Send the command to the target address; this method doesn't guarantee that the command is sent successfully.
      *
-     * @param command command
+     * @param command command to send
      */
     public void sendResult(String address, int port, Command command) {
         logger.info("send result, host:{}, command:{}", address, command.toString());
         Host host = new Host(address, port);
-        NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host);
-        if (nettyRemoteChannel != null) {
-            nettyRemoteChannel.writeAndFlush(command);
-        }
+        sendResult(host, command);
     }
 
+    /**
+     * Send the command to the target host; this method doesn't guarantee that the command is sent successfully.
+     *
+     * @param host    target host
+     * @param command command to send
+     */
     public void sendResult(Host host, Command command) {
         logger.info("send result, host:{}, command:{}", host.getAddress(), command.toString());
-        NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host);
-        if (nettyRemoteChannel != null) {
+        newRemoteChannel(host).ifPresent(nettyRemoteChannel -> {
             nettyRemoteChannel.writeAndFlush(command);
-        }
+        });
     }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index cc7f39e402..0cf03abe8d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -51,10 +51,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
      * put task instance to priority queue
      *
      * @param taskInstance taskInstance
-     * @throws TaskPriorityQueueException
      */
     @Override
-    public void put(TaskInstance taskInstance) throws TaskPriorityQueueException {
+    public void put(TaskInstance taskInstance) {
         Preconditions.checkNotNull(taskInstance);
         queue.add(taskInstance);
         taskInstanceIdentifySet.add(getTaskInstanceIdentify(taskInstance));
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
index 1325afe9b2..0007581d3c 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
@@ -33,7 +33,7 @@ public interface TaskPriorityQueue<T> {
      * @param taskInfo taskInfo
      * @throws TaskPriorityQueueException
      */
-    void put(T taskInfo) throws TaskPriorityQueueException;
+    void put(T taskInfo);
 
     /**
      * take taskInfo
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
index fef5f8ff79..0d0eebe03d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
@@ -40,10 +40,9 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
      * put task takePriorityInfo
      *
      * @param taskPriorityInfo takePriorityInfo
-     * @throws TaskPriorityQueueException
      */
     @Override
-    public void put(TaskPriority taskPriorityInfo) throws TaskPriorityQueueException {
+    public void put(TaskPriority taskPriorityInfo) {
         queue.put(taskPriorityInfo);
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
index e080842c34..b62770984a 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
@@ -58,7 +58,13 @@ public class TaskExecuteResponseAckProcessor implements NettyRequestProcessor {
         if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
             ResponseCache.get().removeResponseCache(taskExecuteResponseAckCommand.getTaskInstanceId());
             TaskCallbackService.remove(taskExecuteResponseAckCommand.getTaskInstanceId());
-            logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskExecuteResponseAckCommand.getTaskInstanceId());
+            logger.debug("remove REMOTE_CHANNELS, task instance id:{}",
+                taskExecuteResponseAckCommand.getTaskInstanceId());
+        } else if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.FAILURE.getCode()) {
+            // the master failed to handle the worker's response; the response cache is kept, so the response will be retried
+        } else {
+            throw new IllegalArgumentException("Invalid task execute response ack status: "
+                + taskExecuteResponseAckCommand.getStatus());
         }
     }