You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:33 UTC
[dolphinscheduler] 06/29: Fix TaskProcessorFactory#getTaskProcessor: getting the common processor is not thread-safe (#10479)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 9a4c7f876aafb54cdd28a889ffb1b7856022c448
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Jun 16 21:46:18 2022 +0800
Fix TaskProcessorFactory#getTaskProcessor: getting the common processor is not thread-safe (#10479)
* Fix TaskProcessorFactory#getTaskProcessor: getting the common processor is not thread-safe
(cherry picked from commit ad2646ff1f7baa5d76d29023ced2c28a89b52f6b)
---
.../server/master/runner/WorkflowExecuteRunnable.java | 5 +++++
.../server/master/runner/WorkflowExecuteThreadPool.java | 2 +-
.../server/master/runner/task/CommonTaskProcessor.java | 2 +-
3 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 014abd30bc..2b11498a32 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -2050,6 +2050,11 @@ public class WorkflowExecuteRunnable implements Runnable {
}
private void measureTaskState(StateEvent taskStateEvent) {
+ if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
+ // the event is broken
+ logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
+ return;
+ }
if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
TaskMetrics.incTaskFinish();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index d8e2fbbeda..92ff7c04f1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -107,7 +107,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* Handle the events belong to the given workflow.
*/
- public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) {
+ public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
return;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index ae8198ebfd..a19d08affc 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -125,7 +125,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority);
- logger.info("master submit success, task : {}", taskInstance.getName());
+ logger.info("Master submit task to priority queue success, taskInstanceId : {}", taskInstance.getId());
return true;
} catch (Exception e) {
logger.error("submit task error", e);