You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/06/28 08:08:41 UTC
[dolphinscheduler] branch dev updated: [Bug] [Master] Worker failover will cause task cannot be failover (#10631)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 66624c5c86 [Bug] [Master] Worker failover will cause task cannot be failover (#10631)
66624c5c86 is described below
commit 66624c5c86dd0bb2ae658eaf68691e03c86ae5e0
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Jun 28 16:08:35 2022 +0800
[Bug] [Master] Worker failover will cause task cannot be failover (#10631)
* Fix worker failover possibly losing events
---
.../dolphinscheduler/common/enums/StateEvent.java | 77 +---
.../dolphinscheduler/common/utils/OSUtils.java | 21 +-
.../cache/ProcessInstanceExecCacheManager.java | 4 +-
.../impl/ProcessInstanceExecCacheManagerImpl.java | 7 +-
.../master/consumer/TaskPriorityQueueConsumer.java | 8 +-
.../master/metrics/ProcessInstanceMetrics.java | 49 ++-
.../master/runner/MasterSchedulerService.java | 73 ++--
.../master/runner/StateWheelExecuteThread.java | 14 +-
.../master/runner/WorkflowExecuteRunnable.java | 27 +-
.../master/runner/WorkflowExecuteThreadPool.java | 12 +-
.../master/runner/task/BaseTaskProcessor.java | 21 +-
.../server/master/runner/task/TaskInstanceKey.java | 62 +---
.../server/master/service/FailoverService.java | 104 +++---
.../src/main/resources/application.yaml | 2 +-
.../ProcessInstanceExecCacheManagerImplTest.java | 2 +-
.../master/dispatch/ExecutionContextTestUtils.java | 3 +-
.../executor/NettyExecutorManagerTest.java | 2 +-
.../server/master/service/FailoverServiceTest.java | 21 +-
.../dolphinscheduler/meter/MeterConfiguration.java | 4 -
.../dolphinscheduler/remote/command/Command.java | 3 +
.../remote/command/TaskExecuteRequestCommand.java | 42 +--
.../server/utils/ProcessUtils.java | 8 +-
.../service/process/ProcessService.java | 8 +-
.../service/process/ProcessServiceImpl.java | 23 +-
.../service/process/ProcessServiceTest.java | 53 +--
.../plugin/task/api/TaskExecutionContext.java | 409 +--------------------
.../plugin/task/api/enums/ExecutionStatus.java | 23 ++
.../server/worker/metrics/WorkerServerMetrics.java | 25 +-
.../worker/processor/TaskCallbackService.java | 26 +-
.../worker/processor/TaskExecuteProcessor.java | 3 +-
.../worker/runner/RetryReportTaskStatusThread.java | 86 +++--
.../server/worker/runner/TaskExecuteThread.java | 3 +-
.../worker/processor/TaskExecuteProcessorTest.java | 4 +-
33 files changed, 393 insertions(+), 836 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
index 405df09d3e..7f4be924dd 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
@@ -20,10 +20,12 @@ package org.apache.dolphinscheduler.common.enums;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import io.netty.channel.Channel;
+import lombok.Data;
/**
* state event
*/
+@Data
public class StateEvent {
/**
@@ -45,79 +47,4 @@ public class StateEvent {
private Channel channel;
- public ExecutionStatus getExecutionStatus() {
- return executionStatus;
- }
-
- public void setExecutionStatus(ExecutionStatus executionStatus) {
- this.executionStatus = executionStatus;
- }
-
- public int getTaskInstanceId() {
- return taskInstanceId;
- }
-
- public long getTaskCode() {
- return taskCode;
- }
-
- public int getProcessInstanceId() {
- return processInstanceId;
- }
-
- public void setProcessInstanceId(int processInstanceId) {
- this.processInstanceId = processInstanceId;
- }
-
- public String getContext() {
- return context;
- }
-
- public void setContext(String context) {
- this.context = context;
- }
-
- public void setTaskInstanceId(int taskInstanceId) {
- this.taskInstanceId = taskInstanceId;
- }
-
- public void setTaskCode(long taskCode) {
- this.taskCode = taskCode;
- }
-
- public Channel getChannel() {
- return channel;
- }
-
- public void setChannel(Channel channel) {
- this.channel = channel;
- }
-
- @Override
- public String toString() {
- return "State Event :"
- + "key: " + key
- + " type: " + type
- + " executeStatus: " + executionStatus
- + " task instance id: " + taskInstanceId
- + " process instance id: " + processInstanceId
- + " context: " + context
- ;
- }
-
- public String getKey() {
- return key;
- }
-
- public void setKey(String key) {
- this.key = key;
- }
-
- public void setType(StateEventType type) {
- this.type = type;
- }
-
- public StateEventType getType() {
- return this.type;
- }
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
index 723a6639c3..a072d5a232 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
@@ -19,14 +19,14 @@ package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.shell.ShellExecutor;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
+import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
@@ -460,24 +460,23 @@ public class OSUtils {
}
/**
- * check memory and cpu usage
+ * Check whether the cpu load or memory usage exceeds the given threshold.
*
- * @param maxCpuloadAvg maxCpuloadAvg
+ * @param maxCpuLoadAvg maxCpuLoadAvg
* @param reservedMemory reservedMemory
- * @return check memory and cpu usage
+ * @return True if the cpu load average exceeds maxCpuLoadAvg or the available physical memory is below reservedMemory.
*/
- public static Boolean checkResource(double maxCpuloadAvg, double reservedMemory) {
+ public static Boolean isOverload(double maxCpuLoadAvg, double reservedMemory) {
// system load average
double loadAverage = loadAverage();
// system available physical memory
double availablePhysicalMemorySize = availablePhysicalMemorySize();
- if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
- logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
- loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
- return false;
- } else {
+ if (loadAverage > maxCpuLoadAvg || availablePhysicalMemorySize < reservedMemory) {
+ logger.warn("Current cpu load average {} is too high or available memory {}G is too low, under max.cpuLoad.avg={} and reserved.memory={}G",
+ loadAverage, availablePhysicalMemorySize, maxCpuLoadAvg, reservedMemory);
return true;
}
+ return false;
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
index 2f5f6dc472..59e064105a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.cache;
+import lombok.NonNull;
+
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Collection;
@@ -55,7 +57,7 @@ public interface ProcessInstanceExecCacheManager {
* @param processInstanceId processInstanceId
* @param workflowExecuteThread if it is null, will not be cached
*/
- void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread);
+ void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread);
/**
* get all WorkflowExecuteThread from cache
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
index dc562d37bd..8f00029a3e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
@@ -30,6 +30,8 @@ import org.springframework.stereotype.Component;
import com.google.common.collect.ImmutableList;
+import lombok.NonNull;
+
/**
* cache of process instance id and WorkflowExecuteThread
*/
@@ -59,10 +61,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
}
@Override
- public void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread) {
- if (workflowExecuteThread == null) {
- return;
- }
+ public void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread) {
processInstanceExecMaps.put(processInstanceId, workflowExecuteThread);
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index eab33a07a6..e8cf4b2919 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -131,9 +131,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
}
- // If there are tasks in a cycle that cannot find the worker group,
- // sleep for 1 second
- if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
+ // If every fetched task failed to dispatch, sleep for 1s to avoid driving up the master's CPU usage.
+ if (fetchTaskNum == failedDispatchTasks.size()) {
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
@@ -218,8 +217,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
}
private Command toCommand(TaskExecutionContext taskExecutionContext) {
- TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
- requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext));
+ TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
return requestCommand.convert2Command();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
index 1693972dac..ad0e479e5b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
@@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.server.master.metrics;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
public final class ProcessInstanceMetrics {
@@ -29,15 +31,25 @@ public final class ProcessInstanceMetrics {
throw new UnsupportedOperationException("Utility class");
}
+ private static final Timer COMMAND_QUERY_TIMETER =
+ Timer.builder("ds.workflow.command.query.duration")
+ .description("Command query duration")
+ .register(Metrics.globalRegistry);
+
+ private static final Timer PROCESS_INSTANCE_GENERATE_TIMER =
+ Timer.builder("ds.workflow.instance.generate.duration")
+ .description("Process instance generated duration")
+ .register(Metrics.globalRegistry);
+
private static final Counter PROCESS_INSTANCE_SUBMIT_COUNTER =
- Counter.builder("ds.workflow.instance.submit.count")
- .description("Process instance submit total count")
- .register(Metrics.globalRegistry);
+ Counter.builder("ds.workflow.instance.submit.count")
+ .description("Process instance submit total count")
+ .register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_TIMEOUT_COUNTER =
- Counter.builder("ds.workflow.instance.timeout.count")
- .description("Process instance timeout total count")
- .register(Metrics.globalRegistry);
+ Counter.builder("ds.workflow.instance.timeout.count")
+ .description("Process instance timeout total count")
+ .register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_FINISH_COUNTER =
Counter.builder("ds.workflow.instance.finish.count")
@@ -55,19 +67,27 @@ public final class ProcessInstanceMetrics {
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_STOP_COUNTER =
- Counter.builder("ds.workflow.instance.stop.count")
- .description("Process instance stop total count")
- .register(Metrics.globalRegistry);
+ Counter.builder("ds.workflow.instance.stop.count")
+ .description("Process instance stop total count")
+ .register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_FAILOVER_COUNTER =
- Counter.builder("ds.workflow.instance.failover.count")
- .description("Process instance failover total count")
- .register(Metrics.globalRegistry);
+ Counter.builder("ds.workflow.instance.failover.count")
+ .description("Process instance failover total count")
+ .register(Metrics.globalRegistry);
+
+ public static void recordCommandQueryTime(long milliseconds) {
+ COMMAND_QUERY_TIMETER.record(milliseconds, TimeUnit.MILLISECONDS);
+ }
+
+ public static void recordProcessInstanceGenerateTime(long milliseconds) {
+ PROCESS_INSTANCE_GENERATE_TIMER.record(milliseconds, TimeUnit.MILLISECONDS);
+ }
public static synchronized void registerProcessInstanceRunningGauge(Supplier<Number> function) {
Gauge.builder("ds.workflow.instance.running", function)
- .description("The current running process instance count")
- .register(Metrics.globalRegistry);
+ .description("The current running process instance count")
+ .register(Metrics.globalRegistry);
}
public static void incProcessInstanceSubmit() {
@@ -97,5 +117,4 @@ public final class ProcessInstanceMetrics {
public static void incProcessInstanceFailover() {
PROCESS_INSTANCE_FAILOVER_COUNTER.increment();
}
-
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 3f5a715d7c..bce8753781 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
+import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -62,31 +63,19 @@ public class MasterSchedulerService extends BaseDaemonThread {
*/
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
- /**
- * dolphinscheduler database interface
- */
@Autowired
private ProcessService processService;
- /**
- * master config
- */
@Autowired
private MasterConfig masterConfig;
- /**
- * alert manager
- */
@Autowired
private ProcessAlertManager processAlertManager;
- /**
- * netty remoting client
- */
private NettyRemotingClient nettyRemotingClient;
@Autowired
- NettyExecutorManager nettyExecutorManager;
+ private NettyExecutorManager nettyExecutorManager;
/**
* master prepare exec service
@@ -108,6 +97,8 @@ public class MasterSchedulerService extends BaseDaemonThread {
@Autowired
private CuringGlobalParamsService curingGlobalParamsService;
+ private String masterAddress;
+
protected MasterSchedulerService() {
super("MasterCommandLoopThread");
}
@@ -119,6 +110,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+ this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
}
@Override
@@ -142,13 +134,13 @@ public class MasterSchedulerService extends BaseDaemonThread {
public void run() {
while (Stopper.isRunning()) {
try {
- boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
- if (!runCheckFlag) {
+ boolean isOverload = OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
+ if (isOverload) {
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
- scheduleProcess();
+ scheduleWorkflow();
} catch (InterruptedException interruptedException) {
logger.warn("Master schedule service interrupted, close the loop", interruptedException);
Thread.currentThread().interrupt();
@@ -160,13 +152,12 @@ public class MasterSchedulerService extends BaseDaemonThread {
}
/**
- * 1. get command by slot
- * 2. donot handle command if slot is empty
+ * Query command from database by slot, and transform to workflow instance, then submit to workflowExecuteThreadPool.
*/
- private void scheduleProcess() throws InterruptedException {
+ private void scheduleWorkflow() throws InterruptedException {
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
- //indicate that no command ,sleep for 1s
+ // No command found, sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
return;
}
@@ -181,7 +172,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
logger.info("Master schedule service starting workflow instance");
- WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
+ final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
processInstance
, processService
, nettyExecutorManager
@@ -194,9 +185,14 @@ public class MasterSchedulerService extends BaseDaemonThread {
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
- workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable);
+ ProcessInstanceMetrics.incProcessInstanceSubmit();
+ workflowExecuteThreadPool.submit(workflowExecuteRunnable);
logger.info("Master schedule service started workflow instance");
+ } catch (Exception ex) {
+ processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+ stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+ logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
@@ -204,21 +200,21 @@ public class MasterSchedulerService extends BaseDaemonThread {
}
private List<ProcessInstance> command2ProcessInstance(List<Command> commands) throws InterruptedException {
+ long commandTransformStartTime = System.currentTimeMillis();
logger.info("Master schedule service transforming command to ProcessInstance, commandSize: {}", commands.size());
List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
CountDownLatch latch = new CountDownLatch(commands.size());
for (final Command command : commands) {
masterPrepareExecService.execute(() -> {
try {
+ // todo: this check is not safe, the slot may change after command transform.
// slot check again
SlotCheckState slotCheckState = slotCheck(command);
if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
return;
}
- ProcessInstance processInstance = processService.handleCommand(logger,
- getLocalAddress(),
- command);
+ ProcessInstance processInstance = processService.handleCommand(masterAddress, command);
if (processInstance != null) {
processInstances.add(processInstance);
logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId());
@@ -236,24 +232,26 @@ public class MasterSchedulerService extends BaseDaemonThread {
latch.await();
logger.info("Master schedule service transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
commands.size(), processInstances.size());
+ ProcessInstanceMetrics.recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime);
return processInstances;
}
private List<Command> findCommands() {
+ long scheduleStartTime = System.currentTimeMillis();
+ int thisMasterSlot = ServerNodeManager.getSlot();
+ int masterCount = ServerNodeManager.getMasterSize();
+ if (masterCount <= 0) {
+ logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
+ return Collections.emptyList();
+ }
int pageNumber = 0;
int pageSize = masterConfig.getFetchCommandNum();
- List<Command> result = new ArrayList<>();
- if (Stopper.isRunning()) {
- int thisMasterSlot = ServerNodeManager.getSlot();
- int masterCount = ServerNodeManager.getMasterSize();
- if (masterCount > 0) {
- result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
- if (CollectionUtils.isNotEmpty(result)) {
- logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
- result.size(), thisMasterSlot, masterCount);
- }
- }
+ final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+ if (CollectionUtils.isNotEmpty(result)) {
+ logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
+ result.size(), thisMasterSlot, masterCount);
}
+ ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
return result;
}
@@ -271,7 +269,4 @@ public class MasterSchedulerService extends BaseDaemonThread {
return state;
}
- private String getLocalAddress() {
- return NetUtils.getAddr(masterConfig.getListenPort());
- }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 53acb4ce83..65c7db924d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -59,24 +59,24 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
/**
- * process timeout check list
+ * ProcessInstance timeout check list, element is the processInstanceId.
*/
- private ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
/**
* task time out check list
*/
- private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
/**
* task retry check list
*/
- private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
/**
* task state check list
*/
- private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
@Autowired
private MasterConfig masterConfig;
@@ -116,8 +116,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
logger.info("Success add workflow instance into timeout check list");
}
- public void removeProcess4TimeoutCheck(ProcessInstance processInstance) {
- boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstance.getId());
+ public void removeProcess4TimeoutCheck(int processInstanceId) {
+ boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstanceId);
if (removeFlag) {
logger.info("Success remove workflow instance from timeout check list");
} else {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 9fa452c66e..42654665a2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -435,6 +435,7 @@ public class WorkflowExecuteRunnable implements Runnable {
if (task.getState().typeIsFinished()) {
if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
+ logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
return true;
}
taskFinished(task);
@@ -461,11 +462,9 @@ public class WorkflowExecuteRunnable implements Runnable {
}
private void taskFinished(TaskInstance taskInstance) {
- logger.info("work flow {} task id:{} code:{} state:{} ",
- processInstance.getId(),
- taskInstance.getId(),
- taskInstance.getTaskCode(),
- taskInstance.getState());
+ logger.info("TaskInstance finished task code:{} state:{} ",
+ taskInstance.getTaskCode(),
+ taskInstance.getState());
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
@@ -481,12 +480,13 @@ public class WorkflowExecuteRunnable implements Runnable {
}
} else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
// retry task
+ logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().typeIsFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// There are child nodes and the failure policy is: CONTINUE
- if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
- && processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
+ if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE
+ && DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
@@ -495,6 +495,7 @@ public class WorkflowExecuteRunnable implements Runnable {
}
}
} else if (taskInstance.getState().typeIsFinished()) {
+ // todo: when the task instance state is pause, it should not be put into completeTaskMap
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
}
@@ -536,7 +537,7 @@ public class WorkflowExecuteRunnable implements Runnable {
}
TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
if (newTaskInstance == null) {
- logger.error("retry fail, new taskInstancce is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
+ logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
return;
}
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
@@ -634,7 +635,7 @@ public class WorkflowExecuteRunnable implements Runnable {
* check if task instance exist by task code
*/
public boolean checkTaskInstanceByCode(long taskCode) {
- if (taskInstanceMap == null || taskInstanceMap.size() == 0) {
+ if (taskInstanceMap.isEmpty()) {
return false;
}
for (TaskInstance taskInstance : taskInstanceMap.values()) {
@@ -649,7 +650,7 @@ public class WorkflowExecuteRunnable implements Runnable {
* check if task instance exist by id
*/
public boolean checkTaskInstanceById(int taskInstanceId) {
- if (taskInstanceMap == null || taskInstanceMap.size() == 0) {
+ if (taskInstanceMap.isEmpty()) {
return false;
}
return taskInstanceMap.containsKey(taskInstanceId);
@@ -697,7 +698,7 @@ public class WorkflowExecuteRunnable implements Runnable {
if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
// serial wait execution type needs to wake up the waiting process
- if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()){
+ if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) {
endProcess();
return true;
}
@@ -1296,6 +1297,10 @@ public class WorkflowExecuteRunnable implements Runnable {
}
}
+ public Collection<TaskInstance> getAllTaskInstances() {
+ return taskInstanceMap.values();
+ }
+
private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) {
//for this taskInstance all the param in this part is IN.
thisProperty.setDirect(Direct.IN);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index eb8bcd94bb..4085e99051 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -51,7 +51,7 @@ import com.google.common.base.Strings;
import lombok.NonNull;
/**
- * Used to execute {@link WorkflowExecuteRunnable}, when
+ * Used to execute {@link WorkflowExecuteRunnable}.
*/
@Component
public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@@ -99,14 +99,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
logger.info("Submit state event success, stateEvent: {}", stateEvent);
}
- /**
- * Start the given workflow.
- */
- public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) {
- ProcessInstanceMetrics.incProcessInstanceSubmit();
- submit(workflowExecuteThread);
- }
-
/**
* Handle the events belong to the given workflow.
*/
@@ -138,7 +130,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
try {
LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
if (workflowExecuteThread.workFlowFinish()) {
- stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
+ stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance().getId());
processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
notifyProcessChanged(workflowExecuteThread.getProcessInstance());
logger.info("Workflow instance is finished.");
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 329f72d1b1..16aa9d9957 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -96,6 +96,8 @@ import org.slf4j.LoggerFactory;
import com.zaxxer.hikari.HikariDataSource;
+import lombok.NonNull;
+
public abstract class BaseTaskProcessor implements ITaskProcessor {
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
@@ -114,22 +116,19 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected int commitInterval;
- protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
+ protected ProcessService processService;
- protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+ protected MasterConfig masterConfig;
- protected TaskPluginManager taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
+ protected TaskPluginManager taskPluginManager;
protected String threadLoggerInfoName;
@Override
- public void init(TaskInstance taskInstance, ProcessInstance processInstance) {
- if (processService == null) {
- processService = SpringApplicationContext.getBean(ProcessService.class);
- }
- if (masterConfig == null) {
- masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
- }
+ public void init(@NonNull TaskInstance taskInstance, @NonNull ProcessInstance processInstance) {
+ processService = SpringApplicationContext.getBean(ProcessService.class);
+ masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+ taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
this.taskInstance = taskInstance;
this.processInstance = processInstance;
this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
@@ -245,7 +244,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
@Override
public String getType() {
- return null;
+ throw new UnsupportedOperationException("This abstract class doesn's has type");
}
@Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java
index 5b3d788c8c..d9ca82a45c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java
@@ -20,64 +20,22 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import java.util.Objects;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NonNull;
/**
- * task instance key, processInstanceId
+ * Used to identify a task instance.
*/
+@Data
+@AllArgsConstructor
public class TaskInstanceKey {
- private int processInstanceId;
- private long taskCode;
- private int taskVersion;
+ private final int processInstanceId;
+ private final long taskCode;
+ private final int taskVersion;
- public TaskInstanceKey(int processInstanceId, long taskCode, int taskVersion) {
- this.processInstanceId = processInstanceId;
- this.taskCode = taskCode;
- this.taskVersion = taskVersion;
- }
-
- public int getProcessInstanceId() {
- return processInstanceId;
- }
-
- public long getTaskCode() {
- return taskCode;
- }
-
- public int getTaskVersion() {
- return taskVersion;
- }
-
- public static TaskInstanceKey getTaskInstanceKey(ProcessInstance processInstance, TaskInstance taskInstance) {
- if (processInstance == null || taskInstance == null) {
- return null;
- }
+ public static TaskInstanceKey getTaskInstanceKey(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
return new TaskInstanceKey(processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
}
- @Override
- public String toString() {
- return "TaskKey{"
- + "processInstanceId=" + processInstanceId
- + ", taskCode=" + taskCode
- + ", taskVersion=" + taskVersion
- + '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TaskInstanceKey taskInstanceKey = (TaskInstanceKey) o;
- return processInstanceId == taskInstanceKey.processInstanceId && taskCode == taskInstanceKey.taskCode && taskVersion == taskInstanceKey.taskVersion;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(processInstanceId, taskCode, taskVersion);
- }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index 32f6562ca0..b377ceedbf 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -17,19 +17,19 @@
package org.apache.dolphinscheduler.server.master.service;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
@@ -49,6 +49,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@ import org.springframework.stereotype.Component;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
+import lombok.NonNull;
/**
* failover service
@@ -67,15 +69,20 @@ public class FailoverService {
private final MasterConfig masterConfig;
private final ProcessService processService;
private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
-
- public FailoverService(RegistryClient registryClient,
- MasterConfig masterConfig,
- ProcessService processService,
- WorkflowExecuteThreadPool workflowExecuteThreadPool) {
- this.registryClient = checkNotNull(registryClient);
- this.masterConfig = checkNotNull(masterConfig);
- this.processService = checkNotNull(processService);
- this.workflowExecuteThreadPool = checkNotNull(workflowExecuteThreadPool);
+ private final ProcessInstanceExecCacheManager cacheManager;
+ private final String localAddress;
+
+ public FailoverService(@NonNull RegistryClient registryClient,
+ @NonNull MasterConfig masterConfig,
+ @NonNull ProcessService processService,
+ @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
+ @NonNull ProcessInstanceExecCacheManager cacheManager) {
+ this.registryClient = registryClient;
+ this.masterConfig = masterConfig;
+ this.processService = processService;
+ this.workflowExecuteThreadPool = workflowExecuteThreadPool;
+ this.cacheManager = cacheManager;
+ this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
}
/**
@@ -88,7 +95,7 @@ public class FailoverService {
if (CollectionUtils.isEmpty(hosts)) {
return;
}
- LOGGER.info("Master failover service {} begin to failover hosts:{}", getLocalAddress(), hosts);
+ LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, hosts);
for (String host : hosts) {
failoverMasterWithLock(host);
@@ -174,11 +181,10 @@ public class FailoverService {
}
/**
- * failover worker tasks
+ * Do the worker failover: find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks that belong to the given worker,
+ * and fail over these tasks.
* <p>
- * 1. kill yarn job if there are yarn jobs in tasks.
- * 2. change task state from running to need failover.
- * 3. failover all tasks when workerHost is null
+ * Note: during worker failover, the master only fails over tasks whose process instance belongs to the current master.
*
* @param workerHost worker host
*/
@@ -188,29 +194,40 @@ public class FailoverService {
}
long startTime = System.currentTimeMillis();
- List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
- Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
+ // we query the task instance from cache, so that we can directly update the cache
+ final List<TaskInstance> needFailoverTaskInstanceList = cacheManager.getAll()
+ .stream()
+ .flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream())
+ .filter(taskInstance ->
+ workerHost.equals(taskInstance.getHost()) && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState()))
+ .collect(Collectors.toList());
+ final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
LOGGER.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size());
- List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
+ final List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
- ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
- if (processInstance == null) {
- processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
+ try {
+ ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
if (processInstance == null) {
- LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
- taskInstance.getProcessInstanceId(), taskInstance.getId());
+ processInstance = cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()).getProcessInstance();
+ if (processInstance == null) {
+ LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
+ taskInstance.getProcessInstanceId(), taskInstance.getId());
+ continue;
+ }
+ processInstanceCacheMap.put(processInstance.getId(), processInstance);
+ }
+
+ // When a worker is down, only fail over tasks whose process instance is owned by this master.
+ if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) {
continue;
}
- processInstanceCacheMap.put(processInstance.getId(), processInstance);
- }
- // only failover the task owned myself if worker down.
- if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
- continue;
+ LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
+ failoverTaskInstance(processInstance, taskInstance, workerServers);
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
-
- LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
- failoverTaskInstance(processInstance, taskInstance, workerServers);
}
LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
}
@@ -221,17 +238,14 @@ public class FailoverService {
* 1. kill yarn job if run on worker and there are yarn jobs in tasks.
* 2. change task state from running to need failover.
* 3. try to notify local master
+ *
* @param processInstance
* @param taskInstance
- * @param servers if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers.
+ * @param servers when failing over a master, contains both master servers and worker servers; when failing over a worker, contains the worker servers.
*/
- private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) {
- if (processInstance == null) {
- LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
- taskInstance.getProcessInstanceId(), taskInstance.getId());
- return;
- }
+ private void failoverTaskInstance(@NonNull ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) {
if (!checkTaskInstanceNeedFailover(servers, taskInstance)) {
+ LOGGER.info("The taskInstance doesn't need to failover");
return;
}
TaskMetrics.incTaskFailover();
@@ -240,6 +254,7 @@ public class FailoverService {
taskInstance.setProcessInstance(processInstance);
if (!isMasterTask) {
+ LOGGER.info("The failover taskInstance is not master task");
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
@@ -249,6 +264,8 @@ public class FailoverService {
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
}
+ } else {
+ LOGGER.info("The failover taskInstance is a master task");
}
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
@@ -278,7 +295,7 @@ public class FailoverService {
while (iterator.hasNext()) {
String host = iterator.next();
if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
- if (!getLocalAddress().equals(host)) {
+ if (!localAddress.equals(host)) {
iterator.remove();
}
}
@@ -390,11 +407,8 @@ public class FailoverService {
return serverStartupTime;
}
- /**
- * get local address
- */
- String getLocalAddress() {
- return NetUtils.getAddr(masterConfig.getListenPort());
+ public String getLocalAddress() {
+ return localAddress;
}
}
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml
index 619322bfab..a908ac3072 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -92,7 +92,7 @@ master:
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
exec-threads: 100
- # master dispatch task number per batch
+ # master dispatch task number per batch; if all tasks in a batch fail to dispatch, the master sleeps for 1s.
dispatch-task-number: 3
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
host-selector: lower_weight
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
index d7b53f3c34..96bdfe4914 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
@@ -56,7 +56,7 @@ public class ProcessInstanceExecCacheManagerImplTest {
Assert.assertTrue(processInstanceExecCacheManager.contains(1));
}
- @Test
+ @Test(expected = NullPointerException.class)
public void testCacheNull() {
processInstanceExecCacheManager.cache(2, null);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
index 06265d2e35..b2c7d63cd7 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
@@ -49,8 +49,7 @@ public class ExecutionContextTestUtils {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
- TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
- requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(context));
+ TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(context);
Command command = requestCommand.convert2Command();
ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
index 88c6655d2a..fdd79552b1 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
@@ -97,7 +97,7 @@ public class NettyExecutorManagerTest {
private Command toCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
- requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext));
+ requestCommand.setTaskExecutionContext(taskExecutionContext);
return requestCommand.convert2Command();
}
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index 4e3d99347a..fc9bcca3aa 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -18,8 +18,8 @@
package org.apache.dolphinscheduler.server.master.service;
import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
+
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
@@ -31,7 +31,9 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -45,7 +47,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -63,7 +64,6 @@ import com.google.common.collect.Lists;
@PrepareForTest({RegistryClient.class})
@PowerMockIgnore({"javax.management.*"})
public class FailoverServiceTest {
- @InjectMocks
private FailoverService failoverService;
@Mock
@@ -78,6 +78,9 @@ public class FailoverServiceTest {
@Mock
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+ @Mock
+ private ProcessInstanceExecCacheManager cacheManager;
+
private static int masterPort = 5678;
private static int workerPort = 1234;
@@ -95,6 +98,7 @@ public class FailoverServiceTest {
springApplicationContext.setApplicationContext(applicationContext);
given(masterConfig.getListenPort()).willReturn(masterPort);
+ failoverService = new FailoverService(registryClient, masterConfig, processService, workflowExecuteThreadPool, cacheManager);
testMasterHost = failoverService.getLocalAddress();
String ip = testMasterHost.split(":")[0];
@@ -182,7 +186,16 @@ public class FailoverServiceTest {
@Test
public void failoverWorkTest() {
+ workerTaskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+ WorkflowExecuteRunnable workflowExecuteRunnable = Mockito.mock(WorkflowExecuteRunnable.class);
+ Mockito.when(workflowExecuteRunnable.getAllTaskInstances()).thenReturn(Lists.newArrayList(workerTaskInstance));
+ Mockito.when(workflowExecuteRunnable.getProcessInstance()).thenReturn(processInstance);
+
+ Mockito.when(cacheManager.getAll()).thenReturn(Lists.newArrayList(workflowExecuteRunnable));
+ Mockito.when(cacheManager.getByProcessInstanceId(Mockito.anyInt())).thenReturn(workflowExecuteRunnable);
+
+
failoverService.failoverServerWhenDown(testWorkerHost, NodeType.WORKER);
- Assert.assertEquals(workerTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
+ Assert.assertEquals(ExecutionStatus.NEED_FAULT_TOLERANCE, workerTaskInstance.getState());
}
}
diff --git a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java
index b7aecf7ee6..64259f2f4f 100644
--- a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java
+++ b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java
@@ -20,10 +20,6 @@
package org.apache.dolphinscheduler.meter;
-import javax.annotation.PostConstruct;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
index 9baa321a9e..527a269411 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.remote.command;
import java.io.Serializable;
@@ -25,6 +26,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class Command implements Serializable {
+ private static final long serialVersionUID = -1L;
+
private static final AtomicLong REQUEST_ID = new AtomicLong(1);
public static final byte MAGIC = (byte) 0xbabe;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
index 5b2e33922c..000e3f4e02 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
@@ -18,39 +18,23 @@
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.io.Serializable;
-/**
- * execute task request command
- */
-public class TaskExecuteRequestCommand implements Serializable {
-
- /**
- * task execution context
- */
- private String taskExecutionContext;
-
- public String getTaskExecutionContext() {
- return taskExecutionContext;
- }
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
- public void setTaskExecutionContext(String taskExecutionContext) {
- this.taskExecutionContext = taskExecutionContext;
- }
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskExecuteRequestCommand implements Serializable {
- public TaskExecuteRequestCommand() {
- }
+ private static final long serialVersionUID = -1L;
- public TaskExecuteRequestCommand(String taskExecutionContext) {
- this.taskExecutionContext = taskExecutionContext;
- }
+ private TaskExecutionContext taskExecutionContext;
- /**
- * package request command
- *
- * @return command
- */
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST);
@@ -59,10 +43,4 @@ public class TaskExecuteRequestCommand implements Serializable {
return command;
}
- @Override
- public String toString() {
- return "TaskExecuteRequestCommand{"
- + "taskExecutionContext='" + taskExecutionContext + '\''
- + '}';
- }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 2637d6a11a..125b2d82db 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
+
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -43,6 +44,8 @@ import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import lombok.NonNull;
+
/**
* mainly used to get the start command line of a process.
*/
@@ -181,7 +184,10 @@ public class ProcessUtils {
* @param taskExecutionContext taskExecutionContext
* @return yarn application ids
*/
- public static List<String> killYarnJob(TaskExecutionContext taskExecutionContext) {
+ public static List<String> killYarnJob(@NonNull TaskExecutionContext taskExecutionContext) {
+ if (taskExecutionContext.getLogPath() == null) {
+ return Collections.emptyList();
+ }
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
String log;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 9ff8689fd3..8d61a9a460 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -52,16 +52,16 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
-import org.slf4j.Logger;
-import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import org.springframework.transaction.annotation.Transactional;
+
public interface ProcessService {
@Transactional
- ProcessInstance handleCommand(Logger logger, String host, Command command);
+ ProcessInstance handleCommand(String host, Command command);
void moveToErrorCommand(Command command, String message);
@@ -161,8 +161,6 @@ public interface ProcessService {
void changeOutParam(TaskInstance taskInstance);
- List<String> convertIntListToString(List<Integer> intList);
-
Schedule querySchedule(int id);
List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index a8ae3f33a9..fc436ee4bd 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.service.process;
-import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@@ -31,6 +30,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
+import static java.util.stream.Collectors.toSet;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -173,13 +174,6 @@ public class ProcessServiceImpl implements ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
- private final int[] stateArray = new int[] {ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
- ExecutionStatus.DISPATCH.ordinal(),
- ExecutionStatus.RUNNING_EXECUTION.ordinal(),
- ExecutionStatus.DELAY_EXECUTION.ordinal(),
- ExecutionStatus.READY_PAUSE.ordinal(),
- ExecutionStatus.READY_STOP.ordinal()};
-
@Autowired
private UserMapper userMapper;
@@ -290,7 +284,7 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
@Transactional
- public ProcessInstance handleCommand(Logger logger, String host, Command command) {
+ public ProcessInstance handleCommand(String host, Command command) {
ProcessInstance processInstance = constructProcessInstance(command, host);
// cannot construct process instance, return null
if (processInstance == null) {
@@ -1973,8 +1967,7 @@ public class ProcessServiceImpl implements ProcessService {
* @param intList intList
* @return string list
*/
- @Override
- public List<String> convertIntListToString(List<Integer> intList) {
+ private List<String> convertIntListToString(List<Integer> intList) {
if (intList == null) {
return new ArrayList<>();
}
@@ -2039,12 +2032,12 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
public List<ProcessInstance> queryNeedFailoverProcessInstances(String host) {
- return processInstanceMapper.queryByHostAndStatus(host, stateArray);
+ return processInstanceMapper.queryByHostAndStatus(host, ExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
@Override
public List<String> queryNeedFailoverProcessInstanceHost() {
- return processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray);
+ return processInstanceMapper.queryNeedFailoverProcessInstanceHost(ExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
/**
@@ -2081,7 +2074,7 @@ public class ProcessServiceImpl implements ProcessService {
@Override
public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
return taskInstanceMapper.queryByHostAndStatus(host,
- stateArray);
+ ExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
/**
@@ -2215,7 +2208,7 @@ public class ProcessServiceImpl implements ProcessService {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
startTime,
endTime,
- stateArray);
+ ExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
/**
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 0a859235ac..0dd9734bd1 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -17,7 +17,12 @@
package org.apache.dolphinscheduler.service.process;
-import com.fasterxml.jackson.databind.JsonNode;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+
+import static org.mockito.ArgumentMatchers.any;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
@@ -72,9 +77,17 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ValueType;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
-import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.cron.CronUtilsTest;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.spi.params.base.FormType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -88,17 +101,7 @@ import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.mockito.ArgumentMatchers.any;
+import com.fasterxml.jackson.databind.JsonNode;
/**
* process service test
@@ -289,8 +292,8 @@ public class ProcessServiceTest {
command.setProcessDefinitionCode(222);
command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
- + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
- Assert.assertNull(processService.handleCommand(logger, host, command));
+ + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
+ Assert.assertNull(processService.handleCommand(host, command));
int definitionVersion = 1;
long definitionCode = 123;
@@ -325,7 +328,7 @@ public class ProcessServiceTest {
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
- Assert.assertNotNull(processService.handleCommand(logger, host, command1));
+ Assert.assertNotNull(processService.handleCommand(host, command1));
Command command2 = new Command();
command2.setId(2);
@@ -335,7 +338,7 @@ public class ProcessServiceTest {
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
command2.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(2)).thenReturn(1);
- Assert.assertNotNull(processService.handleCommand(logger, host, command2));
+ Assert.assertNotNull(processService.handleCommand(host, command2));
Command command3 = new Command();
command3.setId(3);
@@ -345,7 +348,7 @@ public class ProcessServiceTest {
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
Mockito.when(commandMapper.deleteById(3)).thenReturn(1);
- Assert.assertNotNull(processService.handleCommand(logger, host, command3));
+ Assert.assertNotNull(processService.handleCommand(host, command3));
Command command4 = new Command();
command4.setId(4);
@@ -355,7 +358,7 @@ public class ProcessServiceTest {
command4.setCommandType(CommandType.REPEAT_RUNNING);
command4.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(4)).thenReturn(1);
- Assert.assertNotNull(processService.handleCommand(logger, host, command4));
+ Assert.assertNotNull(processService.handleCommand(host, command4));
Command command5 = new Command();
command5.setId(5);
@@ -374,7 +377,7 @@ public class ProcessServiceTest {
processDefinition.getGlobalParamList(),
CommandType.START_PROCESS,
processInstance.getScheduleTime(), null)).thenReturn("\"testStartParam1\"");
- ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5);
+ ProcessInstance processInstance1 = processService.handleCommand(host, command5);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
ProcessDefinition processDefinition1 = new ProcessDefinition();
@@ -399,7 +402,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2);
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
Mockito.when(commandMapper.deleteById(1)).thenReturn(1);
- Assert.assertNotNull(processService.handleCommand(logger, host, command1));
+ Assert.assertNotNull(processService.handleCommand(host, command1));
Command command6 = new Command();
command6.setId(6);
@@ -410,7 +413,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
Mockito.when(commandMapper.deleteById(6)).thenReturn(1);
- ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6);
+ ProcessInstance processInstance6 = processService.handleCommand(host, command6);
Assert.assertTrue(processInstance6 != null);
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD);
@@ -429,7 +432,7 @@ public class ProcessServiceTest {
command7.setProcessDefinitionVersion(1);
Mockito.when(commandMapper.deleteById(7)).thenReturn(1);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
- ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7);
+ ProcessInstance processInstance8 = processService.handleCommand(host, command7);
Assert.assertTrue(processInstance8 != null);
ProcessDefinition processDefinition2 = new ProcessDefinition();
@@ -453,7 +456,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(12L, 1, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
- ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9);
+ ProcessInstance processInstance10 = processService.handleCommand(host, command9);
Assert.assertTrue(processInstance10 == null);
}
@@ -494,7 +497,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
// will throw exception when command id is 0 and delete fail
- processService.handleCommand(logger, host, command1);
+ processService.handleCommand(host, command1);
}
@Test
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index 4e1958c671..2ada887868 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -22,13 +22,25 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import java.io.Serializable;
import java.util.Date;
import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
/**
* to master/worker task transport
*/
-public class TaskExecutionContext {
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskExecutionContext implements Serializable {
+
+ private static final long serialVersionUID = -1L;
/**
* task id
@@ -248,399 +260,4 @@ public class TaskExecutionContext {
* max memory
*/
private Integer memoryMax;
-
- public String getTaskLogName() {
- return taskLogName;
- }
-
- public void setTaskLogName(String taskLogName) {
- this.taskLogName = taskLogName;
- }
-
- public Map<String, String> getResources() {
- return resources;
- }
-
- public void setResources(Map<String, String> resources) {
- this.resources = resources;
- }
-
- public Map<String, Property> getParamsMap() {
- return paramsMap;
- }
-
- public void setParamsMap(Map<String, Property> paramsMap) {
- this.paramsMap = paramsMap;
- }
-
- public int getTaskInstanceId() {
- return taskInstanceId;
- }
-
- public void setTaskInstanceId(int taskInstanceId) {
- this.taskInstanceId = taskInstanceId;
- }
-
- public String getTaskName() {
- return taskName;
- }
-
- public void setTaskName(String taskName) {
- this.taskName = taskName;
- }
-
- public Date getFirstSubmitTime() {
- return firstSubmitTime;
- }
-
- public void setFirstSubmitTime(Date firstSubmitTime) {
- this.firstSubmitTime = firstSubmitTime;
- }
-
- public Date getStartTime() {
- return startTime;
- }
-
- public void setStartTime(Date startTime) {
- this.startTime = startTime;
- }
-
- public String getTaskType() {
- return taskType;
- }
-
- public void setTaskType(String taskType) {
- this.taskType = taskType;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public String getExecutePath() {
- return executePath;
- }
-
- public void setExecutePath(String executePath) {
- this.executePath = executePath;
- }
-
- public String getLogPath() {
- return logPath;
- }
-
- public void setLogPath(String logPath) {
- this.logPath = logPath;
- }
-
- public String getTaskJson() {
- return taskJson;
- }
-
- public void setTaskJson(String taskJson) {
- this.taskJson = taskJson;
- }
-
- public int getProcessId() {
- return processId;
- }
-
- public void setProcessId(int processId) {
- this.processId = processId;
- }
-
- public String getAppIds() {
- return appIds;
- }
-
- public void setAppIds(String appIds) {
- this.appIds = appIds;
- }
-
- public int getProcessInstanceId() {
- return processInstanceId;
- }
-
- public void setProcessInstanceId(int processInstanceId) {
- this.processInstanceId = processInstanceId;
- }
-
- public Date getScheduleTime() {
- return scheduleTime;
- }
-
- public void setScheduleTime(Date scheduleTime) {
- this.scheduleTime = scheduleTime;
- }
-
- public String getGlobalParams() {
- return globalParams;
- }
-
- public void setGlobalParams(String globalParams) {
- this.globalParams = globalParams;
- }
-
- public int getExecutorId() {
- return executorId;
- }
-
- public void setExecutorId(int executorId) {
- this.executorId = executorId;
- }
-
- public int getCmdTypeIfComplement() {
- return cmdTypeIfComplement;
- }
-
- public void setCmdTypeIfComplement(int cmdTypeIfComplement) {
- this.cmdTypeIfComplement = cmdTypeIfComplement;
- }
-
- public String getTenantCode() {
- return tenantCode;
- }
-
- public void setTenantCode(String tenantCode) {
- this.tenantCode = tenantCode;
- }
-
- public String getQueue() {
- return queue;
- }
-
- public void setQueue(String queue) {
- this.queue = queue;
- }
-
- public int getProcessDefineId() {
- return processDefineId;
- }
-
- public void setProcessDefineId(int processDefineId) {
- this.processDefineId = processDefineId;
- }
-
- public int getProjectId() {
- return projectId;
- }
-
- public void setProjectId(int projectId) {
- this.projectId = projectId;
- }
-
- public String getTaskParams() {
- return taskParams;
- }
-
- public void setTaskParams(String taskParams) {
- this.taskParams = taskParams;
- }
-
- public String getEnvFile() {
- return envFile;
- }
-
- public void setEnvFile(String envFile) {
- this.envFile = envFile;
- }
-
- public String getEnvironmentConfig() {
- return environmentConfig;
- }
-
- public void setEnvironmentConfig(String config) {
- this.environmentConfig = config;
- }
-
- public Map<String, String> getDefinedParams() {
- return definedParams;
- }
-
- public void setDefinedParams(Map<String, String> definedParams) {
- this.definedParams = definedParams;
- }
-
- public String getTaskAppId() {
- return taskAppId;
- }
-
- public void setTaskAppId(String taskAppId) {
- this.taskAppId = taskAppId;
- }
-
- public TaskTimeoutStrategy getTaskTimeoutStrategy() {
- return taskTimeoutStrategy;
- }
-
- public void setTaskTimeoutStrategy(TaskTimeoutStrategy taskTimeoutStrategy) {
- this.taskTimeoutStrategy = taskTimeoutStrategy;
- }
-
- public int getTaskTimeout() {
- return taskTimeout;
- }
-
- public void setTaskTimeout(int taskTimeout) {
- this.taskTimeout = taskTimeout;
- }
-
- public String getWorkerGroup() {
- return workerGroup;
- }
-
- public void setWorkerGroup(String workerGroup) {
- this.workerGroup = workerGroup;
- }
-
- public int getDelayTime() {
- return delayTime;
- }
-
- public void setDelayTime(int delayTime) {
- this.delayTime = delayTime;
- }
-
- public ResourceParametersHelper getResourceParametersHelper() {
- return resourceParametersHelper;
- }
-
- public void setResourceParametersHelper(ResourceParametersHelper resourceParametersHelper) {
- this.resourceParametersHelper = resourceParametersHelper;
- }
-
- public String getVarPool() {
- return varPool;
- }
-
- public void setVarPool(String varPool) {
- this.varPool = varPool;
- }
-
- public int getDryRun() {
- return dryRun;
- }
-
- public void setDryRun(int dryRun) {
- this.dryRun = dryRun;
- }
-
- public Long getProcessDefineCode() {
- return processDefineCode;
- }
-
- public void setProcessDefineCode(Long processDefineCode) {
- this.processDefineCode = processDefineCode;
- }
-
- public int getProcessDefineVersion() {
- return processDefineVersion;
- }
-
- public void setProcessDefineVersion(int processDefineVersion) {
- this.processDefineVersion = processDefineVersion;
- }
-
- public long getProjectCode() {
- return projectCode;
- }
-
- public void setProjectCode(long projectCode) {
- this.projectCode = projectCode;
- }
-
- public DataQualityTaskExecutionContext getDataQualityTaskExecutionContext() {
- return dataQualityTaskExecutionContext;
- }
-
- public void setDataQualityTaskExecutionContext(DataQualityTaskExecutionContext dataQualityTaskExecutionContext) {
- this.dataQualityTaskExecutionContext = dataQualityTaskExecutionContext;
- }
-
- public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) {
- this.currentExecutionStatus = currentExecutionStatus;
- }
-
- public ExecutionStatus getCurrentExecutionStatus() {
- return currentExecutionStatus;
- }
-
- public Date getEndTime() {
- return endTime;
- }
-
- public void setEndTime(Date endTime) {
- this.endTime = endTime;
- }
-
- public Integer getCpuQuota() {
- return cpuQuota;
- }
-
- public void setCpuQuota(Integer cpuQuota) {
- this.cpuQuota = cpuQuota;
- }
-
- public Integer getMemoryMax() {
- return memoryMax;
- }
-
- public void setMemoryMax(Integer memoryMax) {
- this.memoryMax = memoryMax;
- }
-
- public K8sTaskExecutionContext getK8sTaskExecutionContext() {
- return k8sTaskExecutionContext;
- }
-
- public void setK8sTaskExecutionContext(K8sTaskExecutionContext k8sTaskExecutionContext) {
- this.k8sTaskExecutionContext = k8sTaskExecutionContext;
- }
-
- @Override
- public String toString() {
- return "TaskExecutionContext{"
- + "taskInstanceId=" + taskInstanceId
- + ", taskName='" + taskName + '\''
- + ", currentExecutionStatus=" + currentExecutionStatus
- + ", firstSubmitTime=" + firstSubmitTime
- + ", startTime=" + startTime
- + ", taskType='" + taskType + '\''
- + ", host='" + host + '\''
- + ", executePath='" + executePath + '\''
- + ", logPath='" + logPath + '\''
- + ", taskJson='" + taskJson + '\''
- + ", processId=" + processId
- + ", processDefineCode=" + processDefineCode
- + ", processDefineVersion=" + processDefineVersion
- + ", appIds='" + appIds + '\''
- + ", processInstanceId=" + processInstanceId
- + ", scheduleTime=" + scheduleTime
- + ", globalParams='" + globalParams + '\''
- + ", executorId=" + executorId
- + ", cmdTypeIfComplement=" + cmdTypeIfComplement
- + ", tenantCode='" + tenantCode + '\''
- + ", queue='" + queue + '\''
- + ", projectCode=" + projectCode
- + ", taskParams='" + taskParams + '\''
- + ", envFile='" + envFile + '\''
- + ", dryRun='" + dryRun + '\''
- + ", definedParams=" + definedParams
- + ", taskAppId='" + taskAppId + '\''
- + ", taskTimeoutStrategy=" + taskTimeoutStrategy
- + ", taskTimeout=" + taskTimeout
- + ", workerGroup='" + workerGroup + '\''
- + ", environmentConfig='" + environmentConfig + '\''
- + ", delayTime=" + delayTime
- + ", resources=" + resources
- + ", sqlTaskExecutionContext=" + sqlTaskExecutionContext
- + ", k8sTaskExecutionContext=" + k8sTaskExecutionContext
- + ", dataQualityTaskExecutionContext=" + dataQualityTaskExecutionContext
- + '}';
- }
-
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
index 82c8db12d4..411de30c34 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
@@ -78,6 +78,19 @@ public enum ExecutionStatus {
private static HashMap<Integer, ExecutionStatus> EXECUTION_STATUS_MAP = new HashMap<>();
+    /**
+     * Ordinals of the states in which a workflow instance still needs to be failed over.
+     * Keep in sync with {@link #isNeedFailoverWorkflowInstanceState(ExecutionStatus)}.
+     */
+    private static final int[] NEED_FAILOVER_STATES = new int[] {
+            ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+            ExecutionStatus.DISPATCH.ordinal(),
+            ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+            ExecutionStatus.DELAY_EXECUTION.ordinal(),
+            ExecutionStatus.READY_PAUSE.ordinal(),
+            ExecutionStatus.READY_STOP.ordinal()
+    };
+
static {
for (ExecutionStatus executionStatus : ExecutionStatus.values()) {
EXECUTION_STATUS_MAP.put(executionStatus.code, executionStatus);
@@ -180,4 +193,31 @@ public enum ExecutionStatus {
}
throw new IllegalArgumentException("invalid status : " + status);
}
+
+    /**
+     * Check whether a workflow instance in the given state needs to be failed over.
+     * Keep in sync with {@code NEED_FAILOVER_STATES}.
+     *
+     * @param executionStatus the state to check; {@code null} yields {@code false}
+     * @return true if the state is SUBMITTED_SUCCESS, DISPATCH, RUNNING_EXECUTION,
+     *     DELAY_EXECUTION, READY_PAUSE or READY_STOP
+     */
+    public static boolean isNeedFailoverWorkflowInstanceState(ExecutionStatus executionStatus) {
+        return
+                ExecutionStatus.SUBMITTED_SUCCESS == executionStatus
+                        || ExecutionStatus.DISPATCH == executionStatus
+                        || ExecutionStatus.RUNNING_EXECUTION == executionStatus
+                        || ExecutionStatus.DELAY_EXECUTION == executionStatus
+                        || ExecutionStatus.READY_PAUSE == executionStatus
+                        || ExecutionStatus.READY_STOP == executionStatus;
+    }
+
+    /**
+     * Get the ordinals of the states that need failover, e.g. as the state filter for mapper queries.
+     *
+     * @return a new copy on every call, so callers cannot modify the shared constant
+     */
+    public static int[] getNeedFailoverWorkflowInstanceState() {
+        return NEED_FAILOVER_STATES.clone();
+    }
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
index 1e69d873ab..6c68cc00ec 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
@@ -22,22 +22,20 @@ import java.util.function.Supplier;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
+import lombok.experimental.UtilityClass;
-public final class WorkerServerMetrics {
-
- public WorkerServerMetrics() {
- throw new UnsupportedOperationException("Utility class");
- }
+@UtilityClass
+public class WorkerServerMetrics {
private static final Counter WORKER_OVERLOAD_COUNTER =
- Counter.builder("ds.worker.overload.count")
- .description("overloaded workers count")
- .register(Metrics.globalRegistry);
+ Counter.builder("ds.worker.overload.count")
+ .description("overloaded workers count")
+ .register(Metrics.globalRegistry);
private static final Counter WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER =
- Counter.builder("ds.worker.full.submit.queue.count")
- .description("full worker submit queues count")
- .register(Metrics.globalRegistry);
+ Counter.builder("ds.worker.full.submit.queue.count")
+ .description("full worker submit queues count")
+ .register(Metrics.globalRegistry);
public static void incWorkerOverloadCount() {
WORKER_OVERLOAD_COUNTER.increment();
@@ -49,8 +47,9 @@ public final class WorkerServerMetrics {
public static void registerWorkerRunningTaskGauge(Supplier<Number> supplier) {
Gauge.builder("ds.task.running", supplier)
- .description("number of running tasks on workers")
- .register(Metrics.globalRegistry);
+ .description("number of running tasks on workers")
+ .register(Metrics.globalRegistry);
}
+
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 8264ea52f0..3f70974344 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import java.util.Arrays;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
@@ -92,9 +93,6 @@ public class TaskCallbackService {
* change remote channel
*/
public void changeRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
- if (REMOTE_CHANNELS.containsKey(taskInstanceId)) {
- REMOTE_CHANNELS.remove(taskInstanceId);
- }
REMOTE_CHANNELS.put(taskInstanceId, channel);
}
@@ -104,19 +102,19 @@ public class TaskCallbackService {
* @param taskInstanceId taskInstanceId
* @return callback channel
*/
- private NettyRemoteChannel getRemoteChannel(int taskInstanceId) {
+ private Optional<NettyRemoteChannel> getRemoteChannel(int taskInstanceId) {
Channel newChannel;
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
if (nettyRemoteChannel != null) {
if (nettyRemoteChannel.isActive()) {
- return nettyRemoteChannel;
+ return Optional.of(nettyRemoteChannel);
}
newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
if (newChannel != null) {
- return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
+ return Optional.of(getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId));
}
}
- return null;
+ return Optional.empty();
}
public long pause(int ntries) {
@@ -151,15 +149,14 @@ public class TaskCallbackService {
* @param command command
*/
public void send(int taskInstanceId, Command command) {
-        NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
-        if (nettyRemoteChannel != null) {
-            nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {
-
+        Optional<NettyRemoteChannel> nettyRemoteChannel = getRemoteChannel(taskInstanceId);
+        if (nettyRemoteChannel.isPresent()) {
+            nettyRemoteChannel.get().writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (future.isSuccess()) {
-                        // remove(taskInstanceId);
-                        return;
+                public void operationComplete(ChannelFuture future) {
+                    if (!future.isSuccess()) {
+                        // Pass the cause as the last argument so SLF4J also logs the stack trace of the failed write.
+                        logger.error("Send callback command error, taskInstanceId: {}, command: {}", taskInstanceId, command, future.cause());
}
}
});
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 533f40fa41..c14a2e891e 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -111,8 +111,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
}
logger.info("task execute request command : {}", taskRequestCommand);
- String contextJson = taskRequestCommand.getTaskExecutionContext();
- TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
+ TaskExecutionContext taskExecutionContext = taskRequestCommand.getTaskExecutionContext();
if (taskExecutionContext == null) {
logger.error("task execution context is null");
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index 03df8de451..f691f7c8de 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.Command;
@@ -34,7 +35,7 @@ import org.springframework.stereotype.Component;
* Retry Report Task Status Thread
*/
@Component
-public class RetryReportTaskStatusThread implements Runnable {
+public class RetryReportTaskStatusThread extends BaseDaemonThread {
private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
@@ -46,11 +47,14 @@ public class RetryReportTaskStatusThread implements Runnable {
@Autowired
private TaskCallbackService taskCallbackService;
- public void start() {
+ protected RetryReportTaskStatusThread() {
+ super("RetryReportTaskStatusThread");
+ }
+
+ @Override
+ public synchronized void start() {
logger.info("Retry report task status thread starting");
- Thread thread = new Thread(this, "RetryReportTaskStatusThread");
- thread.setDaemon(true);
- thread.start();
+ super.start();
logger.info("Retry report task status thread started");
}
@@ -59,7 +63,7 @@ public class RetryReportTaskStatusThread implements Runnable {
*/
@Override
public void run() {
- ResponseCache instance = ResponseCache.get();
+ final ResponseCache instance = ResponseCache.get();
while (Stopper.isRunning()) {
@@ -67,33 +71,57 @@ public class RetryReportTaskStatusThread implements Runnable {
ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
try {
- if (!instance.getRunningCache().isEmpty()) {
- Map<Integer, Command> runningCache = instance.getRunningCache();
- for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
- Integer taskInstanceId = entry.getKey();
- Command runningCommand = entry.getValue();
- taskCallbackService.send(taskInstanceId, runningCommand);
- }
+ // todo: Only retry the send failed command
+ retryRunningCommand(instance);
+ retryResponseCommand(instance);
+ retryRecallCommand(instance);
+ } catch (Exception e) {
+ logger.warn("Retry report task status error", e);
+ }
+ }
+ }
+
+ private void retryRunningCommand(ResponseCache instance) {
+ if (!instance.getRunningCache().isEmpty()) {
+ Map<Integer, Command> runningCache = instance.getRunningCache();
+ for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
+ Integer taskInstanceId = entry.getKey();
+ Command runningCommand = entry.getValue();
+ try {
+ taskCallbackService.send(taskInstanceId, runningCommand);
+ } catch (Exception ex) {
+ logger.error("Retry send running command to master error, taskInstanceId: {}, command: {}", taskInstanceId, runningCommand);
}
+ }
+ }
+ }
- if (!instance.getResponseCache().isEmpty()) {
- Map<Integer, Command> responseCache = instance.getResponseCache();
- for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
- Integer taskInstanceId = entry.getKey();
- Command responseCommand = entry.getValue();
- taskCallbackService.send(taskInstanceId, responseCommand);
- }
+ private void retryResponseCommand(ResponseCache instance) {
+ if (!instance.getResponseCache().isEmpty()) {
+ Map<Integer, Command> responseCache = instance.getResponseCache();
+ for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
+ Integer taskInstanceId = entry.getKey();
+ Command responseCommand = entry.getValue();
+ try {
+ taskCallbackService.send(taskInstanceId, responseCommand);
+ } catch (Exception ex) {
+ logger.error("Retry send response command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
}
- if (!instance.getRecallCache().isEmpty()) {
- Map<Integer, Command> recallCache = instance.getRecallCache();
- for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) {
- Integer taskInstanceId = entry.getKey();
- Command responseCommand = entry.getValue();
- taskCallbackService.send(taskInstanceId, responseCommand);
- }
+ }
+ }
+ }
+
+ private void retryRecallCommand(ResponseCache instance) {
+ if (!instance.getRecallCache().isEmpty()) {
+ Map<Integer, Command> recallCache = instance.getRecallCache();
+ for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) {
+ Integer taskInstanceId = entry.getKey();
+ Command responseCommand = entry.getValue();
+ try {
+ taskCallbackService.send(taskInstanceId, responseCommand);
+ } catch (Exception ex) {
+ logger.error("Retry send recall command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
}
- } catch (Exception e) {
- logger.warn("Retry report task status error", e);
}
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 9349056507..7d49005395 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -132,9 +132,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+ logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
+ taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
return;
}
-
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
logger.info("script path : {}", taskExecutionContext.getExecutePath());
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
index 13c533f1cf..2187a19281 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
@@ -88,7 +88,7 @@ public class TaskExecuteProcessorTest {
command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST);
ackCommand = new TaskExecuteRunningCommand().convert2Command();
- taskRequestCommand = new TaskExecuteRequestCommand();
+ taskRequestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
alertClientService = PowerMockito.mock(AlertClientService.class);
workerExecService = PowerMockito.mock(ExecutorService.class);
PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class)))
@@ -127,8 +127,6 @@ public class TaskExecuteProcessorTest {
PowerMockito.mockStatic(JSONUtils.class);
PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class))
.thenReturn(taskRequestCommand);
- PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class))
- .thenReturn(taskExecutionContext);
PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),