You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:33 UTC

[dolphinscheduler] 06/29: Fix thread-safety issue in TaskProcessorFactory#getTaskProcessor when getting the common processor (#10479)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 9a4c7f876aafb54cdd28a889ffb1b7856022c448
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Jun 16 21:46:18 2022 +0800

    Fix thread-safety issue in TaskProcessorFactory#getTaskProcessor when getting the common processor (#10479)
    
    * Fix TaskProcessorFactory#getTaskProcessor, which was not thread safe when getting the common processor
    
    (cherry picked from commit ad2646ff1f7baa5d76d29023ced2c28a89b52f6b)
---
 .../server/master/runner/WorkflowExecuteRunnable.java                | 5 +++++
 .../server/master/runner/WorkflowExecuteThreadPool.java              | 2 +-
 .../server/master/runner/task/CommonTaskProcessor.java               | 2 +-
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 014abd30bc..2b11498a32 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -2050,6 +2050,11 @@ public class WorkflowExecuteRunnable implements Runnable {
     }
 
     private void measureTaskState(StateEvent taskStateEvent) {
+        if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
+            // the event is broken
+            logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
+            return;
+        }
         if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
             TaskMetrics.incTaskFinish();
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index d8e2fbbeda..92ff7c04f1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -107,7 +107,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
     /**
      * Handle the events belong to the given workflow.
      */
-    public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) {
+    public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
         if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
             return;
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index ae8198ebfd..a19d08affc 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -125,7 +125,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
             taskPriority.setTaskExecutionContext(taskExecutionContext);
 
             taskUpdateQueue.put(taskPriority);
-            logger.info("master submit success, task : {}", taskInstance.getName());
+            logger.info("Master submit task to priority queue success, taskInstanceId : {}", taskInstance.getId());
             return true;
         } catch (Exception e) {
             logger.error("submit task error", e);