You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:51 UTC
[dolphinscheduler] 24/29: Catch exception when check state in StateWheelExecuteThread (#10908)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 527ee472fbc22cee6cd564f2cb6e3ee008939d04
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Jul 13 10:51:20 2022 +0800
Catch exception when check state in StateWheelExecuteThread (#10908)
* Catch exception when check state
(cherry picked from commit 2a67866718a74a6abc30fb615c0e16978511e3eb)
---
.../master/runner/StateWheelExecuteThread.java | 68 ++++++++++++++--------
.../master/runner/WorkflowExecuteRunnable.java | 5 +-
.../src/main/resources/application.yaml | 2 +-
.../service/process/ProcessServiceImpl.java | 2 +-
.../service/process/ProcessServiceTest.java | 7 ++-
.../src/main/resources/application.yaml | 2 +-
6 files changed, 57 insertions(+), 29 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 0d1b3a423a..c053cb238b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -134,23 +134,34 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
return;
}
for (Integer processInstanceId : processInstanceTimeoutCheckList) {
- WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
- if (workflowExecuteThread == null) {
- logger.warn("Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list");
- processInstanceTimeoutCheckList.remove(processInstanceId);
- continue;
- }
- ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
- if (processInstance == null) {
- logger.warn("Check workflow timeout failed, the workflowInstance is null");
- continue;
- }
- long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
- if (timeRemain < 0) {
- logger.info("Workflow instance timeout, adding timeout event");
- addProcessTimeoutEvent(processInstance);
- processInstanceTimeoutCheckList.remove(processInstance.getId());
- logger.info("Workflow instance timeout, added timeout event");
+ try {
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
+ WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(
+ processInstanceId);
+ if (workflowExecuteThread == null) {
+ logger.warn(
+ "Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list");
+ processInstanceTimeoutCheckList.remove(processInstanceId);
+ continue;
+ }
+ ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
+ if (processInstance == null) {
+ logger.warn("Check workflow timeout failed, the workflowInstance is null");
+ continue;
+ }
+ long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(),
+ (long) processInstance.getTimeout()
+ * Constants.SEC_2_MINUTES_TIME_UNIT);
+ if (timeRemain < 0) {
+ logger.info("Workflow instance timeout, adding timeout event");
+ addProcessTimeoutEvent(processInstance);
+ processInstanceTimeoutCheckList.remove(processInstance.getId());
+ logger.info("Workflow instance timeout, added timeout event");
+ }
+ } catch (Exception ex) {
+ logger.error("Check workflow instance timeout error");
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
}
@@ -243,20 +254,26 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
}
Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (!taskInstanceOptional.isPresent()) {
- logger.warn("Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}"
- + "will remove this check task", taskCode);
+ logger.warn(
+ "Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}"
+ + "will remove this check task",
+ taskCode);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
- long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
+ long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(),
+ (long) taskInstance.getTaskDefine().getTimeout()
+ * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
logger.info("Task instance is timeout, adding task timeout event and remove the check");
addTaskTimeoutEvent(taskInstance);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
}
}
+ } catch (Exception ex) {
+ logger.error("Check task timeout error, taskInstanceKey: {}", taskInstanceKey, ex);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
@@ -277,8 +294,9 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
- logger.warn("Task instance retry check failed, can not find workflowExecuteThread from cache manager, "
- + "will remove this check task");
+ logger.warn(
+ "Task instance retry check failed, can not find workflowExecuteThread from cache manager, "
+ + "will remove this check task");
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
@@ -308,13 +326,15 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
// reset taskInstance endTime and state
// todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
logger.info("[TaskInstance-{}]The task instance can retry, will retry this task instance",
- taskInstance.getId());
+ taskInstance.getId());
taskInstance.setEndTime(null);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
addTaskRetryEvent(taskInstance);
taskInstanceRetryCheckList.remove(taskInstanceKey);
}
+ } catch (Exception ex) {
+ logger.error("Check task retry error, taskInstanceKey: {}", taskInstanceKey, ex);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
@@ -349,6 +369,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
continue;
}
addTaskStateChangeEvent(taskInstance);
+ } catch (Exception ex) {
+ logger.error("Task state check error, taskInstanceKey: {}", taskInstanceKey, ex);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index a433981aca..0e94b0318d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -552,8 +552,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
public Optional<TaskInstance> getActiveTaskInstanceByTaskCode(long taskCode) {
- if (activeTaskProcessorMaps.containsKey(taskCode)) {
- return Optional.ofNullable(activeTaskProcessorMaps.get(taskCode).taskInstance());
+ Integer taskInstanceId = validTaskMap.get(taskCode);
+ if (taskInstanceId != null) {
+ return Optional.ofNullable(taskInstanceMap.get(taskInstanceId));
}
return Optional.empty();
}
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml
index a277272026..ff29b19389 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -102,7 +102,7 @@ master:
task-commit-retry-times: 5
# master commit task interval
task-commit-interval: 1s
- state-wheel-interval: 5
+ state-wheel-interval: 5s
# master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index b12cd77de2..9cce9152f2 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -909,7 +909,7 @@ public class ProcessServiceImpl implements ProcessService {
command.getProcessDefinitionVersion());
if (processDefinition == null) {
logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
- return null;
+ throw new IllegalArgumentException("Cannot find the process definition for this workflowInstance");
}
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
int processInstanceId = command.getProcessInstanceId();
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index a98138dd03..82dc3f4673 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -292,7 +292,12 @@ public class ProcessServiceTest {
+ "\":\"111\",\""
+ CMD_PARAM_SUB_PROCESS_DEFINE_CODE
+ "\":\"222\"}");
- Assert.assertNull(processService.handleCommand(host, command));
+ try {
+ Assert.assertNull(processService.handleCommand(host, command));
+ } catch (IllegalArgumentException illegalArgumentException) {
+ // An IllegalArgumentException is expected here because the process definition is null
+ Assert.assertTrue(true);
+ }
int definitionVersion = 1;
long definitionCode = 123;
diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index cf5ce5ea88..9a940b6c0e 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -120,7 +120,7 @@ master:
task-commit-retry-times: 5
# master commit task interval
task-commit-interval: 1s
- state-wheel-interval: 5
+ state-wheel-interval: 5s
# master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G