You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/01/11 14:51:15 UTC
[dolphinscheduler] branch 2.0.3-prepare updated: [cherry-pick-2.0.3][Improvement][TaskLog] Unified task log #7831 (#7926)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 2.0.3-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.3-prepare by this push:
new 46a5d17 [cherry-pick-2.0.3][Improvement][TaskLog] Unified task log #7831 (#7926)
46a5d17 is described below
commit 46a5d17bf673427c8ecc1bfce9864dbc7625b269
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Tue Jan 11 22:51:00 2022 +0800
[cherry-pick-2.0.3][Improvement][TaskLog] Unified task log #7831 (#7926)
* [cherry-pick][Improvement][TaskLog] Unified task log #7831
* [cherry-pick][Improvement][TaskLog] Unified task log #7831
* fix thread name
Co-authored-by: caishunfeng <53...@qq.com>
---
.../master/runner/WorkflowExecuteThread.java | 7 +-
.../master/runner/task/BaseTaskProcessor.java | 82 ++++++++++++++++++----
.../master/runner/task/CommonTaskProcessor.java | 20 ++----
.../master/runner/task/ConditionTaskProcessor.java | 29 ++------
.../master/runner/task/DependentTaskProcessor.java | 17 ++---
.../server/master/runner/task/ITaskProcessor.java | 4 +-
.../master/runner/task/SubTaskProcessor.java | 20 +++---
.../master/runner/task/SwitchTaskProcessor.java | 19 ++---
.../server/master/runner/task/TaskAction.java | 4 +-
9 files changed, 108 insertions(+), 94 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 5988839..7907686 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -359,7 +359,7 @@ public class WorkflowExecuteThread implements Runnable {
taskFinished(task);
} else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
- iTaskProcessor.run();
+ iTaskProcessor.action(TaskAction.RUN);
if (iTaskProcessor.taskState().typeIsFinished()) {
task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
@@ -630,11 +630,12 @@ public class WorkflowExecuteThread implements Runnable {
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
- boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getMasterTaskCommitRetryTimes(), masterConfig.getMasterTaskCommitInterval());
+ taskProcessor.init(taskInstance, processInstance);
+ boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (submit) {
this.taskInstanceHashMap.put(taskInstance.getId(), taskInstance.getTaskCode(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
- taskProcessor.run();
+ taskProcessor.action(TaskAction.RUN);
addTimeoutCheck(taskInstance);
addRetryCheck(taskInstance);
TaskDefinition taskDefinition = processService.findTaskDefinition(
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 4446485..508feb5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParamete
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -39,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
@@ -68,7 +70,7 @@ import com.google.common.base.Strings;
public abstract class BaseTaskProcessor implements ITaskProcessor {
- protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME, getClass()));
+ protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
protected boolean killed = false;
@@ -78,10 +80,32 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected TaskInstance taskInstance = null;
- protected ProcessInstance processInstance;
+ protected ProcessInstance processInstance = null;
+
+ protected int maxRetryTimes;
+
+ protected int commitInterval;
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
+ protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+
+ protected String threadLoggerInfoName;
+
+ @Override
+ public void init(TaskInstance taskInstance, ProcessInstance processInstance) {
+ if (processService == null) {
+ processService = SpringApplicationContext.getBean(ProcessService.class);
+ }
+ if (masterConfig == null) {
+ masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+ }
+ this.taskInstance = taskInstance;
+ this.processInstance = processInstance;
+ this.maxRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
+ this.commitInterval = masterConfig.getMasterTaskCommitInterval();
+ }
+
/**
* persist task
*
@@ -91,21 +115,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
/**
* pause task, common tasks donot need this.
- *
- * @return
*/
protected abstract boolean pauseTask();
/**
* kill task, all tasks need to realize this function
- *
- * @return
*/
protected abstract boolean killTask();
/**
* task timeout process
- * @return
*/
protected abstract boolean taskTimeout();
@@ -119,12 +138,22 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return persistTask(taskAction);
}
- @Override
- public void run() {
- }
+ /**
+ * submit task
+ */
+ protected abstract boolean submitTask();
+
+ /**
+ * run task
+ */
+ protected abstract boolean runTask();
@Override
public boolean action(TaskAction taskAction) {
+ String threadName = Thread.currentThread().getName();
+ if (StringUtils.isNotEmpty(threadLoggerInfoName)) {
+ Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT, threadLoggerInfoName));
+ }
switch (taskAction) {
case STOP:
@@ -133,13 +162,27 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return pause();
case TIMEOUT:
return timeout();
+ case SUBMIT:
+ return submit();
+ case RUN:
+ return run();
default:
logger.error("unknown task action: {}", taskAction.toString());
-
}
+
+ // reset thread name
+ Thread.currentThread().setName(threadName);
return false;
}
+ protected boolean submit() {
+ return submitTask();
+ }
+
+ protected boolean run() {
+ return runTask();
+ }
+
protected boolean timeout() {
if (timeout) {
return true;
@@ -148,9 +191,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return timeout;
}
- /**
- * @return
- */
protected boolean pause() {
if (paused) {
return true;
@@ -173,6 +213,18 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
}
/**
+ * set master task running logger.
+ */
+ public void setTaskExecutionLogger() {
+ threadLoggerInfoName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+ processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion(),
+ taskInstance.getProcessInstanceId(),
+ taskInstance.getId());
+ Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT, threadLoggerInfoName));
+ }
+
+ /**
* get TaskExecutionContext
*
* @param taskInstance taskInstance
@@ -321,7 +373,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
// whether udf type
boolean udfTypeFlag = Enums.getIfPresent(UdfType.class, Strings.nullToEmpty(sqlParameters.getType())).isPresent()
- && !StringUtils.isEmpty(sqlParameters.getUdfs());
+ && !StringUtils.isEmpty(sqlParameters.getUdfs());
if (udfTypeFlag) {
String[] udfFunIds = sqlParameters.getUdfs().split(",");
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 14bb3af..ca78ea3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
@@ -39,8 +38,6 @@ import org.apache.commons.lang.StringUtils;
import java.util.Date;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
@@ -52,24 +49,16 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
private TaskPriorityQueue taskUpdateQueue;
@Autowired
- MasterConfig masterConfig;
-
- @Autowired
NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
- /**
- * logger of MasterBaseTaskExecThread
- */
- protected Logger logger = LoggerFactory.getLogger(getClass());
-
@Override
- public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
- this.processInstance = processInstance;
- this.taskInstance = processService.submitTask(task, maxRetryTimes, commitInterval);
+ public boolean submitTask() {
+ this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
+ setTaskExecutionLogger();
return dispatchTask(taskInstance, processInstance);
}
@@ -79,7 +68,8 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
}
@Override
- public void run() {
+ public boolean runTask() {
+ return true;
}
@Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 584e484..c303a80 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -26,15 +26,10 @@ import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.spi.task.TaskConstants;
import java.util.ArrayList;
import java.util.Date;
@@ -42,23 +37,16 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* condition task processor
*/
public class ConditionTaskProcessor extends BaseTaskProcessor {
- protected static final Logger logger = LoggerFactory.getLogger(TaskConstants.TASK_LOG_LOGGER_NAME);
-
/**
* dependent parameters
*/
private DependentParameters dependentParameters;
- ProcessInstance processInstance;
-
/**
* condition result
*/
@@ -69,14 +57,11 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
*/
private Map<Long, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
- MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
-
private TaskDefinition taskDefinition;
@Override
- public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
- this.processInstance = processInstance;
- this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+ public boolean submitTask() {
+ this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
@@ -85,12 +70,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
- String threadLoggerInfoName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
- processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion(),
- taskInstance.getProcessInstanceId(),
- taskInstance.getId());
- Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT,threadLoggerInfoName));
+ setTaskExecutionLogger();
initTaskParameters();
logger.info("dependent task start");
return true;
@@ -102,13 +82,14 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
}
@Override
- public void run() {
+ public boolean runTask() {
if (conditionResult.equals(DependResult.WAITING)) {
setConditionResult();
endTask();
} else {
endTask();
}
+ return true;
}
@Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 0f84a5f..100c75f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -27,13 +27,9 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
@@ -69,18 +65,13 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
DependResult result;
- ProcessInstance processInstance;
TaskDefinition taskDefinition;
- MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
-
boolean allDependentItemFinished;
@Override
- public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
- this.processInstance = processInstance;
- this.taskInstance = task;
- this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+ public boolean submitTask() {
+ this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
@@ -88,6 +79,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
+ setTaskExecutionLogger();
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
@@ -106,7 +98,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
}
@Override
- public void run() {
+ public boolean runTask() {
if (!allDependentItemFinished) {
allDependentItemFinished = allDependentTaskFinish();
}
@@ -114,6 +106,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
getTaskDependResult();
endTask();
}
+ return true;
}
@Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
index aa1e490..41fb0d0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
@@ -28,14 +28,12 @@ public interface ITaskProcessor {
boolean persist(TaskAction taskAction);
- void run();
+ void init(TaskInstance taskInstance, ProcessInstance processInstance);
boolean action(TaskAction taskAction);
String getType();
- boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval);
-
ExecutionStatus taskState();
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 02f08a8..16b21f0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -22,9 +22,9 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
@@ -36,8 +36,6 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class SubTaskProcessor extends BaseTaskProcessor {
- private ProcessInstance processInstance;
-
private ProcessInstance subProcessInstance = null;
private TaskDefinition taskDefinition;
@@ -49,17 +47,22 @@ public class SubTaskProcessor extends BaseTaskProcessor {
private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);
@Override
- public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
- this.processInstance = processInstance;
+ public boolean submitTask() {
taskDefinition = processService.findTaskDefinition(
- task.getTaskCode(), task.getTaskDefinitionVersion()
+ taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
- this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+ this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
+ setTaskExecutionLogger();
+ taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion(),
+ taskInstance.getProcessInstanceId(),
+ taskInstance.getId()));
+
return true;
}
@@ -69,7 +72,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
}
@Override
- public void run() {
+ public boolean runTask() {
try {
this.runLock.lock();
if (setSubWorkFlow()) {
@@ -83,6 +86,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
} finally {
this.runLock.unlock();
}
+ return true;
}
@Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index c48a711..7134ff0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -25,13 +25,10 @@ import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
@@ -48,27 +45,22 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
- private TaskInstance taskInstance;
-
- private ProcessInstance processInstance;
TaskDefinition taskDefinition;
- MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
-
/**
* switch result
*/
private DependResult conditionResult;
@Override
- public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
-
- this.processInstance = processInstance;
- this.taskInstance = processService.submitTask(taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+ public boolean submitTask() {
+ this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
+ setTaskExecutionLogger();
+
taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
@@ -84,7 +76,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
}
@Override
- public void run() {
+ public boolean runTask() {
try {
if (!this.taskState().typeIsFinished() && setSwitchResult()) {
endTaskState();
@@ -95,6 +87,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
this.taskInstance.getId(),
e);
}
+ return true;
}
@Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
index 42c8846..0c4db76 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
@@ -23,5 +23,7 @@ package org.apache.dolphinscheduler.server.master.runner.task;
public enum TaskAction {
PAUSE,
STOP,
- TIMEOUT
+ TIMEOUT,
+ SUBMIT,
+ RUN
}