You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/04 14:08:20 UTC
[dolphinscheduler] branch dev updated: [Fix-10666] Workflow submit failed will still in memory and never retry (#10667)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 35a10d092f [Fix-10666] Workflow submit failed will still in memory and never retry (#10667)
35a10d092f is described below
commit 35a10d092f566c07137da5fb67b21cee644cdc8f
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Jul 4 22:08:15 2022 +0800
[Fix-10666] Workflow submit failed will still in memory and never retry (#10667)
* Workflow submit failed will still in memory and never retry
---
.../server/master/exception/MasterException.java | 29 ++++
.../master/metrics/ProcessInstanceMetrics.java | 6 +
.../master/runner/MasterSchedulerService.java | 161 ++++++++++++------
.../master/runner/StateWheelExecuteThread.java | 18 +-
.../master/runner/WorkflowExecuteRunnable.java | 184 ++++++++++-----------
.../server/master/runner/WorkflowSubmitStatue.java | 34 ++++
.../WorkflowExecuteRunnableTest.java} | 18 +-
.../service/process/ProcessService.java | 2 +-
8 files changed, 288 insertions(+), 164 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java
new file mode 100644
index 0000000000..eb5d5e8dfe
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.exception;
+
+public class MasterException extends Exception {
+
+ public MasterException(String message) {
+ super(message);
+ }
+
+ public MasterException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
index ad0e479e5b..8edf3f0c86 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
@@ -90,6 +90,12 @@ public final class ProcessInstanceMetrics {
.register(Metrics.globalRegistry);
}
+ public static synchronized void registerProcessInstanceResubmitGauge(Supplier<Number> function) {
+ Gauge.builder("ds.workflow.instance.resubmit", function)
+ .description("The current process instance need to resubmit count")
+ .register(Metrics.globalRegistry);
+ }
+
public static void incProcessInstanceSubmit() {
PROCESS_INSTANCE_SUBMIT_COUNTER.increment();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 3254d6c5b5..334df5baa3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -28,11 +28,10 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.exception.MasterException;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
@@ -44,7 +43,9 @@ import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
@@ -52,6 +53,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import lombok.NonNull;
+
/**
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/
@@ -72,8 +75,6 @@ public class MasterSchedulerService extends BaseDaemonThread {
@Autowired
private ProcessAlertManager processAlertManager;
- private NettyRemotingClient nettyRemotingClient;
-
@Autowired
private NettyExecutorManager nettyExecutorManager;
@@ -97,6 +98,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
@Autowired
private CuringParamsService curingGlobalParamsService;
+ private final LinkedBlockingQueue<ProcessInstance> submitFailedProcessInstances = new LinkedBlockingQueue<>();
+
+ private Thread failedProcessInstanceResubmitThread;
+
private String masterAddress;
protected MasterSchedulerService() {
@@ -108,22 +113,23 @@ public class MasterSchedulerService extends BaseDaemonThread {
*/
public void init() {
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
- NettyClientConfig clientConfig = new NettyClientConfig();
- this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
+ this.failedProcessInstanceResubmitThread = new FailedProcessInstanceResubmitThread(submitFailedProcessInstances);
+ ProcessInstanceMetrics.registerProcessInstanceResubmitGauge(submitFailedProcessInstances::size);
}
@Override
public synchronized void start() {
logger.info("Master schedule service starting..");
- this.stateWheelExecuteThread.start();
super.start();
+ this.failedProcessInstanceResubmitThread.start();
logger.info("Master schedule service started...");
}
public void close() {
logger.info("Master schedule service stopping...");
- nettyRemotingClient.close();
+ // these process instances will be failed over, so we can safely clear them here
+ submitFailedProcessInstances.clear();
logger.info("Master schedule service stopped...");
}
@@ -146,7 +152,9 @@ public class MasterSchedulerService extends BaseDaemonThread {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
- logger.error("Master schedule service loop command error", e);
+ logger.error("Master schedule workflow error", e);
+ // sleep for 1s here so that a database outage does not cause a flood of exceptions
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
}
@@ -154,7 +162,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
/**
* Query command from database by slot, and transform to workflow instance, then submit to workflowExecuteThreadPool.
*/
- private void scheduleWorkflow() throws InterruptedException {
+ private void scheduleWorkflow() throws InterruptedException, MasterException {
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
// indicate that no command ,sleep for 1s
@@ -164,38 +172,53 @@ public class MasterSchedulerService extends BaseDaemonThread {
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
+ // indicates that transforming the commands into process instances failed, sleep for 1s
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
return;
}
MasterServerMetrics.incMasterConsumeCommand(commands.size());
for (ProcessInstance processInstance : processInstances) {
- try {
- LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
- logger.info("Master schedule service starting workflow instance");
- final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
- processInstance
- , processService
- , nettyExecutorManager
- , processAlertManager
- , masterConfig
- , stateWheelExecuteThread
- , curingGlobalParamsService);
-
- this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
- if (processInstance.getTimeout() > 0) {
- stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
- }
- ProcessInstanceMetrics.incProcessInstanceSubmit();
- workflowExecuteThreadPool.submit(workflowExecuteRunnable);
- logger.info("Master schedule service started workflow instance");
-
- } catch (Exception ex) {
- processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
- stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
- logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
- } finally {
- LoggerUtils.removeWorkflowInstanceIdMDC();
+ submitProcessInstance(processInstance);
+ }
+ }
+
+ private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
+ try {
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+ logger.info("Master schedule service starting workflow instance");
+ final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
+ processInstance
+ , processService
+ , nettyExecutorManager
+ , processAlertManager
+ , masterConfig
+ , stateWheelExecuteThread
+ , curingGlobalParamsService);
+
+ this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
+ if (processInstance.getTimeout() > 0) {
+ stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
+ ProcessInstanceMetrics.incProcessInstanceSubmit();
+ CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = CompletableFuture.supplyAsync(
+ workflowExecuteRunnable::call, workflowExecuteThreadPool);
+ workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
+ if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
+ // submit failed
+ processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+ stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+ submitFailedProcessInstances.add(processInstance);
+ }
+ });
+ logger.info("Master schedule service started workflow instance");
+
+ } catch (Exception ex) {
+ processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+ stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+ logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
@@ -237,23 +260,27 @@ public class MasterSchedulerService extends BaseDaemonThread {
return processInstances;
}
- private List<Command> findCommands() {
- long scheduleStartTime = System.currentTimeMillis();
- int thisMasterSlot = ServerNodeManager.getSlot();
- int masterCount = ServerNodeManager.getMasterSize();
- if (masterCount <= 0) {
- logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
- return Collections.emptyList();
- }
- int pageNumber = 0;
- int pageSize = masterConfig.getFetchCommandNum();
- final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
- if (CollectionUtils.isNotEmpty(result)) {
- logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
- result.size(), thisMasterSlot, masterCount);
+ private List<Command> findCommands() throws MasterException {
+ try {
+ long scheduleStartTime = System.currentTimeMillis();
+ int thisMasterSlot = ServerNodeManager.getSlot();
+ int masterCount = ServerNodeManager.getMasterSize();
+ if (masterCount <= 0) {
+ logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
+ return Collections.emptyList();
+ }
+ int pageNumber = 0;
+ int pageSize = masterConfig.getFetchCommandNum();
+ final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+ if (CollectionUtils.isNotEmpty(result)) {
+ logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
+ result.size(), thisMasterSlot, masterCount);
+ }
+ ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
+ return result;
+ } catch (Exception ex) {
+ throw new MasterException("Master loop command from database error", ex);
}
- ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
- return result;
}
private SlotCheckState slotCheck(Command command) {
@@ -270,4 +297,34 @@ public class MasterSchedulerService extends BaseDaemonThread {
return state;
}
+ private class FailedProcessInstanceResubmitThread extends Thread {
+
+ private final LinkedBlockingQueue<ProcessInstance> submitFailedProcessInstances;
+
+ public FailedProcessInstanceResubmitThread(LinkedBlockingQueue<ProcessInstance> submitFailedProcessInstances) {
+ logger.info("Starting workflow resubmit thread");
+ this.submitFailedProcessInstances = submitFailedProcessInstances;
+ this.setDaemon(true);
+ this.setName("SubmitFailedProcessInstanceHandleThread");
+ logger.info("Started workflow resubmit thread");
+ }
+
+ @Override
+ public void run() {
+ while (Stopper.isRunning()) {
+ try {
+ ProcessInstance processInstance = submitFailedProcessInstances.take();
+ submitProcessInstance(processInstance);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("SubmitFailedProcessInstanceHandleThread has been interrupted, will return");
+ break;
+ }
+
+ // avoid a fast-failing resubmission loop driving CPU usage up
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+ }
+ }
+ }
+
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 8b92696723..a56a7d8c5a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -33,12 +33,11 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
-import org.apache.commons.lang3.ThreadUtils;
-
-import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.annotation.PostConstruct;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -91,9 +90,14 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
super("StateWheelExecuteThread");
}
+ @PostConstruct
+ public void startWheelThread() {
+ super.start();
+ }
+
@Override
public void run() {
- Duration checkInterval = masterConfig.getStateWheelInterval();
+ final long checkInterval = masterConfig.getStateWheelInterval().toMillis();
while (Stopper.isRunning()) {
try {
checkTask4Timeout();
@@ -104,9 +108,11 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
logger.error("state wheel thread check error:", e);
}
try {
- ThreadUtils.sleep(checkInterval);
+ Thread.sleep(checkInterval);
} catch (InterruptedException e) {
- logger.error("state wheel thread sleep error", e);
+ logger.error("state wheel thread sleep error, will close the loop", e);
+ Thread.currentThread().interrupt();
+ break;
}
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 25766d0a4c..7fd23ba42c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -81,12 +81,14 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -96,30 +98,29 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import lombok.NonNull;
+
/**
* Workflow execute task, used to execute a workflow instance.
*/
-public class WorkflowExecuteRunnable implements Runnable {
+public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* logger of WorkflowExecuteThread
*/
private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class);
- /**
- * master config
- */
- private final MasterConfig masterConfig;
-
/**
* process service
*/
@@ -151,14 +152,14 @@ public class WorkflowExecuteRunnable implements Runnable {
private DAG<String, TaskNode, TaskNodeRelation> dag;
/**
- * key of workflow
+ * unique key of workflow
*/
private String key;
/**
* start flag, true: start nodes submit completely
*/
- private boolean isStart = false;
+ private volatile boolean isStart = false;
/**
* submit failure nodes
@@ -240,6 +241,8 @@ public class WorkflowExecuteRunnable implements Runnable {
*/
private final CuringParamsService curingParamsService;
+ private final String masterAddress;
+
/**
* @param processInstance processInstance
* @param processService processService
@@ -248,20 +251,21 @@ public class WorkflowExecuteRunnable implements Runnable {
* @param masterConfig masterConfig
* @param stateWheelExecuteThread stateWheelExecuteThread
*/
- public WorkflowExecuteRunnable(ProcessInstance processInstance
- , ProcessService processService
- , NettyExecutorManager nettyExecutorManager
- , ProcessAlertManager processAlertManager
- , MasterConfig masterConfig
- , StateWheelExecuteThread stateWheelExecuteThread
- , CuringParamsService curingParamsService) {
+ public WorkflowExecuteRunnable(
+ @NonNull ProcessInstance processInstance,
+ @NonNull ProcessService processService,
+ @NonNull NettyExecutorManager nettyExecutorManager,
+ @NonNull ProcessAlertManager processAlertManager,
+ @NonNull MasterConfig masterConfig,
+ @NonNull StateWheelExecuteThread stateWheelExecuteThread,
+ @NonNull CuringParamsService curingParamsService) {
this.processService = processService;
this.processInstance = processInstance;
- this.masterConfig = masterConfig;
this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread;
this.curingParamsService = curingParamsService;
+ this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
}
@@ -287,6 +291,7 @@ public class WorkflowExecuteRunnable implements Runnable {
this.stateEvents.remove(stateEvent);
}
} catch (Exception e) {
+ // we catch the exception here, since if handling the state event fails, the state event will remain in the stateEvents queue.
logger.error("state handle error:", e);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
@@ -472,6 +477,7 @@ public class WorkflowExecuteRunnable implements Runnable {
if (taskInstance.getState().typeIsSuccess()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
+ // todo: merge the last taskInstance
processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
if (!processInstance.isBlocked()) {
@@ -828,18 +834,24 @@ public class WorkflowExecuteRunnable implements Runnable {
* ProcessInstance start entrypoint.
*/
@Override
- public void run() {
+ public WorkflowSubmitStatue call() {
if (this.taskInstanceMap.size() > 0 || isStart) {
logger.warn("The workflow has already been started");
- return;
+ return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
}
+
try {
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
buildFlowDag();
initTaskQueue();
submitPostNode(null);
isStart = true;
+ return WorkflowSubmitStatue.SUCCESS;
} catch (Exception e) {
- logger.error("start process error, process instance id:{}", processInstance.getId(), e);
+ logger.error("Start workflow error", e);
+ return WorkflowSubmitStatue.FAILED;
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
@@ -893,7 +905,7 @@ public class WorkflowExecuteRunnable implements Runnable {
}
/**
- * generate process dag
+ * Generate process dag
*
* @throws Exception exception
*/
@@ -905,7 +917,7 @@ public class WorkflowExecuteRunnable implements Runnable {
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
- List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
+ List<TaskInstance> recoverNodeList = getRecoverTaskInstanceList(processInstance.getCommandParam());
List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
List<TaskDefinitionLog> taskDefinitionLogs = processService.getTaskDefineLogListByRelation(processTaskRelations);
@@ -996,10 +1008,10 @@ public class WorkflowExecuteRunnable implements Runnable {
}
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
if (complementListDate.isEmpty() && needComplementProcess()) {
- if(start != null && end != null){
+ if (start != null && end != null) {
complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
}
- if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+ if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
complementListDate = CronUtils.getSelfScheduleDateList(cmdParam);
}
logger.info(" process definition code:{} complement data: {}",
@@ -1096,7 +1108,7 @@ public class WorkflowExecuteRunnable implements Runnable {
try {
HostUpdateCommand hostUpdateCommand = new HostUpdateCommand();
- hostUpdateCommand.setProcessHost(NetUtils.getAddr(masterConfig.getListenPort()));
+ hostUpdateCommand.setProcessHost(masterAddress);
hostUpdateCommand.setTaskInstanceId(taskInstance.getId());
Host host = new Host(taskInstance.getHost());
nettyExecutorManager.doExecute(host, hostUpdateCommand.convert2Command());
@@ -1857,99 +1869,75 @@ public class WorkflowExecuteRunnable implements Runnable {
* handling the list of tasks to be submitted
*/
private void submitStandByTask() {
- try {
- int length = readyToSubmitTaskQueue.size();
- for (int i = 0; i < length; i++) {
- TaskInstance task = readyToSubmitTaskQueue.peek();
- if (task == null) {
+ int length = readyToSubmitTaskQueue.size();
+ for (int i = 0; i < length; i++) {
+ TaskInstance task = readyToSubmitTaskQueue.peek();
+ if (task == null) {
+ continue;
+ }
+ // stop tasks which are retrying if forced success happens
+ if (task.taskCanRetry()) {
+ TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
+ if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
+ task.setState(retryTask.getState());
+ logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
+ removeTaskFromStandbyList(task);
+ completeTaskMap.put(task.getTaskCode(), task.getId());
+ taskInstanceMap.put(task.getId(), task);
+ submitPostNode(Long.toString(task.getTaskCode()));
continue;
}
- // stop tasks which is retrying if forced success happens
- if (task.taskCanRetry()) {
- TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
- if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
- task.setState(retryTask.getState());
- logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
- removeTaskFromStandbyList(task);
- completeTaskMap.put(task.getTaskCode(), task.getId());
- taskInstanceMap.put(task.getId(), task);
- submitPostNode(Long.toString(task.getTaskCode()));
- continue;
- }
- }
- //init varPool only this task is the first time running
- if (task.isFirstRun()) {
- //get pre task ,get all the task varPool to this task
- Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
- getPreVarPool(task, preTask);
- }
- DependResult dependResult = getDependResultForTask(task);
- if (DependResult.SUCCESS == dependResult) {
- Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
- if (!taskInstanceOptional.isPresent()) {
- this.taskFailedSubmit = true;
- // Remove and add to complete map and error map
- removeTaskFromStandbyList(task);
- completeTaskMap.put(task.getTaskCode(), task.getId());
- errorTaskMap.put(task.getTaskCode(), task.getId());
- logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId());
- } else {
- removeTaskFromStandbyList(task);
- }
- } else if (DependResult.FAILED == dependResult) {
- // if the dependency fails, the current node is not submitted and the state changes to failure.
- dependFailedTaskMap.put(task.getTaskCode(), task.getId());
+ }
+ //init varPool only this task is the first time running
+ if (task.isFirstRun()) {
+ //get pre task ,get all the task varPool to this task
+ Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
+ getPreVarPool(task, preTask);
+ }
+ DependResult dependResult = getDependResultForTask(task);
+ if (DependResult.SUCCESS == dependResult) {
+ Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
+ if (!taskInstanceOptional.isPresent()) {
+ this.taskFailedSubmit = true;
+ // Remove and add to complete map and error map
removeTaskFromStandbyList(task);
- logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
- } else if (DependResult.NON_EXEC == dependResult) {
- // for some reasons(depend task pause/stop) this task would not be submit
+ completeTaskMap.put(task.getTaskCode(), task.getId());
+ errorTaskMap.put(task.getTaskCode(), task.getId());
+ logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId());
+ } else {
removeTaskFromStandbyList(task);
- logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
}
+ } else if (DependResult.FAILED == dependResult) {
+ // if the dependency fails, the current node is not submitted and the state changes to failure.
+ dependFailedTaskMap.put(task.getTaskCode(), task.getId());
+ removeTaskFromStandbyList(task);
+ logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
+ } else if (DependResult.NON_EXEC == dependResult) {
+ // for some reasons (depend task pause/stop) this task will not be submitted
+ removeTaskFromStandbyList(task);
+ logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
}
- } catch (Exception e) {
- logger.error("submit standby task error", e);
}
}
/**
- * get recovery task instance list
- *
- * @param taskIdArray task id array
- * @return recovery task instance list
- */
- private List<TaskInstance> getRecoverTaskInstanceList(String[] taskIdArray) {
- if (taskIdArray == null || taskIdArray.length == 0) {
- return new ArrayList<>();
- }
- List<Integer> taskIdList = new ArrayList<>(taskIdArray.length);
- for (String taskId : taskIdArray) {
- try {
- Integer id = Integer.valueOf(taskId);
- taskIdList.add(id);
- } catch (Exception e) {
- logger.error("get recovery task instance failed ", e);
- }
- }
- return processService.findTaskInstanceByIdList(taskIdList);
- }
-
- /**
- * get start task instance list
+ * Get start task instance list from recover
*
* @param cmdParam command param
* @return task instance list
*/
- private List<TaskInstance> getStartTaskInstanceList(String cmdParam) {
-
- List<TaskInstance> instanceList = new ArrayList<>();
+ protected List<TaskInstance> getRecoverTaskInstanceList(String cmdParam) {
Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
+ // todo: Can we use a better way to set the recover taskInstanceId list, rather than using the cmdParam?
if (paramMap != null && paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) {
String[] idList = paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA);
- instanceList = getRecoverTaskInstanceList(idList);
+ if (ArrayUtils.isNotEmpty(idList)) {
+ List<Integer> taskInstanceIds = Arrays.stream(idList).map(Integer::valueOf).collect(Collectors.toList());
+ return processService.findTaskInstanceByIdList(taskInstanceIds);
+ }
}
- return instanceList;
+ return Collections.emptyList();
}
/**
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java
new file mode 100644
index 0000000000..b53a500c89
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+public enum WorkflowSubmitStatue { // NOTE(review): "Statue" looks like a typo for "Status"; renaming would change the public type name, so confirm callers before fixing
+ /**
+ * The workflow was submitted successfully.
+ */
+ SUCCESS,
+ /**
+ * Submission failed; a workflow in this status should be retried.
+ */
+ FAILED,
+ /**
+ * Duplicate submission of the same workflow; this status should never occur.
+ */
+ DUPLICATED_SUBMITTED,
+ ;
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
similarity index 93%
rename from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
rename to dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 097ba120cb..51064bc514 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master;
+package org.apache.dolphinscheduler.server.master.runner;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+
import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -36,8 +37,8 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -70,7 +71,7 @@ import org.springframework.context.ApplicationContext;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({WorkflowExecuteRunnable.class})
-public class WorkflowExecuteTaskTest {
+public class WorkflowExecuteRunnableTest {
private WorkflowExecuteRunnable workflowExecuteThread;
@@ -116,7 +117,10 @@ public class WorkflowExecuteTaskTest {
stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
curingGlobalParamsService = mock(CuringParamsService.class);
- workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread, curingGlobalParamsService));
+ NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class);
+ ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class);
+ workflowExecuteThread =
+ PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
// prepareProcess init dag
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true);
@@ -157,9 +161,9 @@ public class WorkflowExecuteTaskTest {
Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
- Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class);
+ Method method = masterExecThreadClass.getDeclaredMethod("getRecoverTaskInstanceList", String.class);
method.setAccessible(true);
- List<TaskInstance> taskInstances = (List<TaskInstance>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
+ List<TaskInstance> taskInstances = workflowExecuteThread.getRecoverTaskInstanceList(JSONUtils.toJsonString(cmdParam));
Assert.assertEquals(4, taskInstances.size());
cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1");
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 26afcc4408..6066fe525f 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -81,7 +81,7 @@ public interface ProcessService {
ProcessDefinition findProcessDefineById(int processDefinitionId);
- ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version);
+ ProcessDefinition findProcessDefinition(Long processDefinitionCode, int processDefinitionVersion);
ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode);