You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/06/10 11:38:30 UTC

[dolphinscheduler] branch dev updated: Add some warning log in master (#10383)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b0d9d3f9ab Add some warning log in master (#10383)
b0d9d3f9ab is described below

commit b0d9d3f9ab20f822ce019eb73c893ad8811d14a9
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Jun 10 19:38:20 2022 +0800

    Add some warning log in master (#10383)
    
    * Add some warn log in master
    
    * fix may skip sleep
---
 .../server/master/config/MasterConfig.java         | 34 ++++++++++++++++++++++
 .../master/consumer/TaskPriorityQueueConsumer.java | 25 ++++++++--------
 .../master/processor/TaskRecallProcessor.java      |  2 +-
 .../server/master/processor/queue/TaskEvent.java   |  2 +-
 .../master/runner/MasterSchedulerService.java      | 13 ++++-----
 .../master/runner/StateWheelExecuteThread.java     | 10 +++++--
 .../master/runner/WorkflowExecuteRunnable.java     |  4 +--
 .../master/runner/WorkflowExecuteThreadPool.java   | 16 ++++++----
 .../remote/NettyRemotingClient.java                |  4 ++-
 .../service/process/ProcessServiceImpl.java        |  6 ++--
 .../service/queue/TaskPriorityQueueImpl.java       |  9 ++----
 11 files changed, 84 insertions(+), 41 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index f7a8dddd03..76d4ae1525 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -18,6 +18,8 @@
 package org.apache.dolphinscheduler.server.master.config;
 
 import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -27,15 +29,47 @@ import org.springframework.stereotype.Component;
 @EnableConfigurationProperties
 @ConfigurationProperties("master")
 public class MasterConfig {
+    /**
+     * The master RPC server listen port.
+     */
     private int listenPort;
+    /**
+     * The max batch size used to fetch command from database.
+     */
     private int fetchCommandNum;
+    /**
+     * The thread number used to prepare processInstance. This number shouldn't be bigger than fetchCommandNum.
+     */
     private int preExecThreads;
+    /**
+     * todo: We may need to split the process/task into different thread size.
+     * The thread number used to handle processInstance and task event.
+     * Will create two thread pools to execute {@link WorkflowExecuteRunnable} and {@link TaskExecuteRunnable}.
+     */
     private int execThreads;
+    /**
+     * The task dispatch thread pool size.
+     */
     private int dispatchTaskNumber;
+    /**
+     * Worker select strategy.
+     */
     private HostSelector hostSelector;
+    /**
+     * Master heart beat task execute interval.
+     */
     private int heartbeatInterval;
+    /**
+     * task submit max retry times.
+     */
     private int taskCommitRetryTimes;
+    /**
+     * task submit retry interval/ms.
+     */
     private int taskCommitInterval;
+    /**
+     * State wheel check interval; multiplied by Constants.SLEEP_TIME_MILLIS to get the sleep time in ms. A bigger value may increase the delay of task/processInstance.
+     */
     private int stateWheelInterval;
     private double maxCpuLoadAvg;
     private double reservedMemory;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index adeaff248f..c1b54df62f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -39,6 +39,8 @@ import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
+import org.apache.commons.collections.CollectionUtils;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -54,11 +56,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import io.micrometer.core.annotation.Counted;
-import io.micrometer.core.annotation.Timed;
-
-import org.apache.commons.lang3.time.StopWatch;
-
 /**
  * TaskUpdateQueue consumer
  */
@@ -124,7 +121,7 @@ public class TaskPriorityQueueConsumer extends Thread {
             try {
                 List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
 
-                if (!failedDispatchTasks.isEmpty()) {
+                if (CollectionUtils.isNotEmpty(failedDispatchTasks)) {
                     TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
                     for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                         taskPriorityQueue.put(dispatchFailedTask);
@@ -157,11 +154,15 @@ public class TaskPriorityQueueConsumer extends Thread {
             }
 
             consumerThreadPoolExecutor.submit(() -> {
-                boolean dispatchResult = this.dispatchTask(taskPriority);
-                if (!dispatchResult) {
-                    failedDispatchTasks.add(taskPriority);
+                try {
+                    boolean dispatchResult = this.dispatchTask(taskPriority);
+                    if (!dispatchResult) {
+                        failedDispatchTasks.add(taskPriority);
+                    }
+                } finally {
+                    // make sure the latch countDown
+                    latch.countDown();
                 }
-                latch.countDown();
             });
         }
 
@@ -171,10 +172,10 @@ public class TaskPriorityQueueConsumer extends Thread {
     }
 
     /**
-     * dispatch task
+     * Dispatch task to worker.
      *
      * @param taskPriority taskPriority
-     * @return result
+     * @return dispatch result, return true if dispatch success, return false if dispatch failed.
      */
     protected boolean dispatchTask(TaskPriority taskPriority) {
         TaskMetrics.incTaskDispatch();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
index 2d94d026fa..e8afc28703 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
@@ -56,7 +56,7 @@ public class TaskRecallProcessor implements NettyRequestProcessor {
         Preconditions.checkArgument(CommandType.TASK_RECALL == command.getType(), String.format("invalid command type : %s", command.getType()));
         TaskRecallCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class);
         logger.info("taskRecallCommand : {}", recallCommand);
-        TaskEvent taskEvent = TaskEvent.newRecall(recallCommand, channel);
+        TaskEvent taskEvent = TaskEvent.newRecallEvent(recallCommand, channel);
         taskEventService.addEvent(taskEvent);
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 8227793c9f..3ed41329fe 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -136,7 +136,7 @@ public class TaskEvent {
         return event;
     }
 
-    public static TaskEvent newRecall(TaskRecallCommand command, Channel channel) {
+    public static TaskEvent newRecallEvent(TaskRecallCommand command, Channel channel) {
         TaskEvent event = new TaskEvent();
         event.setTaskInstanceId(command.getTaskInstanceId());
         event.setProcessInstanceId(command.getProcessInstanceId());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index ae9a461d10..23577a90db 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -49,7 +49,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 /**
- * master scheduler thread
+ * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
  */
 @Service
 public class MasterSchedulerService extends Thread {
@@ -163,11 +163,8 @@ public class MasterSchedulerService extends Thread {
         MasterServerMetrics.incMasterConsumeCommand(commands.size());
 
         for (ProcessInstance processInstance : processInstances) {
-            if (processInstance == null) {
-                continue;
-            }
 
-            WorkflowExecuteRunnable workflowExecuteThread = new WorkflowExecuteRunnable(
+            WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
                     processInstance
                     , processService
                     , nettyExecutorManager
@@ -175,11 +172,11 @@ public class MasterSchedulerService extends Thread {
                     , masterConfig
                     , stateWheelExecuteThread);
 
-            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
+            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
             if (processInstance.getTimeout() > 0) {
                 stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
             }
-            workflowExecuteThreadPool.startWorkflow(workflowExecuteThread);
+            workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable);
         }
     }
 
@@ -203,7 +200,7 @@ public class MasterSchedulerService extends Thread {
                         logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
                     }
                 } catch (Exception e) {
-                    logger.error("handle command error ", e);
+                    logger.error("handle command {} error ", command.getId(), e);
                     processService.moveToErrorCommand(command, e.toString());
                 } finally {
                     latch.countDown();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 7635209ebb..e85ddf08bd 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -31,8 +31,9 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
 
-import org.apache.hadoop.util.ThreadUtil;
+import org.apache.commons.lang3.ThreadUtils;
 
+import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -84,6 +85,7 @@ public class StateWheelExecuteThread extends Thread {
 
     @Override
     public void run() {
+        Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
         while (Stopper.isRunning()) {
             try {
                 checkTask4Timeout();
@@ -93,7 +95,11 @@ public class StateWheelExecuteThread extends Thread {
             } catch (Exception e) {
                 logger.error("state wheel thread check error:", e);
             }
-            ThreadUtil.sleepAtLeastIgnoreInterrupts((long) masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
+            try {
+                ThreadUtils.sleep(checkInterval);
+            } catch (InterruptedException e) {
+                logger.error("state wheel thread sleep error", e);
+            }
         }
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 6c162717ce..f9caf000d8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -805,11 +805,11 @@ public class WorkflowExecuteRunnable implements Runnable {
      */
     @Override
     public void run() {
-        if (this.taskInstanceMap.size() > 0) {
+        if (this.taskInstanceMap.size() > 0 || isStart) {
+            logger.warn("The workflow has already been started");
             return;
         }
         try {
-            isStart = false;
             buildFlowDag();
             initTaskQueue();
             submitPostNode(null);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 324e4d9367..849c3a23b3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -46,6 +46,9 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
 
 import com.google.common.base.Strings;
 
+/**
+ * Used to execute {@link WorkflowExecuteRunnable}.
+ */
 @Component
 public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
 
@@ -69,7 +72,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
     /**
      * multi-thread filter, avoid handling workflow at the same time
      */
-    private ConcurrentHashMap<String, WorkflowExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap();
+    private ConcurrentHashMap<String, WorkflowExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap<>();
 
     @PostConstruct
     private void init() {
@@ -92,7 +95,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
     }
 
     /**
-     * start workflow
+     * Start the given workflow.
      */
     public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) {
         ProcessInstanceMetrics.incProcessInstanceSubmit();
@@ -100,13 +103,14 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
     }
 
     /**
-     * execute workflow
+     * Handle the events belonging to the given workflow.
      */
     public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) {
         if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
             return;
         }
         if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
+            logger.warn("The workflow:{} has been executed by another thread", workflowExecuteThread.getKey());
             return;
         }
         multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
@@ -121,8 +125,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
 
             @Override
             public void onSuccess(Object result) {
-                // if an exception occurs, first, the error message cannot be printed in the log;
-                // secondly, the `multiThreadFilterMap` cannot remove the `workflowExecuteThread`, resulting in the state of process instance cannot be changed and memory leak
                 try {
                     if (workflowExecuteThread.workFlowFinish()) {
                         stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
@@ -132,8 +134,10 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
                     }
                 } catch (Exception e) {
                     logger.error("handle events {} success, but notify changed error", processInstanceId, e);
+                } finally {
+                    // make sure the process has been removed from multiThreadFilterMap
+                    multiThreadFilterMap.remove(workflowExecuteThread.getKey());
                 }
-                multiThreadFilterMap.remove(workflowExecuteThread.getKey());
             }
         });
     }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 8d83574914..6668f2d36c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -304,6 +304,8 @@ public class NettyRemotingClient {
                 logger.error(msg, future.cause());
                 throw new RemotingException(msg);
             }
+        } catch (RemotingException remotingException) {
+            throw remotingException;
         } catch (Exception e) {
             logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
             throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
@@ -385,10 +387,10 @@ public class NettyRemotingClient {
                 if (this.responseFutureExecutor != null) {
                     this.responseFutureExecutor.shutdownNow();
                 }
+                logger.info("netty client closed");
             } catch (Exception ex) {
                 logger.error("netty client close exception", ex);
             }
-            logger.info("netty client closed");
         }
     }
 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index c91a0fcdbb..3760bde2b6 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.service.process;
 
-import io.micrometer.core.annotation.Counted;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
@@ -163,6 +162,8 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
+import io.micrometer.core.annotation.Counted;
+
 /**
  * process relative dao that some mappers in this.
  */
@@ -1266,8 +1267,9 @@ public class ProcessServiceImpl implements ProcessService {
                 Thread.sleep(commitInterval);
             } catch (Exception e) {
                 logger.error("task commit to db failed", e);
+            } finally {
+                retryTimes += 1;
             }
-            retryTimes += 1;
         }
         return task;
     }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
index 8d630beeb0..fef5f8ff79 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
@@ -29,15 +29,12 @@ import org.springframework.stereotype.Service;
  */
 @Service
 public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
-    /**
-     * queue size
-     */
-    private static final Integer QUEUE_MAX_SIZE = 3000;
 
     /**
-     * queue
+     * Task queue. This queue is unbounded, which means it may cause an OutOfMemoryError.
+     * The master will stop generating tasks if memory usage is too high.
      */
-    private PriorityBlockingQueue<TaskPriority> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE);
+    private final PriorityBlockingQueue<TaskPriority> queue = new PriorityBlockingQueue<>(3000);
 
     /**
      * put task takePriorityInfo