You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/03/14 08:22:56 UTC

[dolphinscheduler] branch dev updated: [Fix][Master] Fix master-server execution logic thread unsafe problem (#8707)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new c4fc59c  [Fix][Master] Fix master-server execution logic thread unsafe problem (#8707)
c4fc59c is described below

commit c4fc59c9bb4797fcc044c1c335b1c0691da5994c
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Mon Mar 14 16:22:48 2022 +0800

    [Fix][Master] Fix master-server execution logic thread unsafe problem (#8707)
    
    * Fix thread safety issue in master service
    
    * optimize code.
---
 .../server/master/consumer/TaskPriorityQueueConsumer.java         | 3 ++-
 .../server/master/runner/MasterSchedulerService.java              | 4 +++-
 .../apache/dolphinscheduler/service/process/ProcessService.java   | 8 ++++----
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index c6562bc..c65195c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
@@ -120,7 +121,7 @@ public class TaskPriorityQueueConsumer extends Thread {
      * batch dispatch with thread pool
      */
     private List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
-        List<TaskPriority> failedDispatchTasks = new ArrayList<>();
+        List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>());
         CountDownLatch latch = new CountDownLatch(fetchTaskNum);
 
         for (int i = 0; i < fetchTaskNum; i++) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 2202cc5..9d61aa1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -179,7 +180,8 @@ public class MasterSchedulerService extends Thread {
     }
 
     private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {
-        List<ProcessInstance> processInstances = new ArrayList<>(commands.size());
+
+        List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
         CountDownLatch latch = new CountDownLatch(commands.size());
         for (final Command command : commands) {
             masterPrepareExecService.execute(() -> {
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 8fabaf9..b62e6ed 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -886,11 +886,11 @@ public class ProcessService {
         //reset command parameter
         if (processInstance.getCommandParam() != null) {
             Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
-            for (Map.Entry<String, String> entry : processCmdParam.entrySet()) {
-                if (!cmdParam.containsKey(entry.getKey())) {
-                    cmdParam.put(entry.getKey(), entry.getValue());
+            processCmdParam.forEach((key, value) -> {
+                if (!cmdParam.containsKey(key)) {
+                    cmdParam.put(key, value);
                 }
-            }
+            });
         }
         // reset command parameter if sub process
         if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {