You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:28 UTC
[dolphinscheduler] 01/29: Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (#10479)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 4ceb42087310575ffedd7b09fbf35a08614bb3b7
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Jun 16 21:46:18 2022 +0800
Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (#10479)
* Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe
(cherry picked from commit ad2646ff1f7baa5d76d29023ced2c28a89b52f6b)
---
.../dolphinscheduler/common/utils/FileUtils.java | 29 +++-
.../dao/entity/ProcessInstance.java | 12 +-
.../master/consumer/TaskPriorityQueueConsumer.java | 8 +-
.../server/master/processor/queue/TaskEvent.java | 105 +-----------
.../processor/queue/TaskExecuteThreadPool.java | 22 +--
.../master/runner/WorkflowExecuteThread.java | 176 +++++++++++++--------
.../master/runner/WorkflowExecuteThreadPool.java | 2 +-
.../master/runner/task/CommonTaskProcessor.java | 4 +-
.../master/runner/task/TaskProcessorFactory.java | 27 +++-
.../runner/task/TaskProcessorFactoryTest.java | 4 +-
.../remote/utils/ChannelUtils.java | 12 +-
.../apache/dolphinscheduler/remote/utils/Host.java | 2 +
.../service/process/ProcessServiceImpl.java | 5 +-
.../queue/PeerTaskInstancePriorityQueue.java | 35 ++--
.../queue/PeerTaskInstancePriorityQueueTest.java | 9 +-
.../src/main/resources/application.yaml | 4 +-
.../plugin/task/shell/ShellTask.java | 7 +-
.../server/worker/config/WorkerConfig.java | 4 +-
.../worker/processor/TaskCallbackService.java | 2 +
.../server/worker/runner/TaskExecuteThread.java | 24 ++-
20 files changed, 247 insertions(+), 246 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
index bdcf62f76a..23e4b74b75 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
@@ -17,14 +17,25 @@
package org.apache.dolphinscheduler.common.utils;
+import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH;
+import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES_DEFAULT_VALUE;
+import static org.apache.dolphinscheduler.common.Constants.UTF_8;
+import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS;
+
import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.nio.file.NoSuchFileException;
-import static org.apache.dolphinscheduler.common.Constants.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* file utils
@@ -112,7 +123,15 @@ public class FileUtils {
File execLocalPathFile = new File(execLocalPath);
if (execLocalPathFile.exists()) {
- org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
+ try {
+ org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
+ } catch (Exception ex) {
+ if (ex instanceof NoSuchFileException || ex.getCause() instanceof NoSuchFileException) {
+ // this file is already be deleted.
+ } else {
+ throw ex;
+ }
+ }
}
//create work dir
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 8f639efc07..fb3ae13858 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -266,13 +266,11 @@ public class ProcessInstance {
*/
public ProcessInstance(ProcessDefinition processDefinition) {
this.processDefinition = processDefinition;
- this.name = processDefinition.getName()
- + "-"
- +
- processDefinition.getVersion()
- + "-"
- +
- DateUtils.getCurrentTimeStamp();
+ // todo: the name is not unique
+ this.name = String.join("-",
+ processDefinition.getName(),
+ String.valueOf(processDefinition.getVersion()),
+ DateUtils.getCurrentTimeStamp());
}
public String getVarPool() {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index f692685009..f9c19c48fd 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -180,16 +180,18 @@ public class TaskPriorityQueueConsumer extends Thread {
return true;
}
}
-
result = dispatcher.dispatch(executionContext);
if (result) {
+ logger.info("Master success dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
addDispatchEvent(context, executionContext);
+ } else {
+ logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
}
} catch (RuntimeException e) {
- logger.error("dispatch error: ", e);
+ logger.error("Master dispatch task to worker error: ", e);
} catch (ExecuteException e) {
- logger.error("dispatch error: {}", e.getMessage());
+ logger.error("Master dispatch task to worker error: {}", e);
}
return result;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 865eee53a5..86cce5170e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -26,10 +26,12 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date;
import io.netty.channel.Channel;
+import lombok.Data;
/**
* task event
*/
+@Data
public class TaskEvent {
/**
@@ -135,107 +137,4 @@ public class TaskEvent {
return event;
}
- public String getVarPool() {
- return varPool;
- }
-
- public void setVarPool(String varPool) {
- this.varPool = varPool;
- }
-
- public int getTaskInstanceId() {
- return taskInstanceId;
- }
-
- public void setTaskInstanceId(int taskInstanceId) {
- this.taskInstanceId = taskInstanceId;
- }
-
- public String getWorkerAddress() {
- return workerAddress;
- }
-
- public void setWorkerAddress(String workerAddress) {
- this.workerAddress = workerAddress;
- }
-
- public ExecutionStatus getState() {
- return state;
- }
-
- public void setState(ExecutionStatus state) {
- this.state = state;
- }
-
- public Date getStartTime() {
- return startTime;
- }
-
- public void setStartTime(Date startTime) {
- this.startTime = startTime;
- }
-
- public Date getEndTime() {
- return endTime;
- }
-
- public void setEndTime(Date endTime) {
- this.endTime = endTime;
- }
-
- public String getExecutePath() {
- return executePath;
- }
-
- public void setExecutePath(String executePath) {
- this.executePath = executePath;
- }
-
- public String getLogPath() {
- return logPath;
- }
-
- public void setLogPath(String logPath) {
- this.logPath = logPath;
- }
-
- public int getProcessId() {
- return processId;
- }
-
- public void setProcessId(int processId) {
- this.processId = processId;
- }
-
- public String getAppIds() {
- return appIds;
- }
-
- public void setAppIds(String appIds) {
- this.appIds = appIds;
- }
-
- public Event getEvent() {
- return event;
- }
-
- public void setEvent(Event event) {
- this.event = event;
- }
-
- public Channel getChannel() {
- return channel;
- }
-
- public void setChannel(Channel channel) {
- this.channel = channel;
- }
-
- public int getProcessInstanceId() {
- return processInstanceId;
- }
-
- public void setProcessInstanceId(int processInstanceId) {
- this.processInstanceId = processInstanceId;
- }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index 75c0272519..c9c2868d36 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -79,7 +79,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
public void submitTaskEvent(TaskEvent taskEvent) {
if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
- logger.warn("workflowExecuteThread is null, event: {}", taskEvent);
+ logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent);
return;
}
if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
@@ -114,20 +114,24 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
future.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
- logger.error("handle event {} failed: {}", taskExecuteThread.getProcessInstanceId(), ex);
- if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
- taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
- logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
+ Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
+ logger.error("persist event failed processInstanceId: {}", processInstanceId, ex);
+ if (!processInstanceExecCacheManager.contains(processInstanceId)) {
+ taskExecuteThreadMap.remove(processInstanceId);
+ logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
+ processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}
@Override
public void onSuccess(Object result) {
- logger.info("persist events {} succeeded.", taskExecuteThread.getProcessInstanceId());
- if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
- taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
- logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
+ Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
+ logger.info("persist events succeeded, processInstanceId: {}", processInstanceId);
+ if (!processInstanceExecCacheManager.contains(processInstanceId)) {
+ taskExecuteThreadMap.remove(processInstanceId);
+ logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
+ processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 993e1df14b..fede5abad2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -75,8 +75,8 @@ import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.NumberUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import java.util.ArrayList;
import java.util.Arrays;
@@ -87,6 +87,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -238,12 +239,12 @@ public class WorkflowExecuteThread {
* @param masterConfig masterConfig
* @param stateWheelExecuteThread stateWheelExecuteThread
*/
- public WorkflowExecuteThread(ProcessInstance processInstance
- , ProcessService processService
- , NettyExecutorManager nettyExecutorManager
- , ProcessAlertManager processAlertManager
- , MasterConfig masterConfig
- , StateWheelExecuteThread stateWheelExecuteThread) {
+ public WorkflowExecuteThread(ProcessInstance processInstance,
+ ProcessService processService,
+ NettyExecutorManager nettyExecutorManager,
+ ProcessAlertManager processAlertManager,
+ MasterConfig masterConfig,
+ StateWheelExecuteThread stateWheelExecuteThread) {
this.processService = processService;
this.processInstance = processInstance;
this.masterConfig = masterConfig;
@@ -279,15 +280,14 @@ public class WorkflowExecuteThread {
}
public String getKey() {
- if (StringUtils.isNotEmpty(key)
- || this.processDefinition == null) {
+ if (StringUtils.isNotEmpty(key) || this.processDefinition == null) {
return key;
}
key = String.format("%d_%d_%d",
- this.processDefinition.getCode(),
- this.processDefinition.getVersion(),
- this.processInstance.getId());
+ this.processDefinition.getCode(),
+ this.processDefinition.getVersion(),
+ this.processInstance.getId());
return key;
}
@@ -436,10 +436,10 @@ public class WorkflowExecuteThread {
private void taskFinished(TaskInstance taskInstance) {
logger.info("work flow {} task id:{} code:{} state:{} ",
- processInstance.getId(),
- taskInstance.getId(),
- taskInstance.getTaskCode(),
- taskInstance.getState());
+ processInstance.getId(),
+ taskInstance.getId(),
+ taskInstance.getTaskCode(),
+ taskInstance.getState());
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
@@ -460,7 +460,7 @@ public class WorkflowExecuteThread {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// There are child nodes and the failure policy is: CONTINUE
if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
- && processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
+ && processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
@@ -492,8 +492,9 @@ public class WorkflowExecuteThread {
this.stateEvents.add(nextEvent);
} else {
ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
- this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
- org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
+ this.processService.sendStartTask2Master(processInstance,
+ nextTaskInstance.getId(),
+ org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
}
}
}
@@ -515,7 +516,8 @@ public class WorkflowExecuteThread {
}
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
if (!taskInstance.retryTaskIntervalOverTime()) {
- logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
+ logger.info(
+ "failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
processInstance.getId(),
newTaskInstance.getTaskCode(),
newTaskInstance.getState(),
@@ -552,7 +554,7 @@ public class WorkflowExecuteThread {
logger.info("process instance update: {}", processInstanceId);
processInstance = processService.findProcessInstanceById(processInstanceId);
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
+ processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
}
@@ -580,9 +582,7 @@ public class WorkflowExecuteThread {
*/
public boolean checkProcessInstance(StateEvent stateEvent) {
if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
- logger.error("mismatch process instance id: {}, state event:{}",
- this.processInstance.getId(),
- stateEvent);
+ logger.error("mismatch process instance id: {}, state event:{}", this.processInstance.getId(), stateEvent);
return false;
}
return true;
@@ -742,9 +742,9 @@ public class WorkflowExecuteThread {
return true;
}
logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}",
- processInstance.getId(),
- processInstance.getScheduleTime(),
- complementListDate.toString());
+ processInstance.getId(),
+ processInstance.getScheduleTime(),
+ complementListDate.toString());
scheduleDate = complementListDate.get(index + 1);
}
//the next process complement
@@ -783,8 +783,7 @@ public class WorkflowExecuteThread {
}
private boolean needComplementProcess() {
- if (processInstance.isComplementData()
- && Flag.NO == processInstance.getIsSubProcess()) {
+ if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) {
return true;
}
return false;
@@ -863,7 +862,7 @@ public class WorkflowExecuteThread {
return;
}
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
+ processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
@@ -883,7 +882,9 @@ public class WorkflowExecuteThread {
List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList);
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
ProcessDag processDag = generateFlowDag(taskNodeList,
- startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
+ startNodeNameList,
+ recoveryNodeCodeList,
+ processInstance.getTaskDependType());
if (processDag == null) {
logger.error("processDag is null");
return;
@@ -955,14 +956,16 @@ public class WorkflowExecuteThread {
if (complementListDate.size() == 0 && needComplementProcess()) {
complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
logger.info(" process definition code:{} complement data: {}",
- processInstance.getProcessDefinitionCode(), complementListDate.toString());
+ processInstance.getProcessDefinitionCode(),
+ complementListDate.toString());
if (complementListDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementListDate.get(0));
- processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)));
+ processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ CommandType.COMPLEMENT_DATA,
+ processInstance.getScheduleTime(),
+ cmdParam.get(Constants.SCHEDULE_TIMEZONE)));
processService.updateProcessInstance(processInstance);
}
}
@@ -976,7 +979,7 @@ public class WorkflowExecuteThread {
* @param taskInstance task instance
* @return TaskInstance
*/
- private TaskInstance submitTaskExec(TaskInstance taskInstance) {
+ private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
try {
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
@@ -984,17 +987,17 @@ public class WorkflowExecuteThread {
ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
taskProcessor.init(taskInstance, processInstance);
- if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
- && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
+ if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION && taskProcessor.getType()
+ .equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
- processInstance.getId(), processInstance.getName(),
- taskInstance.getId(), taskInstance.getName());
- return null;
+ processInstance.getId(), processInstance.getName(),
+ taskInstance.getId(), taskInstance.getName());
+ return Optional.empty();
}
// in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid
@@ -1033,10 +1036,10 @@ public class WorkflowExecuteThread {
taskStateChangeEvent.setType(StateEventType.TASK_STATE_CHANGE);
this.stateEvents.add(taskStateChangeEvent);
}
- return taskInstance;
+ return Optional.of(taskInstance);
} catch (Exception e) {
logger.error("submit standby task error", e);
- return null;
+ return Optional.empty();
}
}
@@ -1333,6 +1336,7 @@ public class WorkflowExecuteThread {
for (TaskInstance task : taskInstances) {
if (readyToSubmitTaskQueue.contains(task)) {
+ logger.warn("Task is already at submit queue, taskInstanceId: {}", task.getId());
continue;
}
@@ -1659,11 +1663,12 @@ public class WorkflowExecuteThread {
private void updateProcessInstanceState() {
ExecutionStatus state = getProcessInstanceState(processInstance);
if (processInstance.getState() != state) {
- logger.info(
- "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
- processInstance.getId(), processInstance.getName(),
- processInstance.getState(), state,
- processInstance.getCommandType());
+ logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+ processInstance.getId(),
+ processInstance.getName(),
+ processInstance.getState(),
+ state,
+ processInstance.getCommandType());
processInstance.setState(state);
if (state.typeIsFinished()) {
@@ -1687,11 +1692,12 @@ public class WorkflowExecuteThread {
private void updateProcessInstanceState(StateEvent stateEvent) {
ExecutionStatus state = stateEvent.getExecutionStatus();
if (processInstance.getState() != state) {
- logger.info(
- "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
- processInstance.getId(), processInstance.getName(),
- processInstance.getState(), state,
- processInstance.getCommandType());
+ logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+ processInstance.getId(),
+ processInstance.getName(),
+ processInstance.getState(),
+ state,
+ processInstance.getCommandType());
processInstance.setState(state);
if (state.typeIsFinished()) {
@@ -1723,7 +1729,9 @@ public class WorkflowExecuteThread {
return;
}
logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
- taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
+ taskInstance.getName(),
+ taskInstance.getId(),
+ taskInstance.getTaskCode());
readyToSubmitTaskQueue.put(taskInstance);
} catch (Exception e) {
logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
@@ -1736,15 +1744,14 @@ public class WorkflowExecuteThread {
* @param taskInstance task instance
*/
private void removeTaskFromStandbyList(TaskInstance taskInstance) {
- logger.info("remove task from stand by list, id: {} name:{}",
- taskInstance.getId(),
- taskInstance.getName());
+ logger.info("remove task from stand by list, id: {} name:{}", taskInstance.getId(), taskInstance.getName());
try {
readyToSubmitTaskQueue.remove(taskInstance);
} catch (Exception e) {
logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}",
- taskInstance.getId(),
- taskInstance.getName(), e);
+ taskInstance.getId(),
+ taskInstance.getName(),
+ e);
}
}
@@ -1766,8 +1773,9 @@ public class WorkflowExecuteThread {
* close the on going tasks
*/
private void killAllTasks() {
- logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
- activeTaskProcessorMaps.size());
+ logger.info("kill called on process instance id: {}, num: {}",
+ processInstance.getId(),
+ activeTaskProcessorMaps.size());
if (readyToSubmitTaskQueue.size() > 0) {
readyToSubmitTaskQueue.clear();
@@ -1831,14 +1839,16 @@ public class WorkflowExecuteThread {
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
- TaskInstance taskInstance = submitTaskExec(task);
- if (taskInstance == null) {
+ Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
+ if (!taskInstanceOptional.isPresent()) {
this.taskFailedSubmit = true;
// Remove and add to complete map and error map
removeTaskFromStandbyList(task);
completeTaskMap.put(task.getTaskCode(), task.getId());
errorTaskMap.put(task.getTaskCode(), task.getId());
- logger.error("process {}, task {}, code:{} submit task failed.", task.getProcessInstanceId(), task.getName(), task.getTaskCode());
+ logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}",
+ task.getProcessInstanceId(),
+ task.getId());
} else {
removeTaskFromStandbyList(task);
}
@@ -1846,11 +1856,15 @@ public class WorkflowExecuteThread {
// if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTaskMap.put(task.getTaskCode(), task.getId());
removeTaskFromStandbyList(task);
- logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
+ logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}",
+ task.getId(),
+ dependResult);
} else if (DependResult.NON_EXEC == dependResult) {
// for some reasons(depend task pause/stop) this task would not be submit
removeTaskFromStandbyList(task);
- logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);
+ logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}",
+ task.getId(),
+ dependResult);
}
}
} catch (Exception e) {
@@ -2009,4 +2023,30 @@ public class WorkflowExecuteThread {
}
}
}
+
}
+
+ private void measureTaskState(StateEvent taskStateEvent) {
+ if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
+ // the event is broken
+ logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
+ return;
+ }
+ if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
+ TaskMetrics.incTaskFinish();
+ }
+ switch (taskStateEvent.getExecutionStatus()) {
+ case STOP:
+ TaskMetrics.incTaskStop();
+ break;
+ case SUCCESS:
+ TaskMetrics.incTaskSuccess();
+ break;
+ case FAILURE:
+ TaskMetrics.incTaskFailure();
+ break;
+ default:
+ break;
+ }
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index cd337b4c95..556f36a1b7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -100,7 +100,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* execute workflow
*/
- public void executeEvent(WorkflowExecuteThread workflowExecuteThread) {
+ public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
return;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index ffeb89a0d2..81e3070089 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -110,7 +110,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
return true;
}
- logger.info("task ready to submit: {}", taskInstance);
+ logger.info("task ready to submit: taskInstanceId: {}", taskInstance.getId());
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
@@ -125,7 +125,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority);
- logger.info(String.format("master submit success, task : %s", taskInstance.getName()));
+ logger.info("Master submit task to priority queue success, taskInstanceId : {}", taskInstance.getId());
return true;
} catch (Exception e) {
logger.error("submit task error", e);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 0129338649..d5bd131a10 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -21,36 +21,47 @@ import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import org.apache.commons.lang3.StringUtils;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.Map;
-import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* the factory to create task processor
*/
public class TaskProcessorFactory {
- public static final Map<String, ITaskProcessor> PROCESS_MAP = new ConcurrentHashMap<>();
+ private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class);
+
+ public static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP = new ConcurrentHashMap<>();
private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
static {
for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) {
- PROCESS_MAP.put(iTaskProcessor.getType(), iTaskProcessor);
+ try {
+ PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor());
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException("The task processor should has a no args constructor");
+ }
}
}
- public static ITaskProcessor getTaskProcessor(String type) throws InstantiationException, IllegalAccessException {
+ public static ITaskProcessor getTaskProcessor(String type) throws InvocationTargetException, InstantiationException, IllegalAccessException {
if (StringUtils.isEmpty(type)) {
type = DEFAULT_PROCESSOR;
}
- ITaskProcessor iTaskProcessor = PROCESS_MAP.get(type);
- if (Objects.isNull(iTaskProcessor)) {
- iTaskProcessor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
+ Constructor<ITaskProcessor> iTaskProcessorConstructor = PROCESS_MAP.get(type);
+ if (iTaskProcessorConstructor == null) {
+ logger.warn("ITaskProcessor could not found for taskType: {}", type);
+ iTaskProcessorConstructor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
}
- return iTaskProcessor.getClass().newInstance();
+ return iTaskProcessorConstructor.newInstance();
}
/**
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
index d0371809cc..b974a40b31 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import java.lang.reflect.InvocationTargetException;
+
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -27,7 +29,7 @@ import org.junit.Test;
public class TaskProcessorFactoryTest {
@Test
- public void testFactory() throws InstantiationException, IllegalAccessException {
+ public void testFactory() throws InvocationTargetException, InstantiationException, IllegalAccessException {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("shell");
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
index 239a3993c0..b4177ec25d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
@@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import java.net.InetSocketAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import io.netty.channel.Channel;
/**
@@ -28,6 +31,8 @@ import io.netty.channel.Channel;
*/
public class ChannelUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ChannelUtils.class);
+
private ChannelUtils() {
throw new IllegalStateException(ChannelUtils.class.getName());
}
@@ -49,7 +54,7 @@ public class ChannelUtils {
* @return remote address
*/
public static String getRemoteAddress(Channel channel) {
- return NetUtils.getHost(((InetSocketAddress) channel.remoteAddress()).getAddress());
+ return toAddress(channel).getAddress();
}
/**
@@ -60,6 +65,11 @@ public class ChannelUtils {
*/
public static Host toAddress(Channel channel) {
InetSocketAddress socketAddress = ((InetSocketAddress) channel.remoteAddress());
+ if (socketAddress == null) {
+ // the remote channel already closed
+ LOGGER.warn("The channel is already closed");
+ return Host.EMPTY;
+ }
return new Host(NetUtils.getHost(socketAddress.getAddress()), socketAddress.getPort());
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index 2163e9c7d8..dc8e1f0d36 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -27,6 +27,8 @@ import java.util.Objects;
*/
public class Host implements Serializable {
+ public static final Host EMPTY = new Host();
+
/**
* address
*/
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 48bb9c624d..eba7ced00a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -748,7 +748,8 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setRecovery(Flag.NO);
processInstance.setStartTime(new Date());
- processInstance.setRestartTime(processInstance.getStartTime());
+ // the new process instance restart time is null.
+ processInstance.setRestartTime(null);
processInstance.setRunTimes(1);
processInstance.setMaxTryTimes(0);
processInstance.setCommandParam(command.getCommandParam());
@@ -1266,7 +1267,7 @@ public class ProcessServiceImpl implements ProcessService {
@Override
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
- logger.info("start submit task : {}, instance id:{}, state: {}",
+ logger.info("start submit task : {}, processInstance id:{}, state: {}",
taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index 231fd2a20f..2e939ee332 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -21,11 +21,13 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.PriorityQueue;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
/**
* Task instances priority queue implementation
@@ -40,12 +42,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
/**
* queue
*/
- private PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
-
- /**
- * Lock used for all public operations
- */
- private final ReentrantLock lock = new ReentrantLock(true);
+ private final PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+ private final Set<Integer> taskInstanceIdSet = Collections.synchronizedSet(new HashSet<>());
/**
* put task instance to priority queue
@@ -56,6 +54,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
@Override
public void put(TaskInstance taskInstance) throws TaskPriorityQueueException {
queue.add(taskInstance);
+ taskInstanceIdSet.add(taskInstance.getId());
}
/**
@@ -66,7 +65,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
*/
@Override
public TaskInstance take() throws TaskPriorityQueueException {
- return queue.poll();
+ TaskInstance taskInstance = queue.poll();
+ if (taskInstance != null) {
+ taskInstanceIdSet.remove(taskInstance.getId());
+ }
+ return taskInstance;
}
/**
@@ -111,6 +114,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
*/
public void clear() {
queue.clear();
+ taskInstanceIdSet.clear();
}
/**
@@ -120,20 +124,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
* @return true is contains
*/
public boolean contains(TaskInstance taskInstance) {
- return this.contains(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
+ return this.contains(taskInstance.getId());
}
- public boolean contains(long taskCode, int taskVersion) {
- Iterator<TaskInstance> iterator = this.queue.iterator();
- while (iterator.hasNext()) {
- TaskInstance taskInstance = iterator.next();
- if (taskCode == taskInstance.getTaskCode()
- && taskVersion == taskInstance.getTaskDefinitionVersion()) {
- return true;
- }
- }
- return false;
-
+ public boolean contains(int taskInstanceId) {
+ return taskInstanceIdSet.contains(taskInstanceId);
}
/**
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
index 8da3a6c194..67e40d1189 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
@@ -46,14 +46,11 @@ public class PeerTaskInstancePriorityQueueTest {
Assert.assertTrue(queue.size() < peekBeforeLength);
}
- @Test
+
+ @Test(expected = TaskPriorityQueueException.class)
public void poll() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
- try {
- queue.poll(1000, TimeUnit.MILLISECONDS);
- } catch (TaskPriorityQueueException e) {
- e.printStackTrace();
- }
+ queue.poll(1000, TimeUnit.MILLISECONDS);
}
@Test
diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index c50883da62..7be661de4e 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -109,7 +109,7 @@ master:
# master prepare execute thread number to limit handle commands in parallel
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
- exec-threads: 100
+ exec-threads: 10
# master dispatch task number per batch
dispatch-task-number: 3
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
@@ -134,7 +134,7 @@ worker:
# worker listener port
listen-port: 1234
# worker execute thread number to limit task instances in parallel
- exec-threads: 100
+ exec-threads: 10
# worker heartbeat interval, the unit is second
heartbeat-interval: 10
# worker host weight to dispatch tasks, default value 100
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index 737b876b74..9d084d0ec8 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.MapUtils;
import java.io.File;
+import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
@@ -147,7 +148,11 @@ public class ShellTask extends AbstractTaskExecutor {
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
- Files.createFile(path, attr);
+ try {
+ Files.createFile(path, attr);
+ } catch (FileAlreadyExistsException ex) {
+ // this is expected
+ }
}
Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 5192e27d61..6edff09f97 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -21,9 +21,9 @@ import java.util.Set;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.stereotype.Component;
+import org.springframework.context.annotation.Configuration;
-@Component
+@Configuration
@EnableConfigurationProperties
@ConfigurationProperties("worker")
public class WorkerConfig {
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 3641de8453..2a92222b0a 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -162,6 +162,8 @@ public class TaskCallbackService {
}
}
});
+ } else {
+ logger.warn("Remote channel of taskInstanceId is null: {}, cannot send command: {}", taskInstanceId, command);
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index cc15eb62f4..e6ff2b5653 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -40,10 +43,17 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import java.io.File;
import java.io.IOException;
-import java.util.*;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -233,7 +243,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
logger.info("exec local path: {} cleared.", execLocalPath);
} catch (IOException e) {
- logger.error("delete exec dir failed : {}", e.getMessage(), e);
+ if (e instanceof NoSuchFileException) {
+ // this is expected
+ } else {
+ logger.error("Delete exec dir failed.", e);
+ }
}
}
}
@@ -264,7 +278,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
task.cancelApplication(true);
ProcessUtils.killYarnJob(taskExecutionContext);
} catch (Exception e) {
- logger.error(e.getMessage(), e);
+ logger.error("Kill task failed", e);
}
}
}