You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/03/31 13:10:16 UTC

[dolphinscheduler] branch dev updated: [Bug-9295][Master] fix repeated submit task (#9304)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1073fca  [Bug-9295][Master] fix repeated submit task (#9304)
1073fca is described below

commit 1073fcae443e417b082ac4a745454bdef1da1f7d
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Thu Mar 31 21:10:09 2022 +0800

    [Bug-9295][Master] fix repeated submit task (#9304)
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../server/master/consumer/TaskPriorityQueueConsumer.java           | 6 ++++--
 .../server/master/processor/queue/TaskExecuteThreadPool.java        | 2 +-
 .../server/master/runner/WorkflowExecuteThreadPool.java             | 2 +-
 3 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 06bbc30..f692685 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -186,8 +186,10 @@ public class TaskPriorityQueueConsumer extends Thread {
             if (result) {
                 addDispatchEvent(context, executionContext);
             }
-        } catch (RuntimeException | ExecuteException e) {
-            logger.error("dispatch error: {}", e.getMessage(), e);
+        } catch (RuntimeException e) {
+            logger.error("dispatch error: ", e);
+        } catch (ExecuteException e) {
+            logger.error("dispatch error: {}", e.getMessage());
         }
         return result;
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index ccdab1b..7bea22f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -110,8 +110,8 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
             return;
         }
         ListenableFuture future = this.submitListenable(() -> {
-            taskExecuteThread.run();
             multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread);
+            taskExecuteThread.run();
         });
         future.addCallback(new ListenableFutureCallback() {
             @Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 8edad98..0b20663 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -109,8 +109,8 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
         }
         int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
         ListenableFuture future = this.submitListenable(() -> {
-            workflowExecuteThread.handleEvents();
             multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
+            workflowExecuteThread.handleEvents();
         });
         future.addCallback(new ListenableFutureCallback() {
             @Override