You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/01/04 05:38:38 UTC
[dolphinscheduler] branch dev updated: [Implement][MasterServer]TaskProcessor code optimization (#7754)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 4b22ad6 [Implement][MasterServer]TaskProcessor code optimization (#7754)
4b22ad6 is described below
commit 4b22ad6cf6c584a35b01bc9d3d548419dda676a2
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Tue Jan 4 13:38:31 2022 +0800
[Implement][MasterServer]TaskProcessor code optimization (#7754)
* task processor optimization
* fix test
Co-authored-by: caishunfeng <53...@qq.com>
---
.../master/runner/WorkflowExecuteThread.java | 20 ++++---
.../master/runner/task/BaseTaskProcessor.java | 70 ++++++++++++++++++----
.../runner/task/CommonTaskProcessFactory.java | 36 -----------
.../master/runner/task/CommonTaskProcessor.java | 43 +++++--------
.../runner/task/ConditionTaskProcessFactory.java | 35 -----------
.../master/runner/task/ConditionTaskProcessor.java | 30 ++++------
.../runner/task/DependentTaskProcessFactory.java | 36 -----------
.../master/runner/task/DependentTaskProcessor.java | 32 ++++------
.../master/runner/task/ITaskProcessFactory.java | 25 --------
.../server/master/runner/task/ITaskProcessor.java | 6 +-
.../master/runner/task/SubTaskProcessFactory.java | 35 -----------
.../master/runner/task/SubTaskProcessor.java | 27 +++++----
.../runner/task/SwitchTaskProcessFactory.java | 36 -----------
.../master/runner/task/SwitchTaskProcessor.java | 27 ++++-----
.../server/master/runner/task/TaskAction.java | 7 ++-
.../master/runner/task/TaskProcessorFactory.java | 16 ++---
.../server/master/WorkflowExecuteThreadTest.java | 12 +++-
.../runner/task/TaskProcessorFactoryTest.java | 2 +-
18 files changed, 164 insertions(+), 331 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index c23da35..5ca705d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -54,7 +54,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -333,7 +332,8 @@ public class WorkflowExecuteThread {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
- taskProcessor.dispatch(taskInstance, processInstance);
+ taskProcessor.init(taskInstance, processInstance);
+ taskProcessor.action(TaskAction.DISPATCH);
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
return true;
}
@@ -343,7 +343,8 @@ public class WorkflowExecuteThread {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
- taskProcessor.dispatch(taskInstance, processInstance);
+ taskProcessor.init(taskInstance, processInstance);
+ taskProcessor.action(TaskAction.DISPATCH);
return true;
}
}
@@ -406,7 +407,7 @@ public class WorkflowExecuteThread {
}
} else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
- iTaskProcessor.run();
+ iTaskProcessor.action(TaskAction.RUN);
if (iTaskProcessor.taskState().typeIsFinished()) {
task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
@@ -800,15 +801,18 @@ public class WorkflowExecuteThread {
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
try {
+ // package task instance before submit
+ processService.packageTaskInstance(taskInstance, processInstance);
+
ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
+ taskProcessor.init(taskInstance, processInstance);
+
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
- // package task instance before submit
- processService.packageTaskInstance(taskInstance, processInstance);
- boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval(), masterConfig.isTaskLogger());
+ boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
processInstance.getId(), processInstance.getName(),
@@ -818,7 +822,7 @@ public class WorkflowExecuteThread {
validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
- taskProcessor.run();
+ taskProcessor.action(TaskAction.RUN);
stateWheelExecuteThread.addTask4TimeoutCheck(taskInstance);
stateWheelExecuteThread.addTask4RetryCheck(taskInstance);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 0b3d96b..8d62f1f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
@@ -80,7 +81,30 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected ProcessInstance processInstance;
- protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);;
+ protected int maxRetryTimes;
+
+ protected int commitInterval;
+
+ protected boolean isTaskLogger;
+
+ protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
+
+ protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+
+ @Override
+ public void init(TaskInstance taskInstance, ProcessInstance processInstance) {
+ if (processService == null) {
+ processService = SpringApplicationContext.getBean(ProcessService.class);
+ }
+ if (masterConfig == null) {
+ masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+ }
+ this.taskInstance = taskInstance;
+ this.processInstance = processInstance;
+ this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
+ this.commitInterval = masterConfig.getTaskCommitInterval();
+ this.isTaskLogger = masterConfig.isTaskLogger();
+ }
/**
* pause task, common tasks donot need this.
@@ -97,9 +121,21 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
*/
protected abstract boolean taskTimeout();
- @Override
- public void run() {
- }
+ /**
+ * submit task
+ */
+ protected abstract boolean submitTask();
+
+ /**
+ * run task
+ */
+ protected abstract boolean runTask();
+
+ /**
+ * dispatch task
+ * @return
+ */
+ protected abstract boolean dispatchTask();
@Override
public boolean action(TaskAction taskAction) {
@@ -111,6 +147,12 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return pause();
case TIMEOUT:
return timeout();
+ case SUBMIT:
+ return submit();
+ case RUN:
+ return run();
+ case DISPATCH:
+ return dispatch();
default:
logger.error("unknown task action: {}", taskAction);
@@ -118,6 +160,18 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return false;
}
+ protected boolean submit() {
+ return submitTask();
+ }
+
+ protected boolean run() {
+ return runTask();
+ }
+
+ protected boolean dispatch() {
+ return dispatchTask();
+ }
+
protected boolean timeout() {
if (timeout) {
return true;
@@ -126,9 +180,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return timeout;
}
- /**
- *
- */
protected boolean pause() {
if (paused) {
return true;
@@ -150,9 +201,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return null;
}
- @Override
- public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) {
-
+ public ExecutionStatus taskState() {
+ return this.taskInstance.getState();
}
/**
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
deleted file mode 100644
index 4884650..0000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.Constants;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class CommonTaskProcessFactory implements ITaskProcessFactory {
- @Override
- public String type() {
- return Constants.COMMON_TASK_TYPE;
-
- }
-
- @Override
- public ITaskProcessor create() {
- return new CommonTaskProcessor();
- }
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index f35127a..db6e7e2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -19,9 +19,6 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -38,9 +35,12 @@ import org.apache.commons.lang.StringUtils;
import java.util.Date;
+import com.google.auto.service.AutoService;
+
/**
* common task processor
*/
+@AutoService(ITaskProcessor.class)
public class CommonTaskProcessor extends BaseTaskProcessor {
private TaskPriorityQueue taskUpdateQueue;
@@ -48,42 +48,32 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
private NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
@Override
- public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval, boolean isTaskLogger) {
- this.processInstance = processInstance;
- this.taskInstance = processService.submitTaskWithRetry(processInstance, task, maxRetryTimes, commitInterval);
+ public boolean submitTask() {
+ this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
setTaskExecutionLogger(isTaskLogger);
- int taskGroupId = task.getTaskGroupId();
+ int taskGroupId = taskInstance.getTaskGroupId();
if (taskGroupId > 0) {
- boolean acquireTaskGroup = processService.acquireTaskGroup(task.getId(),
- task.getName(),
+ boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
+ taskInstance.getName(),
taskGroupId,
- task.getProcessInstanceId(),
- task.getTaskInstancePriority().getCode());
+ taskInstance.getProcessInstanceId(),
+ taskInstance.getTaskInstancePriority().getCode());
if (!acquireTaskGroup) {
logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName());
return true;
}
}
- dispatchTask(taskInstance, processInstance);
+ dispatchTask();
return true;
}
@Override
- public ExecutionStatus taskState() {
- return this.taskInstance.getState();
- }
-
- @Override
- public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) {
- this.dispatchTask(taskInstance, processInstance);
- }
-
- @Override
- public void run() {
+ public boolean runTask() {
+ return true;
}
@Override
@@ -104,8 +94,8 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return Constants.COMMON_TASK_TYPE;
}
- private boolean dispatchTask(TaskInstance taskInstance, ProcessInstance processInstance) {
-
+ @Override
+ public boolean dispatchTask() {
try {
if (taskUpdateQueue == null) {
this.initQueue();
@@ -133,8 +123,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
logger.info(String.format("master submit success, task : %s", taskInstance.getName()));
return true;
} catch (Exception e) {
- logger.error("submit task Exception: ", e);
- logger.error("task error : {}", JSONUtils.toJsonString(taskInstance));
+ logger.error("submit task error", e);
return false;
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
deleted file mode 100644
index 3028c56..0000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.enums.TaskType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class ConditionTaskProcessFactory implements ITaskProcessFactory {
- @Override
- public String type() {
- return TaskType.CONDITIONS.getDesc();
- }
-
- @Override
- public ITaskProcessor create() {
- return new ConditionTaskProcessor();
- }
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 7a7fe29..3d96d2e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -27,12 +27,8 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
@@ -40,9 +36,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.auto.service.AutoService;
+
/**
* condition task processor
*/
+@AutoService(ITaskProcessor.class)
public class ConditionTaskProcessor extends BaseTaskProcessor {
/**
@@ -60,21 +59,13 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
*/
private Map<Long, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
- private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
-
- private TaskDefinition taskDefinition;
-
@Override
- public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
- this.processInstance = processInstance;
- this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+ public boolean submitTask() {
+ this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
- taskDefinition = processService.findTaskDefinition(
- taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
- );
setTaskExecutionLogger(isTaskLogger);
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
@@ -90,13 +81,19 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
}
@Override
- public void run() {
+ public boolean runTask() {
if (conditionResult.equals(DependResult.WAITING)) {
setConditionResult();
endTask();
} else {
endTask();
}
+ return true;
+ }
+
+ @Override
+ protected boolean dispatchTask() {
+ return true;
}
@Override
@@ -109,8 +106,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
@Override
protected boolean taskTimeout() {
- TaskTimeoutStrategy taskTimeoutStrategy =
- taskDefinition.getTimeoutNotifyStrategy();
+ TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
if (taskTimeoutStrategy == TaskTimeoutStrategy.WARN) {
return true;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
deleted file mode 100644
index 3f885ed..0000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.enums.TaskType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class DependentTaskProcessFactory implements ITaskProcessFactory {
-
- @Override
- public String type() {
- return TaskType.DEPENDENT.getDesc();
- }
-
- @Override
- public ITaskProcessor create() {
- return new DependentTaskProcessor();
- }
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 1c1c969..4b20b68 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -27,13 +27,8 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
@@ -42,10 +37,12 @@ import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonFormat;
+import com.google.auto.service.AutoService;
/**
* dependent task processor
*/
+@AutoService(ITaskProcessor.class)
public class DependentTaskProcessor extends BaseTaskProcessor {
private DependentParameters dependentParameters;
@@ -69,24 +66,16 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
DependResult result;
- TaskDefinition taskDefinition;
-
- private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
-
boolean allDependentItemFinished;
@Override
- public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
- this.processInstance = processInstance;
- this.taskInstance = task;
- this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+ public boolean submitTask() {
+ this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
- taskDefinition = processService.findTaskDefinition(
- taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
- );
+
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
@@ -107,7 +96,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
}
@Override
- public void run() {
+ public boolean runTask() {
if (!allDependentItemFinished) {
allDependentItemFinished = allDependentTaskFinish();
}
@@ -115,12 +104,17 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
getTaskDependResult();
endTask();
}
+ return true;
+ }
+
+ @Override
+ protected boolean dispatchTask() {
+ return true;
}
@Override
protected boolean taskTimeout() {
- TaskTimeoutStrategy taskTimeoutStrategy =
- taskDefinition.getTimeoutNotifyStrategy();
+ TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
if (TaskTimeoutStrategy.FAILED != taskTimeoutStrategy
&& TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) {
return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
deleted file mode 100644
index ffbbafb..0000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-public interface ITaskProcessFactory {
-
- String type();
-
- ITaskProcessor create();
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
index 66e49f9..d1f3c4c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
@@ -26,16 +26,12 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
*/
public interface ITaskProcessor {
- void run();
+ void init(TaskInstance taskInstance, ProcessInstance processInstance);
boolean action(TaskAction taskAction);
String getType();
- boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger);
-
ExecutionStatus taskState();
- void dispatch(TaskInstance taskInstance, ProcessInstance processInstance);
-
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
deleted file mode 100644
index 439d8e1..0000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.enums.TaskType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class SubTaskProcessFactory implements ITaskProcessFactory {
- @Override
- public String type() {
- return TaskType.SUB_PROCESS.getDesc();
- }
-
- @Override
- public ITaskProcessor create() {
- return new SubTaskProcessor();
- }
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 50ddaeb..85bce57 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -21,8 +21,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -31,28 +29,26 @@ import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.auto.service.AutoService;
+
/**
* subtask processor
*/
+@AutoService(ITaskProcessor.class)
public class SubTaskProcessor extends BaseTaskProcessor {
private ProcessInstance subProcessInstance = null;
- private TaskDefinition taskDefinition;
/**
* run lock
*/
private final Lock runLock = new ReentrantLock();
- private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);;
+ private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);
@Override
- public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
- this.processInstance = processInstance;
- taskDefinition = processService.findTaskDefinition(
- task.getTaskCode(), task.getTaskDefinitionVersion()
- );
- this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+ public boolean submitTask() {
+ this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
@@ -67,7 +63,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
}
@Override
- public void run() {
+ public boolean runTask() {
try {
this.runLock.lock();
if (setSubWorkFlow()) {
@@ -81,12 +77,17 @@ public class SubTaskProcessor extends BaseTaskProcessor {
} finally {
this.runLock.unlock();
}
+ return true;
+ }
+
+ @Override
+ protected boolean dispatchTask() {
+ return true;
}
@Override
protected boolean taskTimeout() {
- TaskTimeoutStrategy taskTimeoutStrategy =
- taskDefinition.getTimeoutNotifyStrategy();
+ TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
if (TaskTimeoutStrategy.FAILED != taskTimeoutStrategy
&& TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) {
return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
deleted file mode 100644
index d536e65..0000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.enums.TaskType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class SwitchTaskProcessFactory implements ITaskProcessFactory {
-
- @Override
- public String type() {
- return TaskType.SWITCH.getDesc();
- }
-
- @Override
- public ITaskProcessor create() {
- return new SwitchTaskProcessor();
- }
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 1afd211..ce2706f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -25,13 +25,9 @@ import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
@@ -44,33 +40,28 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import com.google.auto.service.AutoService;
+
/**
* switch task processor
*/
+@AutoService(ITaskProcessor.class)
public class SwitchTaskProcessor extends BaseTaskProcessor {
protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
- TaskDefinition taskDefinition;
-
- private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
-
/**
* switch result
*/
private DependResult conditionResult;
@Override
- public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
- this.processInstance = processInstance;
- this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+ public boolean submitTask() {
+ this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
- taskDefinition = processService.findTaskDefinition(
- taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
- );
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
@@ -84,7 +75,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
}
@Override
- public void run() {
+ public boolean runTask() {
try {
if (!this.taskState().typeIsFinished() && setSwitchResult()) {
endTaskState();
@@ -95,6 +86,12 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
this.taskInstance.getId(),
e);
}
+ return true;
+ }
+
+ @Override
+ protected boolean dispatchTask() {
+ return true;
}
@Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
index 42c8846..9044945 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
@@ -20,8 +20,11 @@ package org.apache.dolphinscheduler.server.master.runner.task;
/**
* task action
*/
-public enum TaskAction {
+public enum TaskAction {
PAUSE,
STOP,
- TIMEOUT
+ TIMEOUT,
+ SUBMIT,
+ RUN,
+ DISPATCH
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 4b20848..2b9e9d6 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -31,25 +31,25 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class TaskProcessorFactory {
- public static final Map<String, ITaskProcessFactory> PROCESS_FACTORY_MAP = new ConcurrentHashMap<>();
+ public static final Map<String, ITaskProcessor> PROCESS_MAP = new ConcurrentHashMap<>();
private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
static {
- for (ITaskProcessFactory iTaskProcessor : ServiceLoader.load(ITaskProcessFactory.class)) {
- PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor);
+ for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) {
+ PROCESS_MAP.put(iTaskProcessor.getType(), iTaskProcessor);
}
}
- public static ITaskProcessor getTaskProcessor(String type) {
+ public static ITaskProcessor getTaskProcessor(String type) throws InstantiationException, IllegalAccessException {
if (StringUtils.isEmpty(type)) {
type = DEFAULT_PROCESSOR;
}
- ITaskProcessFactory taskProcessFactory = PROCESS_FACTORY_MAP.get(type);
- if (Objects.isNull(taskProcessFactory)) {
- taskProcessFactory = PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR);
+ ITaskProcessor iTaskProcessor = PROCESS_MAP.get(type);
+ if (Objects.isNull(iTaskProcessor)) {
+ iTaskProcessor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
}
- return taskProcessFactory.create();
+ return iTaskProcessor.getClass().newInstance();
}
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index 8f2572d..1e38938 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.lang.reflect.Field;
@@ -89,13 +90,18 @@ public class WorkflowExecuteThreadTest {
@Before
public void init() throws Exception {
- processService = mock(ProcessService.class);
- taskProcessorFactory = mock(TaskProcessorFactory.class);
-
applicationContext = mock(ApplicationContext.class);
+ SpringApplicationContext springApplicationContext = new SpringApplicationContext();
+ springApplicationContext.setApplicationContext(applicationContext);
+
config = new MasterConfig();
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
+ processService = mock(ProcessService.class);
+ Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
+
+ taskProcessorFactory = mock(TaskProcessorFactory.class);
+
processInstance = mock(ProcessInstance.class);
Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS);
Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString());
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
index 4114a7a..d037180 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
public class TaskProcessorFactoryTest {
@Test
- public void testFactory() {
+ public void testFactory() throws InstantiationException, IllegalAccessException {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("shell");