You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/10/08 06:08:23 UTC
[dolphinscheduler] branch dev updated: [Bug-6455][Master]fix bug 6455: cannot stop sub-task (#6458)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new a8baa95 [Bug-6455][Master]fix bug 6455: cannot stop sub-task (#6458)
a8baa95 is described below
commit a8baa9553fd64d1414c204c6ba95019b7376f771
Author: OS <29...@users.noreply.github.com>
AuthorDate: Fri Oct 8 14:08:12 2021 +0800
[Bug-6455][Master]fix bug 6455: cannot stop sub-task (#6458)
* fix bug: cannot stop the task.
* fix bug: cannot stop the task.
* remove the check thread number
---
.../server/master/processor/StateEventProcessor.java | 6 +++++-
.../server/master/runner/WorkflowExecuteThread.java | 2 +-
.../server/master/runner/task/CommonTaskProcessor.java | 2 +-
.../server/master/runner/task/SubTaskProcessor.java | 18 ++++++++++++++++--
.../service/process/ProcessService.java | 4 ----
5 files changed, 23 insertions(+), 9 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
index d5a8e85..2f9a634 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
@@ -61,8 +61,12 @@ public class StateEventProcessor implements NettyRequestProcessor {
StateEventChangeCommand stateEventChangeCommand = JSONUtils.parseObject(command.getBody(), StateEventChangeCommand.class);
StateEvent stateEvent = new StateEvent();
- stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
stateEvent.setKey(stateEventChangeCommand.getKey());
+ if (stateEventChangeCommand.getSourceProcessInstanceId() != stateEventChangeCommand.getDestProcessInstanceId()) {
+ stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
+ } else {
+ stateEvent.setExecutionStatus(stateEventChangeCommand.getSourceStatus());
+ }
stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId());
stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId());
StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 7ed2679..ad44407 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -416,7 +416,7 @@ public class WorkflowExecuteThread implements Runnable {
if (stateEvent.getExecutionStatus().typeIsFinished()) {
endProcess();
}
- if (stateEvent.getExecutionStatus() == ExecutionStatus.READY_STOP) {
+ if (processInstance.getState() == ExecutionStatus.READY_STOP) {
killAllTasks();
}
return true;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 4296b85..ee1c548 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -55,7 +55,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
MasterConfig masterConfig;
@Autowired
- NettyExecutorManager nettyExecutorManager;
+ NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
/**
* logger of MasterBaseTaskExecThread
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 7a4be58..e0cd3e8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -23,6 +23,9 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
+import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.concurrent.locks.Lock;
@@ -43,6 +46,8 @@ public class SubTaskProcessor extends BaseTaskProcessor {
*/
private final Lock runLock = new ReentrantLock();
+ private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);
+
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
@@ -121,8 +126,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
}
subProcessInstance.setState(ExecutionStatus.READY_PAUSE);
processService.updateProcessInstance(subProcessInstance);
- //TODO...
- // send event to sub process master
+ sendToSubProcess();
return true;
}
@@ -157,9 +161,19 @@ public class SubTaskProcessor extends BaseTaskProcessor {
}
subProcessInstance.setState(ExecutionStatus.READY_STOP);
processService.updateProcessInstance(subProcessInstance);
+ sendToSubProcess();
return true;
}
+ private void sendToSubProcess() {
+ StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
+ processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(), subProcessInstance.getId(), 0
+ );
+ String address = subProcessInstance.getHost().split(":")[0];
+ int port = Integer.parseInt(subProcessInstance.getHost().split(":")[1]);
+ this.stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
+ }
+
@Override
public String getType() {
return TaskType.SUB_PROCESS.getDesc();
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 6821967..e78112a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -222,10 +222,6 @@ public class ProcessService {
moveToErrorCommand(command, "process instance is null");
return null;
}
- if (!checkThreadNum(command, validThreadNum)) {
- logger.info("there is not enough thread for this command: {}", command);
- return setWaitingThreadProcess(command, processInstance);
- }
processInstance.setCommandType(command.getCommandType());
processInstance.addHistoryCmd(command.getCommandType());
saveProcessInstance(processInstance);