You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/06 06:53:37 UTC
[dolphinscheduler] branch dev updated: [Fix-10785] Fix state event handle error will not retry (#10786)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 67d14fb7b3 [Fix-10785] Fix state event handle error will not retry (#10786)
67d14fb7b3 is described below
commit 67d14fb7b3d941af613a96a0b9c3a2928e5c201c
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Jul 6 14:53:28 2022 +0800
[Fix-10785] Fix state event handle error will not retry (#10786)
* Fix state event handle error will not retry
* Use state event handler to deal with the event
---
.../api/service/impl/ExecutorServiceImpl.java | 3 +-
.../master/consumer/TaskPriorityQueueConsumer.java | 2 +
.../master/dispatch/host/assign/Selector.java | 2 +-
.../server/master/event}/StateEvent.java | 3 +-
.../StateEventHandleError.java} | 23 +-
.../StateEventHandleException.java} | 22 +-
.../server/master/event/StateEventHandler.java | 38 +-
.../master/event/StateEventHandlerManager.java | 41 +-
.../master/event/TaskRetryStateEventHandler.java | 47 +++
.../server/master/event/TaskStateEventHandler.java | 115 ++++++
.../master/event/TaskTimeoutStateEventHandler.java | 64 +++
.../event/TaskWaitTaskGroupStateHandler.java | 42 +-
.../event/WorkflowBlockStateEventHandler.java | 60 +++
.../master/event/WorkflowStateEventHandler.java | 95 +++++
.../event/WorkflowTimeoutStateEventHandler.java | 39 ++
.../master/processor/StateEventProcessor.java | 5 +-
.../master/processor/TaskEventProcessor.java | 2 +-
.../processor/queue/StateEventResponseService.java | 3 +-
.../master/processor/queue/TaskEventService.java | 17 +-
.../processor/queue/TaskExecuteRunnable.java | 3 +-
.../processor/queue/TaskExecuteThreadPool.java | 21 +-
.../master/runner/MasterSchedulerService.java | 2 +-
.../master/runner/StateWheelExecuteThread.java | 2 +-
.../master/runner/WorkflowExecuteRunnable.java | 438 ++++++---------------
.../master/runner/WorkflowExecuteThreadPool.java | 3 +-
.../server/master/service/FailoverService.java | 2 +-
.../server/master/service/FailoverServiceTest.java | 2 +-
.../remote/processor/NettyRemoteChannel.java | 7 +-
.../processor/StateEventCallbackService.java | 29 +-
.../queue/PeerTaskInstancePriorityQueue.java | 3 +-
.../service/queue/TaskPriorityQueue.java | 2 +-
.../service/queue/TaskPriorityQueueImpl.java | 3 +-
.../processor/TaskExecuteResponseAckProcessor.java | 8 +-
33 files changed, 674 insertions(+), 474 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 396dcd3811..0e7d1b5abb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -121,7 +121,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
private ProcessService processService;
@Autowired
- StateEventCallbackService stateEventCallbackService;
+ private StateEventCallbackService stateEventCallbackService;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@@ -500,6 +500,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// determine whether the process is normal
if (update > 0) {
+ // directly send the process instance state change event to the target master; delivery of the event is not guaranteed
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0
);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index e8cf4b2919..f04ae15dd8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -116,7 +116,9 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
@PostConstruct
public void init() {
this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
+ logger.info("Task priority queue consume thread staring");
super.start();
+ logger.info("Task priority queue consume thread started");
}
@Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
index 8eed9d991e..49a4053e5e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
@@ -27,7 +27,7 @@ public interface Selector<T> {
/**
* select
- * @param source source
+ * @param source source, the given source should not be empty.
* @return T
*/
T select(Collection<T> source);
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java
similarity index 91%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java
index 7f4be924dd..68cd582994 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import io.netty.channel.Channel;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleError.java
similarity index 63%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleError.java
index 8eed9d991e..a8dce04f03 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleError.java
@@ -15,20 +15,19 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-
-import java.util.Collection;
+package org.apache.dolphinscheduler.server.master.event;
/**
- * selector
- * @param <T> T
+ * This exception represents an error that cannot be recovered from; it happens when the event is broken.
+ * When we get this exception, we will drop the event.
*/
-public interface Selector<T> {
+public class StateEventHandleError extends Exception {
+
+ public StateEventHandleError(String message) {
+ super(message);
+ }
- /**
- * select
- * @param source source
- * @return T
- */
- T select(Collection<T> source);
+ public StateEventHandleError(String message, Throwable throwable) {
+ super(message, throwable);
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleException.java
similarity index 65%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleException.java
index 8eed9d991e..dc9456a397 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleException.java
@@ -15,20 +15,18 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-
-import java.util.Collection;
+package org.apache.dolphinscheduler.server.master.event;
/**
- * selector
- * @param <T> T
+ * This exception represents an error that can be recovered from; when we get this exception, we will retry the event.
*/
-public interface Selector<T> {
+public class StateEventHandleException extends Exception {
+
+ public StateEventHandleException(String message) {
+ super(message);
+ }
- /**
- * select
- * @param source source
- * @return T
- */
- T select(Collection<T> source);
+ public StateEventHandleException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
similarity index 51%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
index 7f4be924dd..00808b2e29 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
@@ -15,36 +15,22 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import io.netty.channel.Channel;
-import lombok.Data;
-
-/**
- * state event
- */
-@Data
-public class StateEvent {
+public interface StateEventHandler {
/**
- * origin_pid-origin_task_id-process_instance_id-task_instance_id
+ * Handle an event. Returns true if the event is handled successfully, otherwise false.
+ *
+ * @param stateEvent given state event.
+ * @throws StateEventHandleException this exception means it can be recovered.
+ * @throws StateEventHandleError this exception means it cannot be recovered, so the event needs to be dropped.
*/
- private String key;
-
- private StateEventType type;
-
- private ExecutionStatus executionStatus;
-
- private int taskInstanceId;
-
- private long taskCode;
-
- private int processInstanceId;
-
- private String context;
-
- private Channel channel;
+ boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+ throws StateEventHandleException, StateEventHandleError;
+ StateEventType getEventType();
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java
similarity index 51%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java
index 7f4be924dd..b436b55890 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java
@@ -15,36 +15,27 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
-import io.netty.channel.Channel;
-import lombok.Data;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
-/**
- * state event
- */
-@Data
-public class StateEvent {
-
- /**
- * origin_pid-origin_task_id-process_instance_id-task_instance_id
- */
- private String key;
-
- private StateEventType type;
-
- private ExecutionStatus executionStatus;
-
- private int taskInstanceId;
-
- private long taskCode;
+public class StateEventHandlerManager {
- private int processInstanceId;
+ private static final Map<StateEventType, StateEventHandler> stateEventHandlerMap = new HashMap<>();
- private String context;
+ static {
+ ServiceLoader.load(StateEventHandler.class)
+ .forEach(stateEventHandler -> stateEventHandlerMap.put(stateEventHandler.getEventType(),
+ stateEventHandler));
+ }
- private Channel channel;
+ public static Optional<StateEventHandler> getStateEventHandler(StateEventType stateEventType) {
+ return Optional.ofNullable(stateEventHandlerMap.get(stateEventType));
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java
new file mode 100644
index 0000000000..ee8168856a
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+
+import java.util.Map;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(StateEventHandler.class)
+public class TaskRetryStateEventHandler implements StateEventHandler {
+ @Override
+ public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+ throws StateEventHandleException {
+ TaskMetrics.incTaskRetry();
+ Map<Long, TaskInstance> waitToRetryTaskInstanceMap = workflowExecuteRunnable.getWaitToRetryTaskInstanceMap();
+ TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode());
+ workflowExecuteRunnable.addTaskToStandByList(taskInstance);
+ workflowExecuteRunnable.submitStandByTask();
+ waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode());
+ return true;
+ }
+
+ @Override
+ public StateEventType getEventType() {
+ return StateEventType.TASK_RETRY;
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
new file mode 100644
index 0000000000..e3ad268f97
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(StateEventHandler.class)
+public class TaskStateEventHandler implements StateEventHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(TaskStateEventHandler.class);
+
+ @Override
+ public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+ throws StateEventHandleException, StateEventHandleError {
+ measureTaskState(stateEvent);
+ workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);
+
+ Optional<TaskInstance> taskInstanceOptional =
+ workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId());
+
+ TaskInstance task = taskInstanceOptional.orElseThrow(() -> new StateEventHandleError(
+ "Cannot find task instance from taskMap by task instance id: " + stateEvent.getTaskInstanceId()));
+
+ if (task.getState() == null) {
+ throw new StateEventHandleError("Task state event handle error due to task state is null");
+ }
+
+ Map<Long, Integer> completeTaskMap = workflowExecuteRunnable.getCompleteTaskMap();
+
+ if (task.getState().typeIsFinished()) {
+ if (completeTaskMap.containsKey(task.getTaskCode())
+ && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
+ logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
+ return true;
+ }
+ workflowExecuteRunnable.taskFinished(task);
+ if (task.getTaskGroupId() > 0) {
+ workflowExecuteRunnable.releaseTaskGroup(task);
+ }
+ return true;
+ }
+ Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
+ if (activeTaskProcessMap.containsKey(task.getTaskCode())) {
+ ITaskProcessor iTaskProcessor = activeTaskProcessMap.get(task.getTaskCode());
+ iTaskProcessor.action(TaskAction.RUN);
+
+ if (iTaskProcessor.taskInstance().getState().typeIsFinished()) {
+ if (iTaskProcessor.taskInstance().getState() != task.getState()) {
+ task.setState(iTaskProcessor.taskInstance().getState());
+ }
+ workflowExecuteRunnable.taskFinished(task);
+ }
+ return true;
+ }
+ throw new StateEventHandleException(
+ "Task state event handle error, due to the task is not in activeTaskProcessorMaps");
+ }
+
+ @Override
+ public StateEventType getEventType() {
+ return StateEventType.TASK_STATE_CHANGE;
+ }
+
+ private void measureTaskState(StateEvent taskStateEvent) {
+ if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
+ // the event is broken
+ logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
+ return;
+ }
+ if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
+ TaskMetrics.incTaskFinish();
+ }
+ switch (taskStateEvent.getExecutionStatus()) {
+ case STOP:
+ TaskMetrics.incTaskStop();
+ break;
+ case SUCCESS:
+ TaskMetrics.incTaskSuccess();
+ break;
+ case FAILURE:
+ TaskMetrics.incTaskFailure();
+ break;
+ default:
+ break;
+ }
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
new file mode 100644
index 0000000000..240f10ff2c
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
+
+import java.util.Map;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(StateEventHandler.class)
+public class TaskTimeoutStateEventHandler implements StateEventHandler {
+ @Override
+ public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+ throws StateEventHandleError {
+ TaskMetrics.incTaskTimeout();
+ workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);
+
+ TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).get();
+
+ if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
+ return true;
+ }
+ TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
+ Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
+ if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
+ || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
+ ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode());
+ taskProcessor.action(TaskAction.TIMEOUT);
+ }
+ if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
+ workflowExecuteRunnable.processTimeout();
+ workflowExecuteRunnable.taskTimeout(taskInstance);
+ }
+ return true;
+ }
+
+ @Override
+ public StateEventType getEventType() {
+ return StateEventType.TASK_TIMEOUT;
+ }
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
similarity index 54%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
index 7f4be924dd..9a3c59a949 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
@@ -15,36 +15,22 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import io.netty.channel.Channel;
-import lombok.Data;
+import com.google.auto.service.AutoService;
-/**
- * state event
- */
-@Data
-public class StateEvent {
-
- /**
- * origin_pid-origin_task_id-process_instance_id-task_instance_id
- */
- private String key;
-
- private StateEventType type;
-
- private ExecutionStatus executionStatus;
-
- private int taskInstanceId;
-
- private long taskCode;
-
- private int processInstanceId;
-
- private String context;
-
- private Channel channel;
+@AutoService(StateEventHandler.class)
+public class TaskWaitTaskGroupStateHandler implements StateEventHandler {
+ @Override
+ public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) {
+ return workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent);
+ }
+ @Override
+ public StateEventType getEventType() {
+ return StateEventType.WAIT_TASK_GROUP;
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java
new file mode 100644
index 0000000000..f7349fcbd1
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(StateEventHandler.class)
+public class WorkflowBlockStateEventHandler implements StateEventHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(WorkflowBlockStateEventHandler.class);
+
+ @Override
+ public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+ throws StateEventHandleError {
+ Optional<TaskInstance> taskInstanceOptional =
+ workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId());
+ if (!taskInstanceOptional.isPresent()) {
+ throw new StateEventHandleError("Cannot find taskInstance from taskMap by taskInstanceId: "
+ + stateEvent.getTaskInstanceId());
+ }
+ TaskInstance task = taskInstanceOptional.get();
+
+ BlockingParameters parameters = JSONUtils.parseObject(task.getTaskParams(), BlockingParameters.class);
+ if (parameters != null && parameters.isAlertWhenBlocking()) {
+ workflowExecuteRunnable.processBlock();
+ }
+ return true;
+ }
+
+ @Override
+ public StateEventType getEventType() {
+ return StateEventType.PROCESS_BLOCKED;
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
new file mode 100644
index 0000000000..37d8ceb1da
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(StateEventHandler.class)
+public class WorkflowStateEventHandler implements StateEventHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(WorkflowStateEventHandler.class);
+
+ @Override
+ public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
+ throws StateEventHandleException {
+ measureProcessState(stateEvent);
+ ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
+ ProcessDefinition processDefinition = processInstance.getProcessDefinition();
+
+ logger.info("process:{} state {} change to {}",
+ processInstance.getId(),
+ processInstance.getState(),
+ stateEvent.getExecutionStatus());
+
+ if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
+ // serial wait execution type needs to wake up the waiting process
+ if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType()
+ .typeIsSerialPriority()) {
+ workflowExecuteRunnable.endProcess();
+ return true;
+ }
+ workflowExecuteRunnable.updateProcessInstanceState(stateEvent);
+ return true;
+ }
+ if (workflowExecuteRunnable.processComplementData()) {
+ return true;
+ }
+ if (stateEvent.getExecutionStatus().typeIsFinished()) {
+ workflowExecuteRunnable.endProcess();
+ }
+ if (processInstance.getState() == ExecutionStatus.READY_STOP) {
+ workflowExecuteRunnable.killAllTasks();
+ }
+
+ return true;
+ }
+
+ @Override
+ public StateEventType getEventType() {
+ return StateEventType.PROCESS_STATE_CHANGE;
+ }
+
+ private void measureProcessState(StateEvent processStateEvent) {
+ if (processStateEvent.getExecutionStatus().typeIsFinished()) {
+ ProcessInstanceMetrics.incProcessInstanceFinish();
+ }
+ switch (processStateEvent.getExecutionStatus()) {
+ case STOP:
+ ProcessInstanceMetrics.incProcessInstanceStop();
+ break;
+ case SUCCESS:
+ ProcessInstanceMetrics.incProcessInstanceSuccess();
+ break;
+ case FAILURE:
+ ProcessInstanceMetrics.incProcessInstanceFailure();
+ break;
+ default:
+ break;
+ }
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
new file mode 100644
index 0000000000..c2fc873bdc
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Handles {@link StateEventType#PROCESS_TIMEOUT} events by letting the workflow process its timeout
+ * and counting the timeout in the process instance metrics.
+ */
+@AutoService(StateEventHandler.class)
+public class WorkflowTimeoutStateEventHandler implements StateEventHandler {
+
+    /**
+     * Processes the workflow timeout and records it in the metrics.
+     *
+     * @param workflowExecuteRunnable the workflow that timed out
+     * @param stateEvent the timeout event (not inspected by this handler)
+     * @return always {@code true}
+     */
+    @Override
+    public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) {
+        workflowExecuteRunnable.processTimeout();
+        // Count the timeout only after it has been processed: if processTimeout() throws, the caller
+        // in WorkflowExecuteRunnable handles the same event again, and counting first would add one
+        // increment per attempt.
+        ProcessInstanceMetrics.incProcessInstanceTimeout();
+        return true;
+    }
+
+    /**
+     * @return {@link StateEventType#PROCESS_TIMEOUT}, the only event type this handler deals with
+     */
+    @Override
+    public StateEventType getEventType() {
+        return StateEventType.PROCESS_TIMEOUT;
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
index b2c47e4112..f90277d0eb 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.processor;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -62,7 +62,8 @@ public class StateEventProcessor implements NettyRequestProcessor {
}
stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId());
stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId());
- StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;
+ StateEventType
+ type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;
stateEvent.setType(type);
try {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
index e597f1cae5..42d4ab4db7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.processor;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index 4ca6b9eccb..bdf80bbee9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -98,6 +98,7 @@ public class StateEventResponseService {
*/
public void addResponse(StateEvent stateEvent) {
try {
+ // check the event is validated
eventQueue.put(stateEvent);
} catch (InterruptedException e) {
logger.error("Put state event : {} error", stateEvent, e);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
index bed3b3d9ed..d4b97c09c7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
@@ -63,11 +63,15 @@ public class TaskEventService {
@PostConstruct
public void start() {
+ // Bring up the dispatch thread first, then the handler thread; each start is logged before and after.
- this.taskEventThread = new TaskEventThread();
+ this.taskEventThread = new TaskEventDispatchThread();
+ logger.info("TaskEvent dispatch thread starting");
this.taskEventThread.start();
+ logger.info("TaskEvent dispatch thread started");
this.taskEventHandlerThread = new TaskEventHandlerThread();
+ logger.info("TaskEvent handle thread starting");
this.taskEventHandlerThread.start();
+ logger.info("TaskEvent handle thread started");
}
@PreDestroy
@@ -94,14 +98,14 @@ public class TaskEventService {
* @param taskEvent taskEvent
*/
public void addEvent(TaskEvent taskEvent) {
- taskExecuteThreadPool.submitTaskEvent(taskEvent);
+ eventQueue.add(taskEvent);
}
/**
- * task worker thread
+ * Dispatch event to target task runnable.
*/
- class TaskEventThread extends BaseDaemonThread {
- protected TaskEventThread() {
+ class TaskEventDispatchThread extends BaseDaemonThread {
+ protected TaskEventDispatchThread() {
super("TaskEventLoopThread");
}
@@ -109,7 +113,7 @@ public class TaskEventService {
public void run() {
while (Stopper.isRunning()) {
try {
- // if not task , blocking here
+ // if not task event, blocking here
TaskEvent taskEvent = eventQueue.take();
taskExecuteThreadPool.submitTaskEvent(taskEvent);
} catch (InterruptedException e) {
@@ -141,6 +145,7 @@ public class TaskEventService {
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ logger.warn("TaskEvent handle thread interrupted, will return this loop");
break;
} catch (Exception e) {
logger.error("event handler thread error", e);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index 9529259611..f2a6d6873e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -193,6 +193,7 @@ public class TaskExecuteRunnable implements Runnable {
}
}
// if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
+ // send ack to worker
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
} catch (Exception e) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index 68e4511c42..ae578d4c03 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -82,18 +82,13 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent);
return;
}
- if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
- TaskExecuteRunnable taskExecuteThread = new TaskExecuteRunnable(
- taskEvent.getProcessInstanceId(),
- processService, workflowExecuteThreadPool,
- processInstanceExecCacheManager,
- dataQualityResultOperator);
- taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread);
- }
- TaskExecuteRunnable taskExecuteRunnable= taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
- if (taskExecuteRunnable != null) {
- taskExecuteRunnable.addEvent(taskEvent);
- }
+ TaskExecuteRunnable taskExecuteRunnable = taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(),
+ (processInstanceId) -> new TaskExecuteRunnable(processInstanceId,
+ processService,
+ workflowExecuteThreadPool,
+ processInstanceExecCacheManager,
+ dataQualityResultOperator));
+ taskExecuteRunnable.addEvent(taskEvent);
}
public void eventHandler() {
@@ -103,7 +98,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
}
public void executeEvent(TaskExecuteRunnable taskExecuteThread) {
- if (taskExecuteThread.eventSize() == 0) {
+ if (taskExecuteThread.isEmpty()) {
return;
}
if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 334df5baa3..e88d84312c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState;
-import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -36,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index a56a7d8c5a..c70fb1f1d4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 7fd23ba42c..fdec38e65e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -35,16 +35,14 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
-import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -63,20 +61,23 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
-import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandleException;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandler;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandlerManager;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.corn.CronUtils;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
@@ -283,16 +284,37 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (!isStart) {
return;
}
+ StateEvent stateEvent = null;
while (!this.stateEvents.isEmpty()) {
try {
- StateEvent stateEvent = this.stateEvents.peek();
- LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
- if (stateEventHandler(stateEvent)) {
+ stateEvent = this.stateEvents.peek();
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
+ stateEvent.getTaskInstanceId());
+ // if state handle success then will remove this state, otherwise will retry this state next time.
+ // The state should always handle success except database error.
+ checkProcessInstance(stateEvent);
+
+ StateEventHandler stateEventHandler =
+ StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
+ .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
+ if (stateEventHandler.handleStateEvent(this, stateEvent)) {
this.stateEvents.remove(stateEvent);
}
+ } catch (StateEventHandleError stateEventHandleError) {
+ logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
+ this.stateEvents.remove(stateEvent);
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+ } catch (StateEventHandleException stateEventHandleException) {
+ logger.error("State event handle error, will retry this event: {}",
+ stateEvent,
+ stateEventHandleException);
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
// we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue.
- logger.error("state handle error:", e);
+ logger.error("State event handle error, get a unknown exception, will retry this event: {}",
+ stateEvent,
+ e);
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
@@ -330,58 +352,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return this.processInstance;
}
- private boolean stateEventHandler(StateEvent stateEvent) {
- logger.info("process event: {}", stateEvent);
-
- if (!checkProcessInstance(stateEvent)) {
- return false;
- }
-
- boolean result = false;
- switch (stateEvent.getType()) {
- case PROCESS_STATE_CHANGE:
- measureProcessState(stateEvent);
- result = processStateChangeHandler(stateEvent);
- break;
- case TASK_STATE_CHANGE:
- measureTaskState(stateEvent);
- result = taskStateChangeHandler(stateEvent);
- break;
- case PROCESS_TIMEOUT:
- ProcessInstanceMetrics.incProcessInstanceTimeout();
- result = processTimeout();
- break;
- case TASK_TIMEOUT:
- TaskMetrics.incTaskTimeout();
- result = taskTimeout(stateEvent);
- break;
- case WAIT_TASK_GROUP:
- result = checkForceStartAndWakeUp(stateEvent);
- break;
- case TASK_RETRY:
- TaskMetrics.incTaskRetry();
- result = taskRetryEventHandler(stateEvent);
- break;
- case PROCESS_BLOCKED:
- result = processBlockHandler(stateEvent);
- break;
- default:
- break;
- }
-
- if (result) {
- this.stateEvents.remove(stateEvent);
- }
- return result;
- }
-
- private boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
+ public boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.DISPATCH);
- this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
+ this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(),
+ TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
return true;
}
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
@@ -396,76 +374,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return false;
}
- private boolean taskTimeout(StateEvent stateEvent) {
- if (!checkTaskInstanceByStateEvent(stateEvent)) {
- return true;
- }
-
- TaskInstance taskInstance = taskInstanceMap.get(stateEvent.getTaskInstanceId());
- if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
- return true;
- }
- TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
- if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
- ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
- taskProcessor.action(TaskAction.TIMEOUT);
- }
- if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
- ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
- processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser);
- }
- return true;
- }
-
- private boolean processTimeout() {
+ public void processTimeout() {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, projectUser);
- return true;
}
- private boolean taskStateChangeHandler(StateEvent stateEvent) {
- if (!checkTaskInstanceByStateEvent(stateEvent)) {
- return true;
- }
-
- Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId());
- TaskInstance task = taskInstanceOptional.orElseThrow(
- () -> new RuntimeException("Cannot find task instance by task instance id: " + stateEvent.getTaskInstanceId()));
-
- if (task.getState() == null) {
- logger.error("task state is null, state handler error: {}", stateEvent);
- return true;
- }
-
- if (task.getState().typeIsFinished()) {
- if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
- logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
- return true;
- }
- taskFinished(task);
- if (task.getTaskGroupId() > 0) {
- releaseTaskGroup(task);
- }
- return true;
- }
- if (activeTaskProcessorMaps.containsKey(task.getTaskCode())) {
- ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(task.getTaskCode());
- iTaskProcessor.action(TaskAction.RUN);
-
- if (iTaskProcessor.taskInstance().getState().typeIsFinished()) {
- if (iTaskProcessor.taskInstance().getState() != task.getState()) {
- task.setState(iTaskProcessor.taskInstance().getState());
- }
- taskFinished(task);
- }
- return true;
- }
- logger.error("state handler error: {}", stateEvent);
-
- return true;
+ public void taskTimeout(TaskInstance taskInstance) {
+ ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+ processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser);
}
- private void taskFinished(TaskInstance taskInstance) {
+ public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException {
logger.info("TaskInstance finished task code:{} state:{} ",
taskInstance.getTaskCode(),
taskInstance.getState());
@@ -512,7 +431,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*
* @param taskInstance
*/
- private void releaseTaskGroup(TaskInstance taskInstance) {
+ public void releaseTaskGroup(TaskInstance taskInstance) {
if (taskInstance.getTaskGroupId() > 0) {
TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
if (nextTaskInstance != null) {
@@ -536,13 +455,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*
* @param taskInstance
*/
- private void retryTaskInstance(TaskInstance taskInstance) {
+ private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
if (!taskInstance.taskCanRetry()) {
return;
}
TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
if (newTaskInstance == null) {
- logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
+ logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}",
+ taskInstance.getTaskCode(),
+ taskInstance.getId());
return;
}
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
@@ -563,20 +484,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
}
- /**
- * handle task retry event
- *
- * @param stateEvent
- * @return
- */
- private boolean taskRetryEventHandler(StateEvent stateEvent) {
- TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode());
- addTaskToStandByList(taskInstance);
- submitStandByTask();
- waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode());
- return true;
- }
-
/**
* update process instance
*/
@@ -610,45 +517,23 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* check process instance by state event
*/
- public boolean checkProcessInstance(StateEvent stateEvent) {
+ public void checkProcessInstance(StateEvent stateEvent) throws StateEventHandleError {
if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
- logger.error("mismatch process instance id: {}, state event:{}",
- this.processInstance.getId(),
- stateEvent);
- return false;
+ // The event targets a different process instance than the one this runnable executes;
+ // report the expected id so the mismatch can be diagnosed from the message alone.
+ throw new StateEventHandleError("The event's process instance id doesn't match the current process instance id: "
+ + this.processInstance.getId());
}
- return true;
}
/**
* check if task instance exist by state event
*/
- public boolean checkTaskInstanceByStateEvent(StateEvent stateEvent) {
+ public void checkTaskInstanceByStateEvent(StateEvent stateEvent) throws StateEventHandleError {
if (stateEvent.getTaskInstanceId() == 0) {
- logger.error("task instance id null, state event:{}", stateEvent);
- return false;
+ throw new StateEventHandleError("The taskInstanceId is 0");
}
if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) {
- logger.error("mismatch task instance id, event:{}", stateEvent);
- return false;
+ throw new StateEventHandleError("Cannot find the taskInstance from taskInstanceMap");
}
- return true;
- }
-
- /**
- * check if task instance exist by task code
- */
- public boolean checkTaskInstanceByCode(long taskCode) {
- if (taskInstanceMap.isEmpty()) {
- return false;
- }
- for (TaskInstance taskInstance : taskInstanceMap.values()) {
- if (taskInstance.getTaskCode() == taskCode) {
- return true;
- }
- }
- return false;
}
/**
@@ -697,58 +582,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return Optional.empty();
}
- private boolean processStateChangeHandler(StateEvent stateEvent) {
- try {
- logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
-
- if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
- // serial wait execution type needs to wake up the waiting process
- if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) {
- endProcess();
- return true;
- }
- this.updateProcessInstanceState(stateEvent);
- return true;
- }
- if (processComplementData()) {
- return true;
- }
- if (stateEvent.getExecutionStatus().typeIsFinished()) {
- endProcess();
- }
- if (processInstance.getState() == ExecutionStatus.READY_STOP) {
- killAllTasks();
- }
- return true;
- } catch (Exception e) {
- logger.error("process state change error:", e);
- }
- return true;
- }
-
- private boolean processBlockHandler(StateEvent stateEvent) {
- try {
- Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId());
- TaskInstance task = taskInstanceOptional.orElseThrow(
- () -> new RuntimeException("Cannot find taskInstance by taskInstanceId:" + stateEvent.getTaskInstanceId()));
- if (!checkTaskInstanceByStateEvent(stateEvent)) {
- logger.error("task {} is not a blocking task", task.getTaskCode());
- return false;
- }
-
- BlockingParameters parameters = JSONUtils.parseObject(task.getTaskParams(), BlockingParameters.class);
- if (parameters.isAlertWhenBlocking()) {
- ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
- processAlertManager.sendProcessBlockingAlert(processInstance, projectUser);
- logger.info("processInstance {} block alert send successful!", processInstance.getId());
- }
- } catch (Exception e) {
- logger.error("sending blocking message error:", e);
- }
- return true;
+ public void processBlock() {
+ ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+ processAlertManager.sendProcessBlockingAlert(processInstance, projectUser);
+ logger.info("processInstance {} block alert send successful!", processInstance.getId());
}
- private boolean processComplementData() throws Exception {
+ public boolean processComplementData() {
if (!needComplementProcess()) {
return false;
}
@@ -946,7 +786,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* init task queue
*/
- private void initTaskQueue() {
+ private void initTaskQueue() throws StateEventHandleException {
taskFailedSubmit = false;
activeTaskProcessorMaps.clear();
@@ -955,7 +795,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
errorTaskMap.clear();
if (!isNewProcessInstance()) {
- List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
+ List<TaskInstance> validTaskInstanceList =
+ processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) {
if (validTaskMap.containsKey(task.getTaskCode())) {
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
@@ -965,7 +806,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
processService.updateTaskInstance(task);
continue;
}
- logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", task.getTaskCode());
+ logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}",
+ task.getTaskCode());
}
validTaskMap.put(task.getTaskCode(), task.getId());
@@ -1113,6 +955,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
Host host = new Host(taskInstance.getHost());
nettyExecutorManager.doExecute(host, hostUpdateCommand.convert2Command());
} catch (Exception e) {
+ // Do we need to catch this exception?
logger.error("notify process host update", e);
}
}
@@ -1366,8 +1209,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return validTaskInstanceList;
}
- private void submitPostNode(String parentNodeCode) {
- Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
+ private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
+ Set<String> submitTaskNodeList =
+ DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
List<TaskInstance> taskInstances = new ArrayList<>();
for (String taskNode : submitTaskNodeList) {
TaskNode taskNodeObject = dag.getNode(taskNode);
@@ -1710,34 +1554,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return true;
}
- try {
- Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
- Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
- return processInstance.getScheduleTime().equals(endTime);
- } catch (Exception e) {
- logger.error("complement end failed ", e);
- return false;
- }
+ Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
+ Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+ return processInstance.getScheduleTime().equals(endTime);
}
/**
* updateProcessInstance process instance state
* after each batch of tasks is executed, the status of the process instance is updated
*/
- private void updateProcessInstanceState() {
+ private void updateProcessInstanceState() throws StateEventHandleException {
ExecutionStatus state = getProcessInstanceState(processInstance);
if (processInstance.getState() != state) {
- logger.info(
- "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
- processInstance.getId(), processInstance.getName(),
- processInstance.getState(), state,
- processInstance.getCommandType());
-
- processInstance.setState(state);
- if (state.typeIsFinished()) {
- processInstance.setEndTime(new Date());
- }
- processService.updateProcessInstance(processInstance);
+ updateWorkflowInstanceStatesToDB(state);
StateEvent stateEvent = new StateEvent();
stateEvent.setExecutionStatus(processInstance.getState());
@@ -1752,20 +1581,33 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* stateEvent's execution status as process instance state
*/
- private void updateProcessInstanceState(StateEvent stateEvent) {
+ public void updateProcessInstanceState(StateEvent stateEvent) throws StateEventHandleException {
ExecutionStatus state = stateEvent.getExecutionStatus();
- if (processInstance.getState() != state) {
- logger.info(
- "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
- processInstance.getId(), processInstance.getName(),
- processInstance.getState(), state,
- processInstance.getCommandType());
-
- processInstance.setState(state);
- if (state.typeIsFinished()) {
+ updateWorkflowInstanceStatesToDB(state);
+ }
+
+ private void updateWorkflowInstanceStatesToDB(ExecutionStatus newStates) throws StateEventHandleException {
+ ExecutionStatus originStates = processInstance.getState();
+ if (originStates != newStates) {
+ logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+ processInstance.getId(),
+ processInstance.getName(),
+ originStates,
+ newStates,
+ processInstance.getCommandType());
+
+ processInstance.setState(newStates);
+ if (newStates.typeIsFinished()) {
processInstance.setEndTime(new Date());
}
- processService.updateProcessInstance(processInstance);
+ try {
+ processService.updateProcessInstance(processInstance);
+ } catch (Exception ex) {
+ // recover the status
+ processInstance.setState(originStates);
+ processInstance.setEndTime(null);
+ throw new StateEventHandleException("Update process instance status to DB error", ex);
+ }
}
}
@@ -1784,19 +1626,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*
* @param taskInstance task instance
*/
- private void addTaskToStandByList(TaskInstance taskInstance) {
- try {
- if (readyToSubmitTaskQueue.contains(taskInstance)) {
- logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode());
- return;
- }
- logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
- taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
- TaskMetrics.incTaskSubmit();
- readyToSubmitTaskQueue.put(taskInstance);
- } catch (Exception e) {
- logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
+ public void addTaskToStandByList(TaskInstance taskInstance) {
+ if (readyToSubmitTaskQueue.contains(taskInstance)) {
+ logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode());
+ return;
}
+ logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
+ taskInstance.getName(),
+ taskInstance.getId(),
+ taskInstance.getTaskCode());
+ TaskMetrics.incTaskSubmit();
+ readyToSubmitTaskQueue.put(taskInstance);
}
/**
@@ -1805,13 +1645,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* @param taskInstance task instance
*/
private void removeTaskFromStandbyList(TaskInstance taskInstance) {
- try {
- readyToSubmitTaskQueue.remove(taskInstance);
- } catch (Exception e) {
- logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}",
- taskInstance.getId(),
- taskInstance.getName(), e);
- }
+ readyToSubmitTaskQueue.remove(taskInstance);
}
/**
@@ -1831,9 +1665,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* close the on going tasks
*/
- private void killAllTasks() {
- logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
- activeTaskProcessorMaps.size());
+ public void killAllTasks() {
+ logger.info("kill called on process instance id: {}, num: {}",
+ processInstance.getId(),
+ activeTaskProcessorMaps.size());
if (readyToSubmitTaskQueue.size() > 0) {
readyToSubmitTaskQueue.clear();
@@ -1868,7 +1703,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* handling the list of tasks to be submitted
*/
- private void submitStandByTask() {
+ public void submitStandByTask() throws StateEventHandleException {
int length = readyToSubmitTaskQueue.size();
for (int i = 0; i < length; i++) {
TaskInstance task = readyToSubmitTaskQueue.peek();
@@ -2027,6 +1862,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
throw new Exception("resubmit error, taskProcessor is null, task code: " + taskCode);
}
}
+
+ public Map<Long, Integer> getCompleteTaskMap() {
+ return completeTaskMap;
+ }
+
+ public Map<Long, ITaskProcessor> getActiveTaskProcessMap() {
+ return activeTaskProcessorMaps;
+ }
+
+ public Map<Long, TaskInstance> getWaitToRetryTaskInstanceMap() {
+ return waitToRetryTaskInstanceMap;
+ }
+
private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
// get start params from command param
Map<String, String> startParamMap = new HashMap<>();
@@ -2061,46 +1909,4 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
}
- private void measureProcessState(StateEvent processStateEvent) {
- if (processStateEvent.getExecutionStatus().typeIsFinished()) {
- ProcessInstanceMetrics.incProcessInstanceFinish();
- }
- switch (processStateEvent.getExecutionStatus()) {
- case STOP:
- ProcessInstanceMetrics.incProcessInstanceStop();
- break;
- case SUCCESS:
- ProcessInstanceMetrics.incProcessInstanceSuccess();
- break;
- case FAILURE:
- ProcessInstanceMetrics.incProcessInstanceFailure();
- break;
- default:
- break;
- }
- }
-
- private void measureTaskState(StateEvent taskStateEvent) {
- if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
- // the event is broken
- logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
- return;
- }
- if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
- TaskMetrics.incTaskFinish();
- }
- switch (taskStateEvent.getExecutionStatus()) {
- case STOP:
- TaskMetrics.incTaskStop();
- break;
- case SUCCESS:
- TaskMetrics.incTaskSuccess();
- break;
- case FAILURE:
- TaskMetrics.incTaskFailure();
- break;
- default:
- break;
- }
- }
}
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 4085e99051..ce66318967 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
@@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Map;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index b377ceedbf..2d4bdfe5da 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master.service;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index fc9bcca3aa..98bf514730 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -26,7 +26,7 @@ import static org.mockito.Mockito.doNothing;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java
index 247e4066f8..0d623e398a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java
@@ -17,12 +17,13 @@
package org.apache.dolphinscheduler.remote.processor;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+
/**
* callback channel
*/
@@ -72,7 +73,7 @@ public class NettyRemoteChannel {
return this.channel.isActive();
}
- public ChannelFuture writeAndFlush(Command command){
+ public ChannelFuture writeAndFlush(Command command) {
return this.channel.writeAndFlush(command);
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
index fb2b411523..be564261fb 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
@@ -71,19 +72,19 @@ public class StateEventCallbackService {
* @param host
* @return callback channel
*/
- private NettyRemoteChannel newRemoteChannel(Host host) {
+ private Optional<NettyRemoteChannel> newRemoteChannel(Host host) {
Channel newChannel;
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(host.getAddress());
if (nettyRemoteChannel != null) {
if (nettyRemoteChannel.isActive()) {
- return nettyRemoteChannel;
+ return Optional.of(nettyRemoteChannel);
}
}
newChannel = nettyRemotingClient.getChannel(host);
if (newChannel != null) {
- return newRemoteChannel(newChannel, host.getAddress());
+ return Optional.of(newRemoteChannel(newChannel, host.getAddress()));
}
- return null;
+ return Optional.empty();
}
public long pause(int ntries) {
@@ -110,24 +111,26 @@ public class StateEventCallbackService {
}
/**
- * send result
+ * Send the command to the target address; this method doesn't guarantee that the command is sent successfully.
*
- * @param command command
+ * @param command command need to send
*/
public void sendResult(String address, int port, Command command) {
logger.info("send result, host:{}, command:{}", address, command.toString());
Host host = new Host(address, port);
- NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host);
- if (nettyRemoteChannel != null) {
- nettyRemoteChannel.writeAndFlush(command);
- }
+ sendResult(host, command);
}
+ /**
+ * Send the command to the target host; this method doesn't guarantee that the command is sent successfully.
+ *
+ * @param host target host
+ * @param command command need to send
+ */
public void sendResult(Host host, Command command) {
logger.info("send result, host:{}, command:{}", host.getAddress(), command.toString());
- NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host);
- if (nettyRemoteChannel != null) {
+ newRemoteChannel(host).ifPresent(nettyRemoteChannel -> {
nettyRemoteChannel.writeAndFlush(command);
- }
+ });
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index cc7f39e402..0cf03abe8d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -51,10 +51,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
* put task instance to priority queue
*
* @param taskInstance taskInstance
- * @throws TaskPriorityQueueException
*/
@Override
- public void put(TaskInstance taskInstance) throws TaskPriorityQueueException {
+ public void put(TaskInstance taskInstance) {
Preconditions.checkNotNull(taskInstance);
queue.add(taskInstance);
taskInstanceIdentifySet.add(getTaskInstanceIdentify(taskInstance));
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
index 1325afe9b2..0007581d3c 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
@@ -33,7 +33,7 @@ public interface TaskPriorityQueue<T> {
* @param taskInfo taskInfo
* @throws TaskPriorityQueueException
*/
- void put(T taskInfo) throws TaskPriorityQueueException;
+ void put(T taskInfo);
/**
* take taskInfo
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
index fef5f8ff79..0d0eebe03d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
@@ -40,10 +40,9 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
* put task takePriorityInfo
*
* @param taskPriorityInfo takePriorityInfo
- * @throws TaskPriorityQueueException
*/
@Override
- public void put(TaskPriority taskPriorityInfo) throws TaskPriorityQueueException {
+ public void put(TaskPriority taskPriorityInfo) {
queue.put(taskPriorityInfo);
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
index e080842c34..b62770984a 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
@@ -58,7 +58,13 @@ public class TaskExecuteResponseAckProcessor implements NettyRequestProcessor {
if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponseCache.get().removeResponseCache(taskExecuteResponseAckCommand.getTaskInstanceId());
TaskCallbackService.remove(taskExecuteResponseAckCommand.getTaskInstanceId());
- logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskExecuteResponseAckCommand.getTaskInstanceId());
+ logger.debug("remove REMOTE_CHANNELS, task instance id:{}",
+ taskExecuteResponseAckCommand.getTaskInstanceId());
+ } else if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.FAILURE.getCode()) {
+ // The master failed to handle the worker's response; the cached response is kept, so it will still be retried
+ } else {
+ throw new IllegalArgumentException("Invalid task execute response ack status: "
+ + taskExecuteResponseAckCommand.getStatus());
}
}