You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/04 14:08:20 UTC

[dolphinscheduler] branch dev updated: [Fix-10666] Workflows that fail to submit stay in memory and are never retried (#10667)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 35a10d092f [Fix-10666] Workflows that fail to submit stay in memory and are never retried (#10667)
35a10d092f is described below

commit 35a10d092f566c07137da5fb67b21cee644cdc8f
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Jul 4 22:08:15 2022 +0800

    [Fix-10666] Workflows that fail to submit stay in memory and are never retried (#10667)
    
    * Workflows that fail to submit stay in memory and are never retried
---
 .../server/master/exception/MasterException.java   |  29 ++++
 .../master/metrics/ProcessInstanceMetrics.java     |   6 +
 .../master/runner/MasterSchedulerService.java      | 161 ++++++++++++------
 .../master/runner/StateWheelExecuteThread.java     |  18 +-
 .../master/runner/WorkflowExecuteRunnable.java     | 184 ++++++++++-----------
 .../server/master/runner/WorkflowSubmitStatue.java |  34 ++++
 .../WorkflowExecuteRunnableTest.java}              |  18 +-
 .../service/process/ProcessService.java            |   2 +-
 8 files changed, 288 insertions(+), 164 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java
new file mode 100644
index 0000000000..eb5d5e8dfe
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.exception;
+
+public class MasterException extends Exception {
+
+    public MasterException(String message) {
+        super(message);
+    }
+
+    public MasterException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
index ad0e479e5b..8edf3f0c86 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
@@ -90,6 +90,12 @@ public final class ProcessInstanceMetrics {
             .register(Metrics.globalRegistry);
     }
 
+    public static synchronized void registerProcessInstanceResubmitGauge(Supplier<Number> function) {
+        Gauge.builder("ds.workflow.instance.resubmit", function)
+            .description("The current process instance need to resubmit count")
+            .register(Metrics.globalRegistry);
+    }
+
     public static void incProcessInstanceSubmit() {
         PROCESS_INSTANCE_SUBMIT_COUNTER.increment();
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 3254d6c5b5..334df5baa3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -28,11 +28,10 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.exception.MasterException;
 import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
 import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
@@ -44,7 +43,9 @@ import org.apache.commons.collections4.CollectionUtils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.slf4j.Logger;
@@ -52,6 +53,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import lombok.NonNull;
+
 /**
  * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
  */
@@ -72,8 +75,6 @@ public class MasterSchedulerService extends BaseDaemonThread {
     @Autowired
     private ProcessAlertManager processAlertManager;
 
-    private NettyRemotingClient nettyRemotingClient;
-
     @Autowired
     private NettyExecutorManager nettyExecutorManager;
 
@@ -97,6 +98,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
     @Autowired
     private CuringParamsService curingGlobalParamsService;
 
+    private final LinkedBlockingQueue<ProcessInstance> submitFailedProcessInstances = new LinkedBlockingQueue<>();
+
+    private Thread failedProcessInstanceResubmitThread;
+
     private String masterAddress;
 
     protected MasterSchedulerService() {
@@ -108,22 +113,23 @@ public class MasterSchedulerService extends BaseDaemonThread {
      */
     public void init() {
         this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
-        NettyClientConfig clientConfig = new NettyClientConfig();
-        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
         this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
+        this.failedProcessInstanceResubmitThread = new FailedProcessInstanceResubmitThread(submitFailedProcessInstances);
+        ProcessInstanceMetrics.registerProcessInstanceResubmitGauge(submitFailedProcessInstances::size);
     }
 
     @Override
     public synchronized void start() {
         logger.info("Master schedule service starting..");
-        this.stateWheelExecuteThread.start();
         super.start();
+        this.failedProcessInstanceResubmitThread.start();
         logger.info("Master schedule service started...");
     }
 
     public void close() {
         logger.info("Master schedule service stopping...");
-        nettyRemotingClient.close();
+        // these process instances will be failover, so we can safa clear here
+        submitFailedProcessInstances.clear();
         logger.info("Master schedule service stopped...");
     }
 
@@ -146,7 +152,9 @@ public class MasterSchedulerService extends BaseDaemonThread {
                 Thread.currentThread().interrupt();
                 break;
             } catch (Exception e) {
-                logger.error("Master schedule service loop command error", e);
+                logger.error("Master schedule workflow error", e);
+                // sleep for 1s here to avoid the database down cause the exception boom
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             }
         }
     }
@@ -154,7 +162,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
     /**
      * Query command from database by slot, and transform to workflow instance, then submit to workflowExecuteThreadPool.
      */
-    private void scheduleWorkflow() throws InterruptedException {
+    private void scheduleWorkflow() throws InterruptedException, MasterException {
         List<Command> commands = findCommands();
         if (CollectionUtils.isEmpty(commands)) {
             // indicate that no command ,sleep for 1s
@@ -164,38 +172,53 @@ public class MasterSchedulerService extends BaseDaemonThread {
 
         List<ProcessInstance> processInstances = command2ProcessInstance(commands);
         if (CollectionUtils.isEmpty(processInstances)) {
+            // indicate that the command transform to processInstance error, sleep for 1s
+            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             return;
         }
         MasterServerMetrics.incMasterConsumeCommand(commands.size());
 
         for (ProcessInstance processInstance : processInstances) {
-            try {
-                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-                logger.info("Master schedule service starting workflow instance");
-                final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
-                    processInstance
-                    , processService
-                    , nettyExecutorManager
-                    , processAlertManager
-                    , masterConfig
-                    , stateWheelExecuteThread
-                    , curingGlobalParamsService);
-
-                this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
-                if (processInstance.getTimeout() > 0) {
-                    stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
-                }
-                ProcessInstanceMetrics.incProcessInstanceSubmit();
-                workflowExecuteThreadPool.submit(workflowExecuteRunnable);
-                logger.info("Master schedule service started workflow instance");
-
-            } catch (Exception ex) {
-                processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-                stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
-                logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
-            } finally {
-                LoggerUtils.removeWorkflowInstanceIdMDC();
+            submitProcessInstance(processInstance);
+        }
+    }
+
+    private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            logger.info("Master schedule service starting workflow instance");
+            final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
+                processInstance
+                , processService
+                , nettyExecutorManager
+                , processAlertManager
+                , masterConfig
+                , stateWheelExecuteThread
+                , curingGlobalParamsService);
+
+            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
+            if (processInstance.getTimeout() > 0) {
+                stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
             }
+            ProcessInstanceMetrics.incProcessInstanceSubmit();
+            CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = CompletableFuture.supplyAsync(
+                workflowExecuteRunnable::call, workflowExecuteThreadPool);
+            workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
+                if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
+                    // submit failed
+                    processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+                    stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+                    submitFailedProcessInstances.add(processInstance);
+                }
+            });
+            logger.info("Master schedule service started workflow instance");
+
+        } catch (Exception ex) {
+            processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+            stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+            logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
+        } finally {
+            LoggerUtils.removeWorkflowInstanceIdMDC();
         }
     }
 
@@ -237,23 +260,27 @@ public class MasterSchedulerService extends BaseDaemonThread {
         return processInstances;
     }
 
-    private List<Command> findCommands() {
-        long scheduleStartTime = System.currentTimeMillis();
-        int thisMasterSlot = ServerNodeManager.getSlot();
-        int masterCount = ServerNodeManager.getMasterSize();
-        if (masterCount <= 0) {
-            logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
-            return Collections.emptyList();
-        }
-        int pageNumber = 0;
-        int pageSize = masterConfig.getFetchCommandNum();
-        final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
-        if (CollectionUtils.isNotEmpty(result)) {
-            logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
-                result.size(), thisMasterSlot, masterCount);
+    private List<Command> findCommands() throws MasterException {
+        try {
+            long scheduleStartTime = System.currentTimeMillis();
+            int thisMasterSlot = ServerNodeManager.getSlot();
+            int masterCount = ServerNodeManager.getMasterSize();
+            if (masterCount <= 0) {
+                logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
+                return Collections.emptyList();
+            }
+            int pageNumber = 0;
+            int pageSize = masterConfig.getFetchCommandNum();
+            final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+            if (CollectionUtils.isNotEmpty(result)) {
+                logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
+                    result.size(), thisMasterSlot, masterCount);
+            }
+            ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
+            return result;
+        } catch (Exception ex) {
+            throw new MasterException("Master loop command from database error", ex);
         }
-        ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
-        return result;
     }
 
     private SlotCheckState slotCheck(Command command) {
@@ -270,4 +297,34 @@ public class MasterSchedulerService extends BaseDaemonThread {
         return state;
     }
 
+    private class FailedProcessInstanceResubmitThread extends Thread {
+
+        private final LinkedBlockingQueue<ProcessInstance> submitFailedProcessInstances;
+
+        public FailedProcessInstanceResubmitThread(LinkedBlockingQueue<ProcessInstance> submitFailedProcessInstances) {
+            logger.info("Starting workflow resubmit thread");
+            this.submitFailedProcessInstances = submitFailedProcessInstances;
+            this.setDaemon(true);
+            this.setName("SubmitFailedProcessInstanceHandleThread");
+            logger.info("Started workflow resubmit thread");
+        }
+
+        @Override
+        public void run() {
+            while (Stopper.isRunning()) {
+                try {
+                    ProcessInstance processInstance = submitFailedProcessInstances.take();
+                    submitProcessInstance(processInstance);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    logger.warn("SubmitFailedProcessInstanceHandleThread has been interrupted, will return");
+                    break;
+                }
+
+                // avoid the failed-fast cause CPU higher
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+            }
+        }
+    }
+
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 8b92696723..a56a7d8c5a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -33,12 +33,11 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
 
-import org.apache.commons.lang3.ThreadUtils;
-
-import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import javax.annotation.PostConstruct;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -91,9 +90,14 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
         super("StateWheelExecuteThread");
     }
 
+    @PostConstruct
+    public void startWheelThread() {
+        super.start();
+    }
+
     @Override
     public void run() {
-        Duration checkInterval = masterConfig.getStateWheelInterval();
+        final long checkInterval = masterConfig.getStateWheelInterval().toMillis();
         while (Stopper.isRunning()) {
             try {
                 checkTask4Timeout();
@@ -104,9 +108,11 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
                 logger.error("state wheel thread check error:", e);
             }
             try {
-                ThreadUtils.sleep(checkInterval);
+                Thread.sleep(checkInterval);
             } catch (InterruptedException e) {
-                logger.error("state wheel thread sleep error", e);
+                logger.error("state wheel thread sleep error, will close the loop", e);
+                Thread.currentThread().interrupt();
+                break;
             }
         }
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 25766d0a4c..7fd23ba42c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -81,12 +81,14 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -96,30 +98,29 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
+import lombok.NonNull;
+
 /**
  * Workflow execute task, used to execute a workflow instance.
  */
-public class WorkflowExecuteRunnable implements Runnable {
+public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
     /**
      * logger of WorkflowExecuteThread
      */
     private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class);
 
-    /**
-     * master config
-     */
-    private final MasterConfig masterConfig;
-
     /**
      * process service
      */
@@ -151,14 +152,14 @@ public class WorkflowExecuteRunnable implements Runnable {
     private DAG<String, TaskNode, TaskNodeRelation> dag;
 
     /**
-     * key of workflow
+     * unique key of workflow
      */
     private String key;
 
     /**
      * start flag, true: start nodes submit completely
      */
-    private boolean isStart = false;
+    private volatile boolean isStart = false;
 
     /**
      * submit failure nodes
@@ -240,6 +241,8 @@ public class WorkflowExecuteRunnable implements Runnable {
      */
     private final CuringParamsService curingParamsService;
 
+    private final String masterAddress;
+
     /**
      * @param processInstance         processInstance
      * @param processService          processService
@@ -248,20 +251,21 @@ public class WorkflowExecuteRunnable implements Runnable {
      * @param masterConfig            masterConfig
      * @param stateWheelExecuteThread stateWheelExecuteThread
      */
-    public WorkflowExecuteRunnable(ProcessInstance processInstance
-            , ProcessService processService
-            , NettyExecutorManager nettyExecutorManager
-            , ProcessAlertManager processAlertManager
-            , MasterConfig masterConfig
-            , StateWheelExecuteThread stateWheelExecuteThread
-            , CuringParamsService curingParamsService) {
+    public WorkflowExecuteRunnable(
+        @NonNull ProcessInstance processInstance,
+        @NonNull ProcessService processService,
+        @NonNull NettyExecutorManager nettyExecutorManager,
+        @NonNull ProcessAlertManager processAlertManager,
+        @NonNull MasterConfig masterConfig,
+        @NonNull StateWheelExecuteThread stateWheelExecuteThread,
+        @NonNull CuringParamsService curingParamsService) {
         this.processService = processService;
         this.processInstance = processInstance;
-        this.masterConfig = masterConfig;
         this.nettyExecutorManager = nettyExecutorManager;
         this.processAlertManager = processAlertManager;
         this.stateWheelExecuteThread = stateWheelExecuteThread;
         this.curingParamsService = curingParamsService;
+        this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
         TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
     }
 
@@ -287,6 +291,7 @@ public class WorkflowExecuteRunnable implements Runnable {
                     this.stateEvents.remove(stateEvent);
                 }
             } catch (Exception e) {
+                // we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue.
                 logger.error("state handle error:", e);
             } finally {
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
@@ -472,6 +477,7 @@ public class WorkflowExecuteRunnable implements Runnable {
 
         if (taskInstance.getState().typeIsSuccess()) {
             completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
+            // todo: merge the last taskInstance
             processInstance.setVarPool(taskInstance.getVarPool());
             processService.saveProcessInstance(processInstance);
             if (!processInstance.isBlocked()) {
@@ -828,18 +834,24 @@ public class WorkflowExecuteRunnable implements Runnable {
      * ProcessInstance start entrypoint.
      */
     @Override
-    public void run() {
+    public WorkflowSubmitStatue call() {
         if (this.taskInstanceMap.size() > 0 || isStart) {
             logger.warn("The workflow has already been started");
-            return;
+            return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
         }
+
         try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
             buildFlowDag();
             initTaskQueue();
             submitPostNode(null);
             isStart = true;
+            return WorkflowSubmitStatue.SUCCESS;
         } catch (Exception e) {
-            logger.error("start process error, process instance id:{}", processInstance.getId(), e);
+            logger.error("Start workflow error", e);
+            return WorkflowSubmitStatue.FAILED;
+        } finally {
+            LoggerUtils.removeWorkflowInstanceIdMDC();
         }
     }
 
@@ -893,7 +905,7 @@ public class WorkflowExecuteRunnable implements Runnable {
     }
 
     /**
-     * generate process dag
+     * Generate process dag
      *
      * @throws Exception exception
      */
@@ -905,7 +917,7 @@ public class WorkflowExecuteRunnable implements Runnable {
                 processInstance.getProcessDefinitionVersion());
         processInstance.setProcessDefinition(processDefinition);
 
-        List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
+        List<TaskInstance> recoverNodeList = getRecoverTaskInstanceList(processInstance.getCommandParam());
 
         List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
         List<TaskDefinitionLog> taskDefinitionLogs = processService.getTaskDefineLogListByRelation(processTaskRelations);
@@ -996,10 +1008,10 @@ public class WorkflowExecuteRunnable implements Runnable {
                 }
                 List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
                 if (complementListDate.isEmpty() && needComplementProcess()) {
-                    if(start != null && end != null){
+                    if (start != null && end != null) {
                         complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
                     }
-                    if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+                    if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
                         complementListDate = CronUtils.getSelfScheduleDateList(cmdParam);
                     }
                     logger.info(" process definition code:{} complement data: {}",
@@ -1096,7 +1108,7 @@ public class WorkflowExecuteRunnable implements Runnable {
 
         try {
             HostUpdateCommand hostUpdateCommand = new HostUpdateCommand();
-            hostUpdateCommand.setProcessHost(NetUtils.getAddr(masterConfig.getListenPort()));
+            hostUpdateCommand.setProcessHost(masterAddress);
             hostUpdateCommand.setTaskInstanceId(taskInstance.getId());
             Host host = new Host(taskInstance.getHost());
             nettyExecutorManager.doExecute(host, hostUpdateCommand.convert2Command());
@@ -1857,99 +1869,75 @@ public class WorkflowExecuteRunnable implements Runnable {
      * handling the list of tasks to be submitted
      */
     private void submitStandByTask() {
-        try {
-            int length = readyToSubmitTaskQueue.size();
-            for (int i = 0; i < length; i++) {
-                TaskInstance task = readyToSubmitTaskQueue.peek();
-                if (task == null) {
+        int length = readyToSubmitTaskQueue.size();
+        for (int i = 0; i < length; i++) {
+            TaskInstance task = readyToSubmitTaskQueue.peek();
+            if (task == null) {
+                continue;
+            }
+            // stop tasks which is retrying if forced success happens
+            if (task.taskCanRetry()) {
+                TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
+                if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
+                    task.setState(retryTask.getState());
+                    logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
+                    removeTaskFromStandbyList(task);
+                    completeTaskMap.put(task.getTaskCode(), task.getId());
+                    taskInstanceMap.put(task.getId(), task);
+                    submitPostNode(Long.toString(task.getTaskCode()));
                     continue;
                 }
-                // stop tasks which is retrying if forced success happens
-                if (task.taskCanRetry()) {
-                    TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
-                    if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
-                        task.setState(retryTask.getState());
-                        logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
-                        removeTaskFromStandbyList(task);
-                        completeTaskMap.put(task.getTaskCode(), task.getId());
-                        taskInstanceMap.put(task.getId(), task);
-                        submitPostNode(Long.toString(task.getTaskCode()));
-                        continue;
-                    }
-                }
-                //init varPool only this task is the first time running
-                if (task.isFirstRun()) {
-                    //get pre task ,get all the task varPool to this task
-                    Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
-                    getPreVarPool(task, preTask);
-                }
-                DependResult dependResult = getDependResultForTask(task);
-                if (DependResult.SUCCESS == dependResult) {
-                    Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
-                    if (!taskInstanceOptional.isPresent()) {
-                        this.taskFailedSubmit = true;
-                        // Remove and add to complete map and error map
-                        removeTaskFromStandbyList(task);
-                        completeTaskMap.put(task.getTaskCode(), task.getId());
-                        errorTaskMap.put(task.getTaskCode(), task.getId());
-                        logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId());
-                    } else {
-                        removeTaskFromStandbyList(task);
-                    }
-                } else if (DependResult.FAILED == dependResult) {
-                    // if the dependency fails, the current node is not submitted and the state changes to failure.
-                    dependFailedTaskMap.put(task.getTaskCode(), task.getId());
+            }
+            //init varPool only this task is the first time running
+            if (task.isFirstRun()) {
+                //get pre task ,get all the task varPool to this task
+                Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
+                getPreVarPool(task, preTask);
+            }
+            DependResult dependResult = getDependResultForTask(task);
+            if (DependResult.SUCCESS == dependResult) {
+                Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
+                if (!taskInstanceOptional.isPresent()) {
+                    this.taskFailedSubmit = true;
+                    // Remove and add to complete map and error map
                     removeTaskFromStandbyList(task);
-                    logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
-                } else if (DependResult.NON_EXEC == dependResult) {
-                    // for some reasons(depend task pause/stop) this task would not be submit
+                    completeTaskMap.put(task.getTaskCode(), task.getId());
+                    errorTaskMap.put(task.getTaskCode(), task.getId());
+                    logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId());
+                } else {
                     removeTaskFromStandbyList(task);
-                    logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
                 }
+            } else if (DependResult.FAILED == dependResult) {
+                // if the dependency fails, the current node is not submitted and the state changes to failure.
+                dependFailedTaskMap.put(task.getTaskCode(), task.getId());
+                removeTaskFromStandbyList(task);
+                logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
+            } else if (DependResult.NON_EXEC == dependResult) {
+                // for some reasons(depend task pause/stop) this task would not be submit
+                removeTaskFromStandbyList(task);
+                logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
             }
-        } catch (Exception e) {
-            logger.error("submit standby task error", e);
         }
     }
 
     /**
-     * get recovery task instance list
-     *
-     * @param taskIdArray task id array
-     * @return recovery task instance list
-     */
-    private List<TaskInstance> getRecoverTaskInstanceList(String[] taskIdArray) {
-        if (taskIdArray == null || taskIdArray.length == 0) {
-            return new ArrayList<>();
-        }
-        List<Integer> taskIdList = new ArrayList<>(taskIdArray.length);
-        for (String taskId : taskIdArray) {
-            try {
-                Integer id = Integer.valueOf(taskId);
-                taskIdList.add(id);
-            } catch (Exception e) {
-                logger.error("get recovery task instance failed ", e);
-            }
-        }
-        return processService.findTaskInstanceByIdList(taskIdList);
-    }
-
-    /**
-     * get start task instance list
+     * Get start task instance list from recover
      *
      * @param cmdParam command param
      * @return task instance list
      */
-    private List<TaskInstance> getStartTaskInstanceList(String cmdParam) {
-
-        List<TaskInstance> instanceList = new ArrayList<>();
+    protected List<TaskInstance> getRecoverTaskInstanceList(String cmdParam) {
         Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
 
+        // todo: Can we use a better way to set the recover taskInstanceId list? rather then use the cmdParam
         if (paramMap != null && paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) {
             String[] idList = paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA);
-            instanceList = getRecoverTaskInstanceList(idList);
+            if (ArrayUtils.isNotEmpty(idList)) {
+                List<Integer> taskInstanceIds = Arrays.stream(idList).map(Integer::valueOf).collect(Collectors.toList());
+                return processService.findTaskInstanceByIdList(taskInstanceIds);
+            }
         }
-        return instanceList;
+        return Collections.emptyList();
     }
 
     /**
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java
new file mode 100644
index 0000000000..b53a500c89
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+public enum WorkflowSubmitStatue {
+    /**
+     * Submit success
+     */
+    SUCCESS,
+    /**
+     * Submit failed, this status should be retry
+     */
+    FAILED,
+    /**
+     * Duplicated submitted, this status should never occur.
+     */
+    DUPLICATED_SUBMITTED,
+    ;
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
similarity index 93%
rename from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
rename to dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 097ba120cb..51064bc514 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master;
+package org.apache.dolphinscheduler.server.master.runner;
 
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+
 import static org.powermock.api.mockito.PowerMockito.mock;
 
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -36,8 +37,8 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
@@ -70,7 +71,7 @@ import org.springframework.context.ApplicationContext;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({WorkflowExecuteRunnable.class})
-public class WorkflowExecuteTaskTest {
+public class WorkflowExecuteRunnableTest {
 
     private WorkflowExecuteRunnable workflowExecuteThread;
 
@@ -116,7 +117,10 @@ public class WorkflowExecuteTaskTest {
 
         stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
         curingGlobalParamsService = mock(CuringParamsService.class);
-        workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread, curingGlobalParamsService));
+        NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class);
+        ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class);
+        workflowExecuteThread =
+            PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
         // prepareProcess init dag
         Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
         dag.setAccessible(true);
@@ -157,9 +161,9 @@ public class WorkflowExecuteTaskTest {
                     Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
             ).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
             Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
-            Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class);
+            Method method = masterExecThreadClass.getDeclaredMethod("getRecoverTaskInstanceList", String.class);
             method.setAccessible(true);
-            List<TaskInstance> taskInstances = (List<TaskInstance>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
+            List<TaskInstance> taskInstances = workflowExecuteThread.getRecoverTaskInstanceList(JSONUtils.toJsonString(cmdParam));
             Assert.assertEquals(4, taskInstances.size());
 
             cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1");
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 26afcc4408..6066fe525f 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -81,7 +81,7 @@ public interface ProcessService {
 
     ProcessDefinition findProcessDefineById(int processDefinitionId);
 
-    ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version);
+    ProcessDefinition findProcessDefinition(Long processDefinitionCode, int processDefinitionVersion);
 
     ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode);