You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/06/10 11:38:30 UTC
[dolphinscheduler] branch dev updated: Add some warning log in master (#10383)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b0d9d3f9ab Add some warning log in master (#10383)
b0d9d3f9ab is described below
commit b0d9d3f9ab20f822ce019eb73c893ad8811d14a9
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Jun 10 19:38:20 2022 +0800
Add some warning log in master (#10383)
* Add some warn log in master
* fix may skip sleep
---
.../server/master/config/MasterConfig.java | 34 ++++++++++++++++++++++
.../master/consumer/TaskPriorityQueueConsumer.java | 25 ++++++++--------
.../master/processor/TaskRecallProcessor.java | 2 +-
.../server/master/processor/queue/TaskEvent.java | 2 +-
.../master/runner/MasterSchedulerService.java | 13 ++++-----
.../master/runner/StateWheelExecuteThread.java | 10 +++++--
.../master/runner/WorkflowExecuteRunnable.java | 4 +--
.../master/runner/WorkflowExecuteThreadPool.java | 16 ++++++----
.../remote/NettyRemotingClient.java | 4 ++-
.../service/process/ProcessServiceImpl.java | 6 ++--
.../service/queue/TaskPriorityQueueImpl.java | 9 ++----
11 files changed, 84 insertions(+), 41 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index f7a8dddd03..76d4ae1525 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.server.master.config;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -27,15 +29,47 @@ import org.springframework.stereotype.Component;
@EnableConfigurationProperties
@ConfigurationProperties("master")
public class MasterConfig {
+ /**
+ * The master RPC server listen port.
+ */
private int listenPort;
+ /**
+ * The max batch size used to fetch command from database.
+ */
private int fetchCommandNum;
+ /**
+ * The thread number used to prepare processInstance. This number shouldn't be bigger than fetchCommandNum.
+ */
private int preExecThreads;
+ /**
+ * TODO: We may need to split the process/task into different thread pool sizes.
+ * The thread number used to handle processInstance and task events.
+ * Will create two thread pools to execute {@link WorkflowExecuteRunnable} and {@link TaskExecuteRunnable}.
+ */
private int execThreads;
+ /**
+ * The task dispatch thread pool size.
+ */
private int dispatchTaskNumber;
+ /**
+ * Worker select strategy.
+ */
private HostSelector hostSelector;
+ /**
+ * Master heartbeat task execution interval.
+ */
private int heartbeatInterval;
+ /**
+ * task submit max retry times.
+ */
private int taskCommitRetryTimes;
+ /**
+ * task submit retry interval/ms.
+ */
private int taskCommitInterval;
+ /**
+ * State wheel check interval, in units of Constants.SLEEP_TIME_MILLIS; a bigger value may increase the delay of task/processInstance.
+ */
private int stateWheelInterval;
private double maxCpuLoadAvg;
private double reservedMemory;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index adeaff248f..c1b54df62f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -39,6 +39,8 @@ import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.commons.collections.CollectionUtils;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -54,11 +56,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import io.micrometer.core.annotation.Counted;
-import io.micrometer.core.annotation.Timed;
-
-import org.apache.commons.lang3.time.StopWatch;
-
/**
* TaskUpdateQueue consumer
*/
@@ -124,7 +121,7 @@ public class TaskPriorityQueueConsumer extends Thread {
try {
List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
- if (!failedDispatchTasks.isEmpty()) {
+ if (CollectionUtils.isNotEmpty(failedDispatchTasks)) {
TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
@@ -157,11 +154,15 @@ public class TaskPriorityQueueConsumer extends Thread {
}
consumerThreadPoolExecutor.submit(() -> {
- boolean dispatchResult = this.dispatchTask(taskPriority);
- if (!dispatchResult) {
- failedDispatchTasks.add(taskPriority);
+ try {
+ boolean dispatchResult = this.dispatchTask(taskPriority);
+ if (!dispatchResult) {
+ failedDispatchTasks.add(taskPriority);
+ }
+ } finally {
+ // make sure the latch is counted down even if dispatchTask throws
+ latch.countDown();
}
- latch.countDown();
});
}
@@ -171,10 +172,10 @@ public class TaskPriorityQueueConsumer extends Thread {
}
/**
- * dispatch task
+ * Dispatch task to worker.
*
* @param taskPriority taskPriority
- * @return result
+ * @return true if the task was dispatched successfully, false if the dispatch failed.
*/
protected boolean dispatchTask(TaskPriority taskPriority) {
TaskMetrics.incTaskDispatch();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
index 2d94d026fa..e8afc28703 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
@@ -56,7 +56,7 @@ public class TaskRecallProcessor implements NettyRequestProcessor {
Preconditions.checkArgument(CommandType.TASK_RECALL == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskRecallCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class);
logger.info("taskRecallCommand : {}", recallCommand);
- TaskEvent taskEvent = TaskEvent.newRecall(recallCommand, channel);
+ TaskEvent taskEvent = TaskEvent.newRecallEvent(recallCommand, channel);
taskEventService.addEvent(taskEvent);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 8227793c9f..3ed41329fe 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -136,7 +136,7 @@ public class TaskEvent {
return event;
}
- public static TaskEvent newRecall(TaskRecallCommand command, Channel channel) {
+ public static TaskEvent newRecallEvent(TaskRecallCommand command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setTaskInstanceId(command.getTaskInstanceId());
event.setProcessInstanceId(command.getProcessInstanceId());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index ae9a461d10..23577a90db 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -49,7 +49,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * master scheduler thread
+ * Master scheduler thread. This thread consumes commands from the database and triggers processInstance execution.
*/
@Service
public class MasterSchedulerService extends Thread {
@@ -163,11 +163,8 @@ public class MasterSchedulerService extends Thread {
MasterServerMetrics.incMasterConsumeCommand(commands.size());
for (ProcessInstance processInstance : processInstances) {
- if (processInstance == null) {
- continue;
- }
- WorkflowExecuteRunnable workflowExecuteThread = new WorkflowExecuteRunnable(
+ WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
processInstance
, processService
, nettyExecutorManager
@@ -175,11 +172,11 @@ public class MasterSchedulerService extends Thread {
, masterConfig
, stateWheelExecuteThread);
- this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
+ this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
- workflowExecuteThreadPool.startWorkflow(workflowExecuteThread);
+ workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable);
}
}
@@ -203,7 +200,7 @@ public class MasterSchedulerService extends Thread {
logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
}
} catch (Exception e) {
- logger.error("handle command error ", e);
+ logger.error("handle command {} error ", command.getId(), e);
processService.moveToErrorCommand(command, e.toString());
} finally {
latch.countDown();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 7635209ebb..e85ddf08bd 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -31,8 +31,9 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
-import org.apache.hadoop.util.ThreadUtil;
+import org.apache.commons.lang3.ThreadUtils;
+import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -84,6 +85,7 @@ public class StateWheelExecuteThread extends Thread {
@Override
public void run() {
+ Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
while (Stopper.isRunning()) {
try {
checkTask4Timeout();
@@ -93,7 +95,11 @@ public class StateWheelExecuteThread extends Thread {
} catch (Exception e) {
logger.error("state wheel thread check error:", e);
}
- ThreadUtil.sleepAtLeastIgnoreInterrupts((long) masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
+ try {
+ ThreadUtils.sleep(checkInterval);
+ } catch (InterruptedException e) {
+ logger.error("state wheel thread sleep error", e);
+ }
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 6c162717ce..f9caf000d8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -805,11 +805,11 @@ public class WorkflowExecuteRunnable implements Runnable {
*/
@Override
public void run() {
- if (this.taskInstanceMap.size() > 0) {
+ if (this.taskInstanceMap.size() > 0 || isStart) {
+ logger.warn("The workflow has already been started");
return;
}
try {
- isStart = false;
buildFlowDag();
initTaskQueue();
submitPostNode(null);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 324e4d9367..849c3a23b3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -46,6 +46,9 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
import com.google.common.base.Strings;
+/**
+ * Used to execute {@link WorkflowExecuteRunnable}: starts workflows and handles the events belonging to them.
+ */
@Component
public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@@ -69,7 +72,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* multi-thread filter, avoid handling workflow at the same time
*/
- private ConcurrentHashMap<String, WorkflowExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap();
+ private ConcurrentHashMap<String, WorkflowExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap<>();
@PostConstruct
private void init() {
@@ -92,7 +95,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
}
/**
- * start workflow
+ * Start the given workflow.
*/
public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) {
ProcessInstanceMetrics.incProcessInstanceSubmit();
@@ -100,13 +103,14 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
}
/**
- * execute workflow
+ * Handle the events belonging to the given workflow.
*/
public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) {
if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
return;
}
if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
+ logger.warn("The workflow:{} has been executed by another thread", workflowExecuteThread.getKey());
return;
}
multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
@@ -121,8 +125,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override
public void onSuccess(Object result) {
- // if an exception occurs, first, the error message cannot be printed in the log;
- // secondly, the `multiThreadFilterMap` cannot remove the `workflowExecuteThread`, resulting in the state of process instance cannot be changed and memory leak
try {
if (workflowExecuteThread.workFlowFinish()) {
stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
@@ -132,8 +134,10 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
}
} catch (Exception e) {
logger.error("handle events {} success, but notify changed error", processInstanceId, e);
+ } finally {
+ // make sure the process has been removed from multiThreadFilterMap
+ multiThreadFilterMap.remove(workflowExecuteThread.getKey());
}
- multiThreadFilterMap.remove(workflowExecuteThread.getKey());
}
});
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 8d83574914..6668f2d36c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -304,6 +304,8 @@ public class NettyRemotingClient {
logger.error(msg, future.cause());
throw new RemotingException(msg);
}
+ } catch (RemotingException remotingException) {
+ throw remotingException;
} catch (Exception e) {
logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
@@ -385,10 +387,10 @@ public class NettyRemotingClient {
if (this.responseFutureExecutor != null) {
this.responseFutureExecutor.shutdownNow();
}
+ logger.info("netty client closed");
} catch (Exception ex) {
logger.error("netty client close exception", ex);
}
- logger.info("netty client closed");
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index c91a0fcdbb..3760bde2b6 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.service.process;
-import io.micrometer.core.annotation.Counted;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
@@ -163,6 +162,8 @@ import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
+import io.micrometer.core.annotation.Counted;
+
/**
* process relative dao that some mappers in this.
*/
@@ -1266,8 +1267,9 @@ public class ProcessServiceImpl implements ProcessService {
Thread.sleep(commitInterval);
} catch (Exception e) {
logger.error("task commit to db failed", e);
+ } finally {
+ retryTimes += 1;
}
- retryTimes += 1;
}
return task;
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
index 8d630beeb0..fef5f8ff79 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
@@ -29,15 +29,12 @@ import org.springframework.stereotype.Service;
*/
@Service
public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
- /**
- * queue size
- */
- private static final Integer QUEUE_MAX_SIZE = 3000;
/**
- * queue
+ * Task queue. This queue is unbounded, so it may cause OutOfMemoryError.
+ * The master will stop generating tasks if memory usage is too high.
*/
- private PriorityBlockingQueue<TaskPriority> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE);
+ private final PriorityBlockingQueue<TaskPriority> queue = new PriorityBlockingQueue<>(3000);
/**
* put task takePriorityInfo