You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:48 UTC
[dolphinscheduler] 21/29: [Fix-10842] Fix master/worker failover will cause status incorrect (#10839)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 71edaf41a234f5c73e0c6266b6b8b927b146cd89
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Jul 9 11:54:59 2022 +0800
[Fix-10842] Fix master/worker failover will cause status incorrect (#10839)
* Fix master failover will not update task instance status
* Add some failover log
* Fix worker failover will rerun task more than once
* Fix workflowInstance failover may rerun already success taskInstance
(cherry picked from commit 3f69ec8f28c7206c153f5de8c7c0d1fa33d311f9)
---
.../apache/dolphinscheduler/common/graph/DAG.java | 11 +
.../master/consumer/TaskPriorityQueueConsumer.java | 32 +-
.../master/dispatch/context/ExecutionContext.java | 45 +--
.../dispatch/executor/NettyExecutorManager.java | 3 +
.../processor/queue/TaskExecuteRunnable.java | 124 +++----
.../registry/MasterRegistryDataListener.java | 1 +
.../master/runner/FailoverExecuteThread.java | 5 +-
.../master/runner/StateWheelExecuteThread.java | 12 +-
.../master/runner/WorkflowExecuteRunnable.java | 115 ++++---
.../master/runner/task/CommonTaskProcessor.java | 5 +-
.../server/master/service/FailoverService.java | 371 +--------------------
.../master/service/MasterFailoverService.java | 253 ++++++++++++++
.../master/service/WorkerFailoverService.java | 266 +++++++++++++++
.../master/dispatch/ExecutionContextTestUtils.java | 2 +-
.../executor/NettyExecutorManagerTest.java | 4 +-
.../server/master/service/FailoverServiceTest.java | 22 +-
.../service/process/ProcessService.java | 1 +
.../service/process/ProcessServiceImpl.java | 38 ++-
.../service/queue/TaskPriority.java | 41 ++-
19 files changed, 823 insertions(+), 528 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java
index e57b3dd93e..afa780163a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java
@@ -498,5 +498,16 @@ public class DAG<Node, NodeInfo, EdgeInfo> {
return new AbstractMap.SimpleEntry<>(notZeroIndegreeNodeMap.size() == 0, topoResultList);
}
+ @Override
+ public String toString() {
+ return "DAG{"
+ + "nodesMap="
+ + nodesMap
+ + ", edgesMap="
+ + edgesMap
+ + ", reverseEdgesMap="
+ + reverseEdgesMap
+ + '}';
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index f04ae15dd8..82198e942a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -34,11 +34,11 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections.CollectionUtils;
@@ -46,6 +46,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -187,8 +188,24 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
TaskMetrics.incTaskDispatch();
boolean result = false;
try {
+ WorkflowExecuteRunnable workflowExecuteRunnable =
+ processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
+ if (workflowExecuteRunnable == null) {
+ logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority);
+ return true;
+ }
+ Optional<TaskInstance> taskInstanceOptional =
+ workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
+ if (!taskInstanceOptional.isPresent()) {
+ logger.error("Cannot find the task instance from related processInstance, taskPriority: {}",
+ taskPriority);
+ // we return true, so that we will drop this task.
+ return true;
+ }
+ TaskInstance taskInstance = taskInstanceOptional.get();
TaskExecutionContext context = taskPriority.getTaskExecutionContext();
- ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup());
+ ExecutionContext executionContext =
+ new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(), taskInstance);
if (isTaskNeedToCheck(taskPriority)) {
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
@@ -196,16 +213,21 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
return true;
}
}
+
result = dispatcher.dispatch(executionContext);
if (result) {
- logger.info("Master success dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
+ logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
+ taskPriority.getTaskId(),
+ executionContext.getHost());
addDispatchEvent(context, executionContext);
} else {
- logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
+ logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}",
+ taskPriority.getTaskId(),
+ executionContext.getHost());
}
} catch (RuntimeException | ExecuteException e) {
- logger.error("Master dispatch task to worker error: ", e);
+ logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
}
return result;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index b3fba87870..880640e91d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -19,13 +19,17 @@ package org.apache.dolphinscheduler.server.master.dispatch.context;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import lombok.Data;
+
/**
* execution context
*/
+@Data
public class ExecutionContext {
/**
@@ -34,51 +38,30 @@ public class ExecutionContext {
private Host host;
/**
- * command
+ * command
*/
private final Command command;
+ private final TaskInstance taskInstance;
+
/**
- * executor type : worker or client
+ * executor type : worker or client
*/
private final ExecutorType executorType;
/**
- * worker group
+ * worker group
*/
- private String workerGroup;
+ private final String workerGroup;
- public ExecutionContext(Command command, ExecutorType executorType) {
- this(command, executorType, DEFAULT_WORKER_GROUP);
+ public ExecutionContext(Command command, ExecutorType executorType, TaskInstance taskInstance) {
+ this(command, executorType, DEFAULT_WORKER_GROUP, taskInstance);
}
- public ExecutionContext(Command command, ExecutorType executorType, String workerGroup) {
+ public ExecutionContext(Command command, ExecutorType executorType, String workerGroup, TaskInstance taskInstance) {
this.command = command;
this.executorType = executorType;
this.workerGroup = workerGroup;
- }
-
- public Command getCommand() {
- return command;
- }
-
- public ExecutorType getExecutorType() {
- return executorType;
- }
-
- public void setWorkerGroup(String workerGroup) {
- this.workerGroup = workerGroup;
- }
-
- public String getWorkerGroup() {
- return this.workerGroup;
- }
-
- public Host getHost() {
- return host;
- }
-
- public void setHost(Host host) {
- this.host = host;
+ this.taskInstance = taskInstance;
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 0ba24e287d..0cc2d4b8f7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -123,6 +123,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
doExecute(host, command);
success = true;
context.setHost(host);
+ // Record the dispatch host on the taskInstance right away; otherwise, if the worker goes down, this taskInstance may not be
+ // failed over, because its host would not belong to the down worker (ISSUE-10842).
+ context.getTaskInstance().setHost(host.getAddress());
} catch (ExecuteException ex) {
logger.error(String.format("execute command : %s error", command), ex);
try {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index f488fc3de1..d57adcbbbc 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -26,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
@@ -75,7 +75,7 @@ public class TaskExecuteRunnable implements Runnable {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
persist(event);
} catch (Exception e) {
- logger.error("persist error, event:{}, error: {}", event, e);
+ logger.error("persist task event error, event:{}", event, e);
} finally {
this.events.remove(event);
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
@@ -113,27 +113,29 @@ public class TaskExecuteRunnable implements Runnable {
*
* @param taskEvent taskEvent
*/
- private void persist(TaskEvent taskEvent) {
+ private void persist(TaskEvent taskEvent) throws Exception {
Event event = taskEvent.getEvent();
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
Optional<TaskInstance> taskInstance;
- WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ WorkflowExecuteRunnable workflowExecuteRunnable =
+ this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable != null && workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) {
taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
} else {
taskInstance = Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId));
}
+ boolean needToSendEvent = true;
switch (event) {
case DISPATCH:
- handleDispatchEvent(taskEvent, taskInstance);
+ needToSendEvent = handleDispatchEvent(taskEvent, taskInstance);
// dispatch event do not need to submit state event
- return;
+ break;
case DELAY:
case RUNNING:
- handleRunningEvent(taskEvent, taskInstance);
+ needToSendEvent = handleRunningEvent(taskEvent, taskInstance);
break;
case RESULT:
handleResultEvent(taskEvent, taskInstance);
@@ -141,6 +143,10 @@ public class TaskExecuteRunnable implements Runnable {
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
+ if (!needToSendEvent) {
+ logger.info("Handle task event: {} success, there is no need to send a StateEvent", taskEvent);
+ return;
+ }
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
@@ -153,81 +159,83 @@ public class TaskExecuteRunnable implements Runnable {
/**
* handle dispatch event
*/
- private void handleDispatchEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
+ private boolean handleDispatchEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
if (!taskInstanceOptional.isPresent()) {
logger.error("taskInstance is null");
- return;
+ return false;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
- return;
+ return false;
}
taskInstance.setState(ExecutionStatus.DISPATCH);
taskInstance.setHost(taskEvent.getWorkerAddress());
processService.saveTaskInstance(taskInstance);
+ return true;
}
/**
* handle running event
*/
- private void handleRunningEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
+ private boolean handleRunningEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
Channel channel = taskEvent.getChannel();
- try {
- if (taskInstanceOptional.isPresent()) {
- TaskInstance taskInstance = taskInstanceOptional.get();
- if (taskInstance.getState().typeIsFinished()) {
- logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
- } else {
- taskInstance.setState(taskEvent.getState());
- taskInstance.setStartTime(taskEvent.getStartTime());
- taskInstance.setHost(taskEvent.getWorkerAddress());
- taskInstance.setLogPath(taskEvent.getLogPath());
- taskInstance.setExecutePath(taskEvent.getExecutePath());
- taskInstance.setPid(taskEvent.getProcessId());
- taskInstance.setAppLink(taskEvent.getAppIds());
- processService.saveTaskInstance(taskInstance);
- }
+ if (taskInstanceOptional.isPresent()) {
+ TaskInstance taskInstance = taskInstanceOptional.get();
+ if (taskInstance.getState().typeIsFinished()) {
+ logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}",
+ taskInstance.getId(),
+ taskInstance.getState());
+ return false;
+ } else {
+ taskInstance.setState(taskEvent.getState());
+ taskInstance.setStartTime(taskEvent.getStartTime());
+ taskInstance.setHost(taskEvent.getWorkerAddress());
+ taskInstance.setLogPath(taskEvent.getLogPath());
+ taskInstance.setExecutePath(taskEvent.getExecutePath());
+ taskInstance.setPid(taskEvent.getProcessId());
+ taskInstance.setAppLink(taskEvent.getAppIds());
+ processService.saveTaskInstance(taskInstance);
}
- // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
- // send ack to worker
- TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
- channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
- } catch (Exception e) {
- logger.error("worker ack master error", e);
- TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
- channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
}
+ // If the taskInstance is absent (maybe deleted), a retry would be meaningless, so ack success. NOTE(review): the finished case above returns before this ack is sent.
+ // send ack to worker
+ TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
+ new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+ channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
+ return true;
}
/**
* handle result event
*/
- private void handleResultEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
+ private boolean handleResultEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
Channel channel = taskEvent.getChannel();
- try {
- if (taskInstanceOptional.isPresent()) {
- TaskInstance taskInstance = taskInstanceOptional.get();
- dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
-
- taskInstance.setStartTime(taskEvent.getStartTime());
- taskInstance.setHost(taskEvent.getWorkerAddress());
- taskInstance.setLogPath(taskEvent.getLogPath());
- taskInstance.setExecutePath(taskEvent.getExecutePath());
- taskInstance.setPid(taskEvent.getProcessId());
- taskInstance.setAppLink(taskEvent.getAppIds());
- taskInstance.setState(taskEvent.getState());
- taskInstance.setEndTime(taskEvent.getEndTime());
- taskInstance.setVarPool(taskEvent.getVarPool());
- processService.changeOutParam(taskInstance);
- processService.saveTaskInstance(taskInstance);
+ if (taskInstanceOptional.isPresent()) {
+ TaskInstance taskInstance = taskInstanceOptional.get();
+ if (taskInstance.getState().typeIsFinished()) {
+ logger.warn("The current taskInstance has already been finished, taskEvent: {}", taskEvent);
+ return false;
}
- // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
- TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
- channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
- } catch (Exception e) {
- logger.error("worker response master error", e);
- TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
- channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
+
+ dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
+
+ taskInstance.setStartTime(taskEvent.getStartTime());
+ taskInstance.setHost(taskEvent.getWorkerAddress());
+ taskInstance.setLogPath(taskEvent.getLogPath());
+ taskInstance.setExecutePath(taskEvent.getExecutePath());
+ taskInstance.setPid(taskEvent.getProcessId());
+ taskInstance.setAppLink(taskEvent.getAppIds());
+ taskInstance.setState(taskEvent.getState());
+ taskInstance.setEndTime(taskEvent.getEndTime());
+ taskInstance.setVarPool(taskEvent.getVarPool());
+ processService.changeOutParam(taskInstance);
+ processService.saveTaskInstance(taskInstance);
}
+ // If the taskInstance is absent (maybe deleted), a retry would be meaningless, so respond with success.
+ TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
+ new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+ channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
+ return true;
}
+
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
index 30cea36db7..1eafb4cb54 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
@@ -64,6 +64,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
break;
case REMOVE:
masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true);
+
break;
default:
break;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 63f4215f27..16656c8760 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
+import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +42,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
* failover service
*/
@Autowired
- private FailoverService failoverService;
+ private MasterFailoverService masterFailoverService;
protected FailoverExecuteThread() {
super("FailoverExecuteThread");
@@ -63,7 +64,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
try {
// todo: DO we need to schedule a task to do this kind of check
// This kind of check may only need to be executed when a master server start
- failoverService.checkMasterFailover();
+ masterFailoverService.checkMasterFailover();
} catch (Exception e) {
logger.error("Master failover thread execute error", e);
} finally {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index c70fb1f1d4..843c1b246c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
@@ -31,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import java.util.Optional;
@@ -296,15 +296,21 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
}
if (!taskInstanceOptional.isPresent()) {
- logger.warn("Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check");
+ logger.warn(
+ "Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check");
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = taskInstanceOptional.get();
- if (taskInstance.retryTaskIntervalOverTime()) {
+ // Skip tasks in NEED_FAULT_TOLERANCE state: worker failover submits a failover task for them, and without this check
+ // the same task could be resubmitted by this thread.
+ if (taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE
+ && taskInstance.retryTaskIntervalOverTime()) {
// reset taskInstance endTime and state
// todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
+ logger.info("[TaskInstance-{}]The task instance can retry, will retry this task instance",
+ taskInstance.getId());
taskInstance.setEndTime(null);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 16604662ac..2763d99932 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -107,6 +107,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeanUtils;
import com.google.common.collect.Lists;
@@ -140,7 +141,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* process instance
*/
- private ProcessInstance processInstance;
+ private final ProcessInstance processInstance;
/**
* process definition
@@ -289,6 +290,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
StateEventHandler stateEventHandler =
StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
.orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
+ logger.info("Begin to handle state event, {}", stateEvent);
if (stateEventHandler.handleStateEvent(this, stateEvent)) {
this.stateEvents.remove(stateEvent);
}
@@ -482,9 +484,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
public void refreshProcessInstance(int processInstanceId) {
logger.info("process instance update: {}", processInstanceId);
- processInstance = processService.findProcessInstanceById(processInstanceId);
+ ProcessInstance newProcessInstance = processService.findProcessInstanceById(processInstanceId);
+ // Copy the refreshed properties into the existing processInstance object instead of replacing the reference (this is a shallow copy).
+ BeanUtils.copyProperties(newProcessInstance, processInstance);
+
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
+ processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
}
@@ -773,6 +778,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
// generate process dag
dag = DagHelper.buildDagGraph(processDag);
+ logger.info("Build dag success, dag: {}", dag);
}
/**
@@ -787,45 +793,60 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
errorTaskMap.clear();
if (!isNewProcessInstance()) {
+ logger.info("The workflowInstance is not a newly running instance, runtimes: {}, recover flag: {}",
+ processInstance.getRunTimes(),
+ processInstance.getRecovery());
List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) {
- if (validTaskMap.containsKey(task.getTaskCode())) {
- int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
- TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
- if (!oldTaskInstance.getState().typeIsFinished() && task.getState().typeIsFinished()) {
- task.setFlag(Flag.NO);
- processService.updateTaskInstance(task);
- continue;
+ try {
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId());
+ logger.info(
+ "Check the taskInstance from a exist workflowInstance, existTaskInstanceCode: {}, taskInstanceStatus: {}",
+ task.getTaskCode(),
+ task.getState());
+ if (validTaskMap.containsKey(task.getTaskCode())) {
+ int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
+ TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
+ if (!oldTaskInstance.getState().typeIsFinished() && task.getState().typeIsFinished()) {
+ task.setFlag(Flag.NO);
+ processService.updateTaskInstance(task);
+ continue;
+ }
+ logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}",
+ task.getTaskCode());
}
- logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}",
- task.getTaskCode());
- }
- validTaskMap.put(task.getTaskCode(), task.getId());
- taskInstanceMap.put(task.getId(), task);
+ validTaskMap.put(task.getTaskCode(), task.getId());
+ taskInstanceMap.put(task.getId(), task);
- if (task.isTaskComplete()) {
- completeTaskMap.put(task.getTaskCode(), task.getId());
- continue;
- }
- if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
- continue;
- }
- if (task.taskCanRetry()) {
- if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
- // tolerantTaskInstance add to standby list directly
- TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
- addTaskToStandByList(tolerantTaskInstance);
- } else {
- retryTaskInstance(task);
+ if (task.isTaskComplete()) {
+ completeTaskMap.put(task.getTaskCode(), task.getId());
+ continue;
}
- continue;
- }
- if (task.getState().typeIsFailure()) {
- errorTaskMap.put(task.getTaskCode(), task.getId());
+ if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()),
+ dag)) {
+ continue;
+ }
+ if (task.taskCanRetry()) {
+ if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
+ // tolerantTaskInstance add to standby list directly
+ TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
+ addTaskToStandByList(tolerantTaskInstance);
+ } else {
+ retryTaskInstance(task);
+ }
+ continue;
+ }
+ if (task.getState().typeIsFailure()) {
+ errorTaskMap.put(task.getTaskCode(), task.getId());
+ }
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
+ } else {
+ logger.info("The current workflowInstance is a newly running workflowInstance");
}
if (processInstance.isComplementData() && complementListDate.isEmpty()) {
@@ -858,6 +879,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
}
}
+ logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}",
+ dependFailedTaskMap,
+ completeTaskMap,
+ errorTaskMap);
}
/**
@@ -902,6 +927,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor);
+ boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
+ if (!dispatchSuccess) {
+ logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!",
+ processInstance.getId(),
+ processInstance.getName(),
+ taskInstance.getId(),
+ taskInstance.getName());
+ return Optional.empty();
+ }
taskProcessor.action(TaskAction.RUN);
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
@@ -1835,14 +1869,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* is new process instance
*/
private boolean isNewProcessInstance() {
+ if (Flag.YES.equals(processInstance.getRecovery())) {
+ logger.info("This workInstance will be recover by this execution");
+ return false;
+ }
+
if (ExecutionStatus.RUNNING_EXECUTION == processInstance.getState() && processInstance.getRunTimes() == 1) {
return true;
- } else if (processInstance.getRecovery().equals(Flag.YES)) {
- // host is empty use old task instance
- return false;
- } else {
- return false;
}
+ logger.info(
+ "The workflowInstance has been executed before, this execution is to reRun, processInstance status: {}, runTimes: {}",
+ processInstance.getState(),
+ processInstance.getRunTimes());
+ return false;
}
public Map<Long, Integer> getCompleteTaskMap() {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index a19d08affc..0dec26defe 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -67,7 +67,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return true;
}
}
- dispatchTask();
return true;
}
@@ -110,7 +109,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
return true;
}
- logger.info("task ready to submit: taskInstanceId: {}", taskInstance.getId());
+ logger.info("task ready to dispatch to worker: taskInstanceId: {}", taskInstance.getId());
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
@@ -158,7 +157,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
killCommand.setTaskInstanceId(taskInstance.getId());
- ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER);
+ ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance);
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index 313a43c0e0..05a54e8529 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -17,46 +17,12 @@
package org.apache.dolphinscheduler.server.master.service;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
-import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
-import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
-import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.StopWatch;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-import io.micrometer.core.annotation.Counted;
-import io.micrometer.core.annotation.Timed;
import lombok.NonNull;
/**
@@ -65,41 +31,14 @@ import lombok.NonNull;
@Component
public class FailoverService {
private static final Logger LOGGER = LoggerFactory.getLogger(FailoverService.class);
- private final RegistryClient registryClient;
- private final MasterConfig masterConfig;
- private final ProcessService processService;
- private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
- private final ProcessInstanceExecCacheManager cacheManager;
- private final String localAddress;
-
- public FailoverService(@NonNull RegistryClient registryClient,
- @NonNull MasterConfig masterConfig,
- @NonNull ProcessService processService,
- @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
- @NonNull ProcessInstanceExecCacheManager cacheManager) {
- this.registryClient = registryClient;
- this.masterConfig = masterConfig;
- this.processService = processService;
- this.workflowExecuteThreadPool = workflowExecuteThreadPool;
- this.cacheManager = cacheManager;
- this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
- }
- /**
- * check master failover
- */
- @Counted(value = "failover_scheduler_check_task_count")
- @Timed(value = "failover_scheduler_check_task_time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
- public void checkMasterFailover() {
- List<String> hosts = getNeedFailoverMasterServers();
- if (CollectionUtils.isEmpty(hosts)) {
- return;
- }
- LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, hosts);
+ private final MasterFailoverService masterFailoverService;
+ private final WorkerFailoverService workerFailoverService;
- for (String host : hosts) {
- failoverMasterWithLock(host);
- }
+ public FailoverService(@NonNull MasterFailoverService masterFailoverService,
+ @NonNull WorkerFailoverService workerFailoverService) {
+ this.masterFailoverService = masterFailoverService;
+ this.workerFailoverService = workerFailoverService;
}
/**
@@ -111,304 +50,18 @@ public class FailoverService {
public void failoverServerWhenDown(String serverHost, NodeType nodeType) {
switch (nodeType) {
case MASTER:
- failoverMasterWithLock(serverHost);
+ LOGGER.info("Master failover starting, masterServer: {}", serverHost);
+ masterFailoverService.failoverMaster(serverHost);
+ LOGGER.info("Master failover finished, masterServer: {}", serverHost);
break;
case WORKER:
- failoverWorker(serverHost);
+ LOGGER.info("Worker failover staring, workerServer: {}", serverHost);
+ workerFailoverService.failoverWorker(serverHost);
+ LOGGER.info("Worker failover finished, workerServer: {}", serverHost);
break;
default:
break;
}
}
- private void failoverMasterWithLock(String masterHost) {
- String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
- try {
- registryClient.getLock(failoverPath);
- this.failoverMaster(masterHost);
- } catch (Exception e) {
- LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e);
- } finally {
- registryClient.releaseLock(failoverPath);
- }
- }
-
- /**
- * Failover master, will failover process instance and associated task instance.
- * <p>When the process instance belongs to the given masterHost and the restartTime is before the current server start up time,
- * then the process instance will be failovered.
- *
- * @param masterHost master host
- */
- private void failoverMaster(String masterHost) {
- if (StringUtils.isEmpty(masterHost)) {
- return;
- }
- Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
- StopWatch failoverTimeCost = StopWatch.createStarted();
- List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
- LOGGER.info("start master[{}] failover, need to failover process list size:{}", masterHost, needFailoverProcessInstanceList.size());
-
- // servers need to contain master hosts and worker hosts, otherwise the logic task will failover fail.
- List<Server> servers = registryClient.getServerList(NodeType.WORKER);
- servers.addAll(registryClient.getServerList(NodeType.MASTER));
-
- for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
- if (Constants.NULL.equals(processInstance.getHost())) {
- continue;
- }
-
- List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
- for (TaskInstance taskInstance : validTaskInstanceList) {
- LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
- failoverTaskInstance(processInstance, taskInstance, servers);
- }
-
- if (serverStartupTime != null && processInstance.getRestartTime() != null
- && processInstance.getRestartTime().after(serverStartupTime)) {
- continue;
- }
-
- LOGGER.info("failover process instance id: {}", processInstance.getId());
- ProcessInstanceMetrics.incProcessInstanceFailover();
- //updateProcessInstance host is null and insert into command
- processInstance.setHost(Constants.NULL);
- processService.processNeedFailoverProcessInstances(processInstance);
- }
-
- failoverTimeCost.stop();
- LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
- }
-
- /**
- * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker,
- * and failover these tasks.
- * <p>
- * Note: When we do worker failover, the master will only failover the processInstance belongs to the current master.
- *
- * @param workerHost worker host
- */
- private void failoverWorker(String workerHost) {
- if (StringUtils.isEmpty(workerHost)) {
- return;
- }
-
- long startTime = System.currentTimeMillis();
- // we query the task instance from cache, so that we can directly update the cache
- final List<TaskInstance> needFailoverTaskInstanceList = cacheManager.getAll()
- .stream()
- .flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream())
- .filter(taskInstance ->
- workerHost.equals(taskInstance.getHost()) && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState()))
- .collect(Collectors.toList());
- final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
- LOGGER.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size());
- final List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
- for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
- LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
- try {
- ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
- if (processInstance == null) {
- processInstance = cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()).getProcessInstance();
- if (processInstance == null) {
- LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
- taskInstance.getProcessInstanceId(), taskInstance.getId());
- continue;
- }
- processInstanceCacheMap.put(processInstance.getId(), processInstance);
- }
-
- // only failover the task owned myself if worker down.
- if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) {
- continue;
- }
-
- LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
- failoverTaskInstance(processInstance, taskInstance, workerServers);
- } finally {
- LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
- }
- }
- LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
- }
-
- /**
- * failover task instance
- * <p>
- * 1. kill yarn job if run on worker and there are yarn jobs in tasks.
- * 2. change task state from running to need failover.
- * 3. try to notify local master
- *
- * @param processInstance
- * @param taskInstance
- * @param servers if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers.
- */
- private void failoverTaskInstance(@NonNull ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) {
- if (!checkTaskInstanceNeedFailover(servers, taskInstance)) {
- LOGGER.info("The taskInstance doesn't need to failover");
- return;
- }
- TaskMetrics.incTaskFailover();
- boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
-
- taskInstance.setProcessInstance(processInstance);
-
- if (!isMasterTask) {
- LOGGER.info("The failover taskInstance is not master task");
- TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
- .buildProcessInstanceRelatedInfo(processInstance)
- .create();
-
- if (masterConfig.isKillYarnJobWhenTaskFailover()) {
- // only kill yarn job if exists , the local thread has exited
- ProcessUtils.killYarnJob(taskExecutionContext);
- }
- } else {
- LOGGER.info("The failover taskInstance is a master task");
- }
-
- taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
- processService.saveTaskInstance(taskInstance);
-
- StateEvent stateEvent = new StateEvent();
- stateEvent.setTaskInstanceId(taskInstance.getId());
- stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
- stateEvent.setProcessInstanceId(processInstance.getId());
- stateEvent.setExecutionStatus(taskInstance.getState());
- workflowExecuteThreadPool.submitStateEvent(stateEvent);
- }
-
- /**
- * Get need failover master servers.
- * <p>
- * Query the process instances from database, if the processInstance's host doesn't exist in registry
- * or the host is the currentServer, then it will need to failover.
- *
- * @return need failover master servers
- */
- private List<String> getNeedFailoverMasterServers() {
- // failover myself && failover dead masters
- List<String> hosts = processService.queryNeedFailoverProcessInstanceHost();
-
- Iterator<String> iterator = hosts.iterator();
- while (iterator.hasNext()) {
- String host = iterator.next();
- if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
- if (!localAddress.equals(host)) {
- iterator.remove();
- }
- }
- }
- return hosts;
- }
-
- /**
- * task needs failover if task start before server starts
- *
- * @param servers servers, can container master servers or worker servers
- * @param taskInstance task instance
- * @return true if task instance need fail over
- */
- private boolean checkTaskInstanceNeedFailover(List<Server> servers, TaskInstance taskInstance) {
-
- boolean taskNeedFailover = true;
-
- if (taskInstance == null) {
- LOGGER.error("Master failover task instance error, taskInstance is null");
- return false;
- }
-
- if (Constants.NULL.equals(taskInstance.getHost())) {
- return false;
- }
-
- if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
- return false;
- }
-
- //now no host will execute this task instance,so no need to failover the task
- if (taskInstance.getHost() == null) {
- return false;
- }
-
- //if task start after server starts, there is no need to failover the task.
- if (checkTaskAfterServerStart(servers, taskInstance)) {
- taskNeedFailover = false;
- }
-
- return taskNeedFailover;
- }
-
- /**
- * check task start after the worker server starts.
- *
- * @param servers servers, can contain master servers or worker servers
- * @param taskInstance task instance
- * @return true if task instance start time after server start date
- */
- private boolean checkTaskAfterServerStart(List<Server> servers, TaskInstance taskInstance) {
- if (StringUtils.isEmpty(taskInstance.getHost())) {
- return false;
- }
- Date serverStartDate = getServerStartupTime(servers, taskInstance.getHost());
- if (serverStartDate != null) {
- if (taskInstance.getStartTime() == null) {
- return taskInstance.getSubmitTime().after(serverStartDate);
- } else {
- return taskInstance.getStartTime().after(serverStartDate);
- }
- }
- return false;
- }
-
- /**
- * get failover lock path
- *
- * @param nodeType zookeeper node type
- * @return fail over lock path
- */
- private String getFailoverLockPath(NodeType nodeType, String host) {
- switch (nodeType) {
- case MASTER:
- return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
- case WORKER:
- return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
- default:
- return "";
- }
- }
-
- /**
- * get server startup time
- */
- private Date getServerStartupTime(NodeType nodeType, String host) {
- if (StringUtils.isEmpty(host)) {
- return null;
- }
- List<Server> servers = registryClient.getServerList(nodeType);
- return getServerStartupTime(servers, host);
- }
-
- /**
- * get server startup time
- */
- private Date getServerStartupTime(List<Server> servers, String host) {
- if (CollectionUtils.isEmpty(servers)) {
- return null;
- }
- Date serverStartupTime = null;
- for (Server server : servers) {
- if (host.equals(server.getHost() + Constants.COLON + server.getPort())) {
- serverStartupTime = server.getCreateTime();
- break;
- }
- }
- return serverStartupTime;
- }
-
- public String getLocalAddress() {
- return localAddress;
- }
-
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
new file mode 100644
index 0000000000..c7e5b4ea13
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.service;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
+import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import io.micrometer.core.annotation.Counted;
+import io.micrometer.core.annotation.Timed;
+import lombok.NonNull;
+
+@Service
+public class MasterFailoverService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MasterFailoverService.class);
+ private final RegistryClient registryClient;
+ private final MasterConfig masterConfig;
+ private final ProcessService processService;
+ private final String localAddress;
+
+ public MasterFailoverService(@NonNull RegistryClient registryClient,
+ @NonNull MasterConfig masterConfig,
+ @NonNull ProcessService processService) {
+ this.registryClient = registryClient;
+ this.masterConfig = masterConfig;
+ this.processService = processService;
+ this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
+
+ }
+
+ /**
+ * check master failover
+ */
+ @Counted(value = "ds.master.scheduler.failover.check.count")
+ @Timed(value = "ds.master.scheduler.failover.check.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
+ public void checkMasterFailover() {
+ List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
+ .stream()
+ // failover myself || dead server
+ .filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER))
+ .distinct()
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
+ return;
+ }
+ LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, needFailoverMasterHosts);
+
+ for (String needFailoverMasterHost : needFailoverMasterHosts) {
+ failoverMaster(needFailoverMasterHost);
+ }
+ }
+
+ public void failoverMaster(String masterHost) {
+ String failoverPath = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + masterHost;
+ try {
+ registryClient.getLock(failoverPath);
+ doFailoverMaster(masterHost);
+ } catch (Exception e) {
+ LOGGER.error("Master server failover failed, host:{}", masterHost, e);
+ } finally {
+ registryClient.releaseLock(failoverPath);
+ }
+ }
+
+ /**
+ * Fail over the given master: its process instances and their unfinished task instances.
+ * <p>A process instance of the given masterHost is failed over when no startup time is found for that master in the registry,
+ * or when its startTime is not after that master's startup time; process instances whose host is already NULL are skipped.
+ *
+ * @param masterHost master host
+ */
+ private void doFailoverMaster(@NonNull String masterHost) {
+ LOGGER.info("Master[{}] failover starting, need to failover process", masterHost);
+ StopWatch failoverTimeCost = StopWatch.createStarted();
+
+ Optional<Date> masterStartupTimeOptional =
+ getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
+ List<ProcessInstance> needFailoverProcessInstanceList =
+ processService.queryNeedFailoverProcessInstances(masterHost);
+
+ LOGGER.info(
+ "Master[{}] failover there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
+ masterHost,
+ needFailoverProcessInstanceList.size(),
+ needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
+
+ for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
+ try {
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+ LOGGER.info("WorkflowInstance failover starting");
+ if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) {
+ LOGGER.info("WorkflowInstance doesn't need to failover");
+ continue;
+ }
+ int processInstanceId = processInstance.getId();
+ List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
+ for (TaskInstance taskInstance : taskInstanceList) {
+ try {
+ LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());
+ LOGGER.info("TaskInstance failover starting");
+ if (!checkTaskInstanceNeedFailover(taskInstance)) {
+ LOGGER.info("The taskInstance doesn't need to failover");
+ continue;
+ }
+ failoverTaskInstance(processInstance, taskInstance);
+ LOGGER.info("TaskInstance failover finished");
+ } finally {
+ LoggerUtils.removeTaskInstanceIdMDC();
+ }
+ }
+
+ ProcessInstanceMetrics.incProcessInstanceFailover();
+ // Set the processInstance host to NULL to mark that it has been failed over,
+ // then insert a failover command
+ processInstance.setHost(Constants.NULL);
+ processService.processNeedFailoverProcessInstances(processInstance);
+ LOGGER.info("WorkflowInstance failover finished");
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
+ }
+ }
+
+ failoverTimeCost.stop();
+ LOGGER.info("Master[{}] failover finished, useTime:{}ms",
+ masterHost,
+ failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
+ }
+
+ private Optional<Date> getServerStartupTime(List<Server> servers, String host) {
+ if (CollectionUtils.isEmpty(servers)) {
+ return Optional.empty();
+ }
+ Date serverStartupTime = null;
+ for (Server server : servers) {
+ if (host.equals(server.getHost() + Constants.COLON + server.getPort())) {
+ serverStartupTime = server.getCreateTime();
+ break;
+ }
+ }
+ return Optional.ofNullable(serverStartupTime);
+ }
+
+ /**
+ * Fail over a single task instance.
+ * <p>
+ * 1. For a non-master task, kill the related yarn job when killYarnJobWhenTaskFailover is enabled.
+ * 2. Set the task state to NEED_FAULT_TOLERANCE and its flag to NO.
+ * 3. Save the task instance; no state event is submitted from this method.
+ *
+ * @param processInstance
+ * @param taskInstance
+ */
+ private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
+ TaskMetrics.incTaskFailover();
+ boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
+
+ taskInstance.setProcessInstance(processInstance);
+
+ if (!isMasterTask) {
+ LOGGER.info("The failover taskInstance is not master task");
+ TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildProcessInstanceRelatedInfo(processInstance)
+ .create();
+
+ if (masterConfig.isKillYarnJobWhenTaskFailover()) {
+ // only kill the yarn job if it exists; the local thread has already exited
+ LOGGER.info("TaskInstance failover begin kill the task related yarn job");
+ ProcessUtils.killYarnJob(taskExecutionContext);
+ }
+ } else {
+ LOGGER.info("The failover taskInstance is a master task");
+ }
+
+ taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+ taskInstance.setFlag(Flag.NO);
+ processService.saveTaskInstance(taskInstance);
+ }
+
+ private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) {
+ if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
+ // The task is already finished, so we don't need to failover this task instance
+ return false;
+ }
+ return true;
+ }
+
+ private boolean checkProcessInstanceNeedFailover(Optional<Date> beFailoveredMasterStartupTimeOptional,
+ @NonNull ProcessInstance processInstance) {
+ // A NULL host means the process has already been failed over. Master failover runs while holding a lock,
+ // so the host cannot be set concurrently.
+ if (Constants.NULL.equals(processInstance.getHost())) {
+ return false;
+ }
+ if (!beFailoveredMasterStartupTimeOptional.isPresent()) {
+ // the master is not active, so we can fail over all of its processInstances
+ return true;
+ }
+ Date beFailoveredMasterStartupTime = beFailoveredMasterStartupTimeOptional.get();
+
+ if (processInstance.getStartTime().after(beFailoveredMasterStartupTime)) {
+ // The processInstance is newly created
+ return false;
+ }
+
+ return true;
+ }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
new file mode 100644
index 0000000000..ec126a3ec3
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.service;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
+import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
+import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import lombok.NonNull;
+
+ /**
+  * Fails over the task instances that were dispatched to a worker which went down (or was restarted).
+  * <p>
+  * Candidates are taken from this master's {@link ProcessInstanceExecCacheManager}, and a task instance is only
+  * failovered when its process instance's host equals this master's address.
+  */
+ @Service
+ public class WorkerFailoverService {
+
+     private static final Logger LOGGER = LoggerFactory.getLogger(WorkerFailoverService.class);
+
+     private final RegistryClient registryClient;
+     private final MasterConfig masterConfig;
+     private final ProcessService processService;
+     private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
+     private final ProcessInstanceExecCacheManager cacheManager;
+     /** Address of this master, built from the configured listen port. */
+     private final String localAddress;
+
+     public WorkerFailoverService(@NonNull RegistryClient registryClient,
+                                  @NonNull MasterConfig masterConfig,
+                                  @NonNull ProcessService processService,
+                                  @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
+                                  @NonNull ProcessInstanceExecCacheManager cacheManager) {
+         this.registryClient = registryClient;
+         this.masterConfig = masterConfig;
+         this.processService = processService;
+         this.workflowExecuteThreadPool = workflowExecuteThreadPool;
+         this.cacheManager = cacheManager;
+         this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
+     }
+
+     /**
+      * Do the worker failover. Finds the cached task instances whose host is the given worker and whose state is
+      * accepted by {@code ExecutionStatus.isNeedFailoverWorkflowInstanceState}, then fails over each one that
+      * passes {@link #checkTaskInstanceNeedFailover}.
+      * <p>
+      * Note: When we do worker failover, the master will only failover the processInstance belongs to the current
+      * master. A failure on one task instance is logged and does not stop the remaining ones.
+      *
+      * @param workerHost worker host
+      */
+     public void failoverWorker(@NonNull String workerHost) {
+         LOGGER.info("Worker[{}] failover starting", workerHost);
+         final StopWatch failoverTimeCost = StopWatch.createStarted();
+
+         // Present only when the worker is registered again; used to skip tasks submitted after its restart
+         final Optional<Date> needFailoverWorkerStartTime =
+                 getServerStartupTime(registryClient.getServerList(NodeType.WORKER), workerHost);
+
+         final List<TaskInstance> needFailoverTaskInstanceList = getNeedFailoverTaskInstance(workerHost);
+         if (CollectionUtils.isEmpty(needFailoverTaskInstanceList)) {
+             LOGGER.info("Worker[{}] failover finished there are no taskInstance need to failover", workerHost);
+             return;
+         }
+         LOGGER.info(
+                 "Worker[{}] failover there are {} taskInstance may need to failover, will do a deep check, taskInstanceIds: {}",
+                 workerHost,
+                 needFailoverTaskInstanceList.size(),
+                 needFailoverTaskInstanceList.stream().map(TaskInstance::getId).collect(Collectors.toList()));
+
+         // processInstanceId -> processInstance, so tasks of the same workflow share a single cache lookup
+         final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
+         for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
+             LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
+             try {
+                 // computeIfAbsent records no mapping for a null result, so a missing workflow stays null here
+                 final ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent(
+                         taskInstance.getProcessInstanceId(), this::findCachedProcessInstance);
+                 if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) {
+                     LOGGER.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost);
+                     continue;
+                 }
+                 LOGGER.info(
+                         "Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE",
+                         workerHost);
+                 failoverTaskInstance(processInstance, taskInstance);
+                 LOGGER.info("Worker[{}] failover: Finish failover taskInstance", workerHost);
+             } catch (Exception ex) {
+                 // A failed failover is an error, not routine information; keep going with the other tasks
+                 LOGGER.error("Worker[{}] failover taskInstance occur exception", workerHost, ex);
+             } finally {
+                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+             }
+         }
+         failoverTimeCost.stop();
+         LOGGER.info("Worker[{}] failover finished, useTime:{}ms",
+                 workerHost,
+                 failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
+     }
+
+     /**
+      * Looks up the process instance of a workflow that is held in this master's execution cache.
+      *
+      * @param processInstanceId id of the process instance
+      * @return the cached process instance, or {@code null} when the cache has no runnable for that id
+      */
+     @Nullable
+     private ProcessInstance findCachedProcessInstance(int processInstanceId) {
+         final WorkflowExecuteRunnable workflowExecuteRunnable =
+                 cacheManager.getByProcessInstanceId(processInstanceId);
+         return workflowExecuteRunnable == null ? null : workflowExecuteRunnable.getProcessInstance();
+     }
+
+     /**
+      * failover task instance
+      * <p>
+      * 1. kill yarn job if the task is not a master task and killing yarn jobs on failover is enabled.
+      * 2. change task state to NEED_FAULT_TOLERANCE, set its flag to NO and save it.
+      * 3. submit a TASK_STATE_CHANGE event to the local workflow execute thread pool.
+      *
+      * @param processInstance the process instance owning the task
+      * @param taskInstance    the task instance to failover
+      */
+     private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
+
+         TaskMetrics.incTaskFailover();
+         final boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
+
+         taskInstance.setProcessInstance(processInstance);
+
+         if (isMasterTask) {
+             LOGGER.info("The failover taskInstance is a master task");
+         } else {
+             LOGGER.info("The failover taskInstance is not master task");
+             if (masterConfig.isKillYarnJobWhenTaskFailover()) {
+                 // The execution context is only needed for the kill, so it is built only when the kill is enabled
+                 final TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
+                         .buildTaskInstanceRelatedInfo(taskInstance)
+                         .buildProcessInstanceRelatedInfo(processInstance)
+                         .create();
+                 // only kill yarn job if exists , the local thread has exited
+                 LOGGER.info("TaskInstance failover begin kill the task related yarn job");
+                 ProcessUtils.killYarnJob(taskExecutionContext);
+             }
+         }
+
+         taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+         taskInstance.setFlag(Flag.NO);
+         processService.saveTaskInstance(taskInstance);
+
+         // Notify the local workflow about the new task state
+         final StateEvent stateEvent = new StateEvent();
+         stateEvent.setTaskInstanceId(taskInstance.getId());
+         stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+         stateEvent.setProcessInstanceId(processInstance.getId());
+         stateEvent.setExecutionStatus(taskInstance.getState());
+         workflowExecuteThreadPool.submitStateEvent(stateEvent);
+     }
+
+     /**
+      * Decides whether a candidate task instance really has to be failovered.
+      *
+      * @param needFailoverWorkerStartTime startup time of the worker if it is registered again, empty otherwise
+      * @param processInstance             the task's process instance taken from the cache, may be {@code null}
+      * @param taskInstance                the candidate task instance
+      * @return true if task instance need fail over
+      */
+     private boolean checkTaskInstanceNeedFailover(Optional<Date> needFailoverWorkerStartTime,
+                                                   @Nullable ProcessInstance processInstance,
+                                                   TaskInstance taskInstance) {
+         if (processInstance == null) {
+             // This case should not happen.
+             LOGGER.error(
+                     "Failover task instance error, cannot find the related processInstance form memory, this case shouldn't happened");
+             return false;
+         }
+         if (taskInstance == null) {
+             // This case should not happen.
+             LOGGER.error("Master failover task instance error, taskInstance is null, this case shouldn't happened");
+             return false;
+         }
+         // only failover the task owned myself if worker down.
+         if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) {
+             LOGGER.error(
+                     "Master failover task instance error, the taskInstance's processInstance's host: {} is not the current master: {}",
+                     processInstance.getHost(),
+                     localAddress);
+             return false;
+         }
+         if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
+             // The taskInstance is already finished, doesn't need to failover
+             LOGGER.info("The task is already finished, doesn't need to failover");
+             return false;
+         }
+         if (!needFailoverWorkerStartTime.isPresent()) {
+             // The worker is still down
+             return true;
+         }
+         // The worker is active, may already send some new task to it
+         if (taskInstance.getSubmitTime() != null && taskInstance.getSubmitTime()
+                 .after(needFailoverWorkerStartTime.get())) {
+             LOGGER.info(
+                     "The taskInstance's submitTime: {} is after the need failover worker's start time: {}, the taskInstance is newly submit, it doesn't need to failover",
+                     taskInstance.getSubmitTime(),
+                     needFailoverWorkerStartTime.get());
+             return false;
+         }
+
+         return true;
+     }
+
+     /**
+      * Collects the failover candidates for a worker from the in-memory workflow cache.
+      *
+      * @param failoverWorkerHost host of the worker being failovered
+      * @return task instances whose host equals the worker host and whose state is accepted by
+      *         {@code ExecutionStatus.isNeedFailoverWorkflowInstanceState}
+      */
+     private List<TaskInstance> getNeedFailoverTaskInstance(@NonNull String failoverWorkerHost) {
+         // we query the task instance from cache, so that we can directly update the cache
+         return cacheManager.getAll()
+                 .stream()
+                 .flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream())
+                 // A task instance whose host is not set (null) never matches the worker host
+                 .filter(taskInstance -> failoverWorkerHost.equals(taskInstance.getHost())
+                         && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState()))
+                 .collect(Collectors.toList());
+     }
+
+     /**
+      * Finds the startup (create) time of the server registered under the given address.
+      *
+      * @param servers registered servers, may be {@code null} or empty
+      * @param host    address in {@code host:port} form
+      * @return the create time of the first server whose {@code host:port} equals {@code host}; empty when there is
+      *         no such server or its create time is {@code null}
+      */
+     private Optional<Date> getServerStartupTime(List<Server> servers, String host) {
+         if (CollectionUtils.isEmpty(servers)) {
+             return Optional.empty();
+         }
+         return servers.stream()
+                 .filter(server -> host.equals(server.getHost() + Constants.COLON + server.getPort()))
+                 .findFirst()
+                 // Optional.map yields empty when the matched server has no create time
+                 .map(Server::getCreateTime);
+     }
+ }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
index b2c7d63cd7..55dd236f71 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
@@ -52,7 +52,7 @@ public class ExecutionContextTestUtils {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(context);
Command command = requestCommand.convert2Command();
- ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER);
+ ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(port)));
return executionContext;
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
index fdd79552b1..1ee890faf2 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
@@ -70,7 +70,7 @@ public class NettyExecutorManagerTest {
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
- ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER);
+ ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort())));
Boolean execute = nettyExecutorManager.execute(executionContext);
Assert.assertTrue(execute);
@@ -89,7 +89,7 @@ public class NettyExecutorManagerTest {
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
- ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER);
+ ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(4444)));
nettyExecutorManager.execute(executionContext);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index 98bf514730..44e5a382f6 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -26,13 +26,14 @@ import static org.mockito.Mockito.doNothing;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -98,9 +99,17 @@ public class FailoverServiceTest {
springApplicationContext.setApplicationContext(applicationContext);
given(masterConfig.getListenPort()).willReturn(masterPort);
- failoverService = new FailoverService(registryClient, masterConfig, processService, workflowExecuteThreadPool, cacheManager);
+ MasterFailoverService masterFailoverService =
+ new MasterFailoverService(registryClient, masterConfig, processService);
+ WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
+ masterConfig,
+ processService,
+ workflowExecuteThreadPool,
+ cacheManager);
+
+ failoverService = new FailoverService(masterFailoverService, workerFailoverService);
- testMasterHost = failoverService.getLocalAddress();
+ testMasterHost = NetUtils.getAddr(masterConfig.getListenPort());
String ip = testMasterHost.split(":")[0];
int port = Integer.valueOf(testMasterHost.split(":")[1]);
Assert.assertEquals(masterPort, port);
@@ -118,6 +127,7 @@ public class FailoverServiceTest {
processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setHost(testMasterHost);
+ processInstance.setStartTime(new Date());
processInstance.setRestartTime(new Date());
processInstance.setHistoryCmd("xxx");
processInstance.setCommandType(CommandType.STOP);
@@ -154,16 +164,10 @@ public class FailoverServiceTest {
given(registryClient.getServerList(NodeType.WORKER)).willReturn(new ArrayList<>(Arrays.asList(workerServer)));
given(registryClient.getServerList(NodeType.MASTER)).willReturn(new ArrayList<>(Arrays.asList(masterServer)));
- ReflectionTestUtils.setField(failoverService, "registryClient", registryClient);
doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class));
}
- @Test
- public void checkMasterFailoverTest() {
- failoverService.checkMasterFailover();
- }
-
@Test
public void failoverMasterTest() {
processInstance.setHost(Constants.NULL);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 4bfb0b4809..f61676e693 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index c527276933..5624f47812 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -126,7 +126,6 @@ import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@@ -266,10 +265,12 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private TaskPluginManager taskPluginManager;
+ @Autowired
+ private ProcessService processService;
+
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
- * @param logger logger
* @param host host
* @param command found command
* @return process instance
@@ -904,7 +905,8 @@ public class ProcessServiceImpl implements ProcessService {
ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType();
- processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
+ processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(),
+ command.getProcessDefinitionVersion());
if (processDefinition == null) {
logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
return null;
@@ -1003,6 +1005,7 @@ public class ProcessServiceImpl implements ProcessService {
case RECOVER_TOLERANCE_FAULT_PROCESS:
// recover tolerance fault process
processInstance.setRecovery(Flag.YES);
+ processInstance.setRunTimes(runTime + 1);
runStatus = processInstance.getState();
break;
case COMPLEMENT_DATA:
@@ -1241,11 +1244,15 @@ public class ProcessServiceImpl implements ProcessService {
while (retryTimes <= commitRetryTimes) {
try {
// submit task to db
- task = SpringApplicationContext.getBean(ProcessService.class).submitTask(processInstance, taskInstance);
+ // Only want to use transaction here
+ task = processService.submitTask(processInstance, taskInstance);
if (task != null && task.getId() != 0) {
break;
}
- logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
+ logger.error(
+ "task commit to db failed , taskId {} has already retry {} times, please check the database",
+ taskInstance.getId(),
+ retryTimes);
Thread.sleep(commitInterval);
} catch (Exception e) {
logger.error("task commit to db failed", e);
@@ -1267,13 +1274,17 @@ public class ProcessServiceImpl implements ProcessService {
@Override
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
- logger.info("start submit task : {}, processInstance id:{}, state: {}",
- taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
+ logger.info("Start save taskInstance to database : {}, processInstance id:{}, state: {}",
+ taskInstance.getName(),
+ taskInstance.getProcessInstanceId(),
+ processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
if (task == null) {
- logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
- taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
+ logger.error("Save taskInstance to db error, task name:{}, process id:{} state: {} ",
+ taskInstance.getName(),
+ taskInstance.getProcessInstance(),
+ processInstance.getState());
return null;
}
@@ -1281,8 +1292,13 @@ public class ProcessServiceImpl implements ProcessService {
createSubWorkProcess(processInstance, task);
}
- logger.info("end submit task to db successfully:{} {} state:{} complete, instance id:{} state: {} ",
- taskInstance.getId(), taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
+ logger.info(
+ "End save taskInstance to db successfully:{}, taskInstanceName: {}, taskInstance state:{}, processInstanceId:{}, processInstanceState: {}",
+ taskInstance.getId(),
+ taskInstance.getName(),
+ task.getState(),
+ processInstance.getId(),
+ processInstance.getState());
return task;
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
index 0aeec4609d..823ec81d3c 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
@@ -211,16 +211,45 @@ public class TaskPriority implements Comparable<TaskPriority> {
}
TaskPriority that = (TaskPriority) o;
return processInstancePriority == that.processInstancePriority
- && processInstanceId == that.processInstanceId
- && taskInstancePriority == that.taskInstancePriority
- && taskId == that.taskId
- && taskGroupPriority == that.taskGroupPriority
- && Objects.equals(groupName, that.groupName);
+ && processInstanceId == that.processInstanceId
+ && taskInstancePriority == that.taskInstancePriority
+ && taskId == that.taskId
+ && taskGroupPriority == that.taskGroupPriority
+ && Objects.equals(groupName, that.groupName);
}
@Override
public int hashCode() {
- return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, taskGroupPriority, groupName);
+ return Objects.hash(processInstancePriority,
+ processInstanceId,
+ taskInstancePriority,
+ taskId,
+ taskGroupPriority,
+ groupName);
}
+     /**
+      * Builds a text form of this object that lists its fields as {@code name=value} pairs
+      * inside {@code TaskPriority{...}}; the group name is wrapped in single quotes.
+      *
+      * @return the text form of this object
+      */
+     @Override
+     public String toString() {
+         final StringBuilder text = new StringBuilder("TaskPriority{");
+         text.append("processInstancePriority=").append(processInstancePriority);
+         text.append(", processInstanceId=").append(processInstanceId);
+         text.append(", taskInstancePriority=").append(taskInstancePriority);
+         text.append(", taskId=").append(taskId);
+         text.append(", taskExecutionContext=").append(taskExecutionContext);
+         text.append(", groupName='").append(groupName).append('\'');
+         text.append(", context=").append(context);
+         text.append(", checkpoint=").append(checkpoint);
+         text.append(", taskGroupPriority=").append(taskGroupPriority);
+         return text.append('}').toString();
+     }
}