You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:37 UTC
[dolphinscheduler] 10/29: Optimize master log, use MDC to inject workflow instance id and task instance id in log (#10516)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 3ab9ee13fcffcfe69de90353ad43f8eb86c015a6
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Jun 23 11:45:06 2022 +0800
Optimize master log, use MDC to inject workflow instance id and task instance id in log (#10516)
* Optimize master log, add workflow instance id and task instance id in log
* Use MDC to set the workflow info in log4j
* Add workflowInstanceId and taskInstanceId in MDC
(cherry picked from commit db595b3eff8f5e4728665b4b7b7082af710e8f9d)
---
.../apache/dolphinscheduler/alert/AlertServer.java | 23 ++-
.../apache/dolphinscheduler/common/Constants.java | 36 ++--
.../dolphinscheduler/common/thread/Stopper.java | 34 +++-
.../common/thread/ThreadUtils.java | 18 +-
.../dolphinscheduler/common/utils/LoggerUtils.java | 34 +++-
.../server/master/MasterServer.java | 25 +--
.../master/processor/StateEventProcessor.java | 12 +-
.../master/processor/TaskEventProcessor.java | 14 +-
.../processor/TaskExecuteResponseProcessor.java | 12 +-
.../processor/TaskKillResponseProcessor.java | 3 +-
.../processor/queue/StateEventResponseService.java | 23 ++-
.../processor/queue/TaskExecuteRunnable.java | 3 +
.../processor/queue/TaskExecuteThreadPool.java | 12 +-
.../master/registry/MasterRegistryClient.java | 15 +-
.../server/master/runner/EventExecuteService.java | 18 +-
.../master/runner/FailoverExecuteThread.java | 5 +-
.../master/runner/MasterSchedulerService.java | 60 ++++--
.../master/runner/StateWheelExecuteThread.java | 222 +++++++++++----------
.../master/runner/WorkflowExecuteRunnable.java | 24 ++-
.../master/runner/WorkflowExecuteThreadPool.java | 29 ++-
.../master/runner/task/TaskProcessorFactory.java | 9 +-
.../server/master/service/FailoverService.java | 24 ++-
.../src/main/resources/logback-spring.xml | 4 +-
.../service/alert/AlertClientService.java | 21 +-
.../service/task/TaskPluginManager.java | 5 +-
.../src/main/resources/logback-spring.xml | 4 +-
.../plugin/task/api/ProcessUtils.java | 15 +-
.../plugin/task/api/TaskPluginException.java | 23 +--
.../server/worker/WorkerServer.java | 117 +++--------
.../server/worker/prc/WorkerRpcServer.java | 97 +++++++++
.../worker/processor/TaskExecuteProcessor.java | 169 +++++++++-------
.../processor/TaskExecuteRunningAckProcessor.java | 17 +-
.../worker/runner/RetryReportTaskStatusThread.java | 4 +-
.../server/worker/runner/TaskExecuteThread.java | 31 ++-
.../server/worker/runner/WorkerManagerThread.java | 5 +-
.../src/main/resources/logback-spring.xml | 4 +-
.../worker/processor/TaskExecuteProcessorTest.java | 42 ++--
.../worker/runner/TaskExecuteThreadTest.java | 27 ++-
38 files changed, 747 insertions(+), 493 deletions(-)
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
index ee9d5b3f62..5f449fcab4 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
@@ -49,7 +49,10 @@ public class AlertServer implements Closeable {
private final AlertConfig alertConfig;
private NettyRemotingServer nettyRemotingServer;
- public AlertServer(PluginDao pluginDao, AlertSenderService alertSenderService, AlertRequestProcessor alertRequestProcessor, AlertConfig alertConfig) {
+ public AlertServer(PluginDao pluginDao,
+ AlertSenderService alertSenderService,
+ AlertRequestProcessor alertRequestProcessor,
+ AlertConfig alertConfig) {
this.pluginDao = pluginDao;
this.alertSenderService = alertSenderService;
this.alertRequestProcessor = alertRequestProcessor;
@@ -68,11 +71,12 @@ public class AlertServer implements Closeable {
@EventListener
public void run(ApplicationReadyEvent readyEvent) {
- logger.info("alert server starting...");
+ logger.info("Alert server is staring ...");
checkTable();
startServer();
alertSenderService.start();
+ logger.info("Alert server is started ...");
}
@Override
@@ -89,24 +93,23 @@ public class AlertServer implements Closeable {
public void destroy(String cause) {
try {
+ // set the stop signal to true
// execute only once
- if (Stopper.isStopped()) {
+ if (!Stopper.stop()) {
+ logger.warn("AlterServer is already stopped");
return;
}
- logger.info("alert server is stopping ..., cause : {}", cause);
-
- // set stop signal is true
- Stopper.stop();
+ logger.info("Alert server is stopping, cause: {}", cause);
// thread sleep 3 seconds for thread quietly stop
- ThreadUtils.sleep(3000L);
+ ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
// close
this.nettyRemotingServer.close();
-
+ logger.info("Alter server stopped, cause: {}", cause);
} catch (Exception e) {
- logger.error("alert server stop exception ", e);
+ logger.error("Alert server stop failed, cause: {}", cause, e);
}
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index b7714c7817..f39a0c94aa 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
+import java.time.Duration;
import java.util.regex.Pattern;
/**
@@ -376,6 +377,8 @@ public final class Constants {
*/
public static final long SLEEP_TIME_MILLIS_SHORT = 100L;
+ public static final Duration SERVER_CLOSE_WAIT_TIME = Duration.ofSeconds(3);
+
/**
* one second mils
*/
@@ -636,28 +639,31 @@ public final class Constants {
*/
public static final String LOGIN_USER_KEY_TAB_PATH = "login.user.keytab.path";
+ public static final String WORKFLOW_INSTANCE_ID_MDC_KEY = "workflowInstanceId";
+ public static final String TASK_INSTANCE_ID_MDC_KEY = "taskInstanceId";
+
/**
* task log info format
*/
public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s";
- public static final int[] NOT_TERMINATED_STATES = new int[]{
- ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
- ExecutionStatus.DISPATCH.ordinal(),
- ExecutionStatus.RUNNING_EXECUTION.ordinal(),
- ExecutionStatus.DELAY_EXECUTION.ordinal(),
- ExecutionStatus.READY_PAUSE.ordinal(),
- ExecutionStatus.READY_STOP.ordinal(),
- ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
- ExecutionStatus.WAITING_THREAD.ordinal(),
- ExecutionStatus.WAITING_DEPEND.ordinal()
+ public static final int[] NOT_TERMINATED_STATES = new int[] {
+ ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+ ExecutionStatus.DISPATCH.ordinal(),
+ ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+ ExecutionStatus.DELAY_EXECUTION.ordinal(),
+ ExecutionStatus.READY_PAUSE.ordinal(),
+ ExecutionStatus.READY_STOP.ordinal(),
+ ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
+ ExecutionStatus.WAITING_THREAD.ordinal(),
+ ExecutionStatus.WAITING_DEPEND.ordinal()
};
- public static final int[] RUNNING_PROCESS_STATE = new int[]{
- ExecutionStatus.RUNNING_EXECUTION.ordinal(),
- ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
- ExecutionStatus.DISPATCH.ordinal(),
- ExecutionStatus.SERIAL_WAIT.ordinal()
+ public static final int[] RUNNING_PROCESS_STATE = new int[] {
+ ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+ ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+ ExecutionStatus.DISPATCH.ordinal(),
+ ExecutionStatus.SERIAL_WAIT.ordinal()
};
/**
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
index 67abde7e7a..777203c0f3 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
@@ -19,22 +19,40 @@ package org.apache.dolphinscheduler.common.thread;
import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.experimental.UtilityClass;
+
/**
- * if the process closes, a signal is placed as true, and all threads get this flag to stop working
+ * If the process closes, a signal is placed as true, and all threads get this flag to stop working.
*/
+@UtilityClass
public class Stopper {
- private static AtomicBoolean signal = new AtomicBoolean(false);
+ private static final AtomicBoolean stoppedSignal = new AtomicBoolean(false);
- public static final boolean isStopped() {
- return signal.get();
+ /**
+ * Return the flag if the Server is stopped.
+ *
+ * @return True, if the server is stopped; False, the server is still running.
+ */
+ public static boolean isStopped() {
+ return stoppedSignal.get();
}
- public static final boolean isRunning() {
- return !signal.get();
+ /**
+ * Return the flag if the Server is running.
+ *
+ * @return True, if the server is running, False, the server is stopped.
+ */
+ public static boolean isRunning() {
+ return !stoppedSignal.get();
}
- public static final void stop() {
- signal.set(true);
+ /**
+ * Stop the server
+ *
+ * @return True, if the server was stopped successfully. False, if the server is already stopped.
+ */
+ public static boolean stop() {
+ return stoppedSignal.compareAndSet(false, true);
}
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 75f624d15d..5c8020b7cd 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -23,24 +23,28 @@ import java.util.concurrent.ThreadFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-/**
- * thread utils
- */
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
public class ThreadUtils {
/**
* Wrapper over newDaemonFixedThreadExecutor.
+ *
* @param threadName threadName
* @param threadsNum threadsNum
* @return ExecutorService
*/
- public static ExecutorService newDaemonFixedThreadExecutor(String threadName,int threadsNum) {
+ public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat(threadName)
- .build();
+ .setDaemon(true)
+ .setNameFormat(threadName)
+ .build();
return Executors.newFixedThreadPool(threadsNum, threadFactory);
}
+ /**
+ * Sleep for the given number of milliseconds; the actual sleep duration is not guaranteed to be exact.
+ */
public static void sleep(final long millis) {
try {
Thread.sleep(millis);
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
index 8dd4969494..8c749c53fc 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
@@ -32,16 +32,16 @@ import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import lombok.experimental.UtilityClass;
/**
* logger utils
*/
+@UtilityClass
public class LoggerUtils {
- private LoggerUtils() {
- throw new UnsupportedOperationException("Construct LoggerUtils");
- }
-
private static final Logger logger = LoggerFactory.getLogger(LoggerUtils.class);
/**
@@ -109,4 +109,30 @@ public class LoggerUtils {
}
return "";
}
+
+ public static void setWorkflowAndTaskInstanceIDMDC(int workflowInstanceId, int taskInstanceId) {
+ setWorkflowInstanceIdMDC(workflowInstanceId);
+ setTaskInstanceIdMDC(taskInstanceId);
+ }
+
+ public static void setWorkflowInstanceIdMDC(int workflowInstanceId) {
+ MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId));
+ }
+
+ public static void setTaskInstanceIdMDC(int taskInstanceId) {
+ MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId));
+ }
+
+ public static void removeWorkflowAndTaskInstanceIdMDC() {
+ removeWorkflowInstanceIdMDC();
+ removeTaskInstanceIdMDC();
+ }
+
+ public static void removeWorkflowInstanceIdMDC() {
+ MDC.remove(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY);
+ }
+
+ public static void removeTaskInstanceIdMDC() {
+ MDC.remove(Constants.TASK_INSTANCE_ID_MDC_KEY);
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index cb01e14504..95409a616b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
@@ -103,7 +104,7 @@ public class MasterServer implements IStoppable {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
- close("shutdownHook");
+ close("MasterServer shutdownHook");
}
}));
}
@@ -116,23 +117,17 @@ public class MasterServer implements IStoppable {
public void close(String cause) {
try {
+ // set the stop signal to true
// execute only once
- if (Stopper.isStopped()) {
- logger.warn("MasterServer has been stopped ..., current cause: {}", cause);
+ if (!Stopper.stop()) {
+ logger.warn("MasterServer is already stopped, current cause: {}", cause);
return;
}
- logger.info("master server is stopping ..., cause : {}", cause);
-
- // set stop signal is true
- Stopper.stop();
+ logger.info("Master server is stopping, current cause : {}", cause);
- try {
- // thread sleep 3 seconds for thread quietly stop
- Thread.sleep(3000L);
- } catch (Exception e) {
- logger.warn("thread sleep exception ", e);
- }
+ // sleep 3 seconds to let the running threads stop quietly
+ ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
// close
this.masterSchedulerService.close();
this.masterRPCServer.close();
@@ -141,9 +136,9 @@ public class MasterServer implements IStoppable {
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
springApplicationContext.close();
- logger.info("MasterServer stopped...");
+ logger.info("MasterServer stopped, current cause: {}", cause);
} catch (Exception e) {
- logger.error("master server stop exception ", e);
+ logger.error("MasterServer stop failed, current cause: {}", cause, e);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
index e132f285de..b2c47e4112 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -64,8 +65,15 @@ public class StateEventProcessor implements NettyRequestProcessor {
StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;
stateEvent.setType(type);
- logger.info("received command : {}", stateEvent);
- stateEventResponseService.addResponse(stateEvent);
+ try {
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
+
+ logger.info("Received state event change command, event: {}", stateEvent);
+ stateEventResponseService.addResponse(stateEvent);
+ }finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+ }
+
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
index ea2e4539c3..e597f1cae5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
@@ -49,8 +50,8 @@ public class TaskEventProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_FORCE_STATE_EVENT_REQUEST == command.getType()
- || CommandType.TASK_WAKEUP_EVENT_REQUEST == command.getType()
- , String.format("invalid command type: %s", command.getType()));
+ || CommandType.TASK_WAKEUP_EVENT_REQUEST == command.getType()
+ , String.format("invalid command type: %s", command.getType()));
TaskEventChangeCommand taskEventChangeCommand = JSONUtils.parseObject(command.getBody(), TaskEventChangeCommand.class);
StateEvent stateEvent = new StateEvent();
@@ -58,8 +59,13 @@ public class TaskEventProcessor implements NettyRequestProcessor {
stateEvent.setProcessInstanceId(taskEventChangeCommand.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskEventChangeCommand.getTaskInstanceId());
stateEvent.setType(StateEventType.WAIT_TASK_GROUP);
- logger.info("received command : {}", stateEvent);
- stateEventResponseService.addEvent2WorkflowExecute(stateEvent);
+ try {
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
+ logger.info("Received task event change command, event: {}", stateEvent);
+ stateEventResponseService.addEvent2WorkflowExecute(stateEvent);
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+ }
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
index 264e42e3a5..b17def873c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -57,9 +58,14 @@ public class TaskExecuteResponseProcessor implements NettyRequestProcessor {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteResponseCommand taskExecuteResponseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
- logger.info("received command : {}", taskExecuteResponseCommand);
-
TaskEvent taskResponseEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
- taskEventService.addEvent(taskResponseEvent);
+ try {
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getTaskInstanceId());
+ logger.info("Received task execute response, event: {}", taskResponseEvent);
+
+ taskEventService.addEvent(taskResponseEvent);
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+ }
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index 135257c9a8..6079329b90 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -51,7 +51,8 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskKillResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskKillResponseCommand.class);
- logger.info("received task kill response command : {}", responseCommand);
+ logger.info("[TaskInstance-{}] Received task kill response command : {}",
+ responseCommand.getTaskInstanceId(), responseCommand);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index e34dedca67..4ca6b9eccb 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -81,7 +82,13 @@ public class StateEventResponseService {
List<StateEvent> remainEvents = new ArrayList<>(eventQueue.size());
eventQueue.drainTo(remainEvents);
for (StateEvent event : remainEvents) {
- this.persist(event);
+ try {
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
+ this.persist(event);
+
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+ }
}
}
}
@@ -93,7 +100,7 @@ public class StateEventResponseService {
try {
eventQueue.put(stateEvent);
} catch (InterruptedException e) {
- logger.error("put state event : {} error :{}", stateEvent, e);
+ logger.error("Put state event : {} error", stateEvent, e);
Thread.currentThread().interrupt();
}
}
@@ -109,18 +116,22 @@ public class StateEventResponseService {
@Override
public void run() {
-
+ logger.info("State event loop service started");
while (Stopper.isRunning()) {
try {
// if not task , blocking here
StateEvent stateEvent = eventQueue.take();
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
persist(stateEvent);
} catch (InterruptedException e) {
+ logger.warn("State event loop service interrupted, will stop this loop", e);
Thread.currentThread().interrupt();
break;
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
- logger.info("StateEventResponseWorker stopped");
+ logger.info("State event loop service stopped");
}
}
@@ -135,6 +146,8 @@ public class StateEventResponseService {
private void persist(StateEvent stateEvent) {
try {
if (!this.processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) {
+ logger.warn("Persist event into workflow execute thread error, "
+ + "cannot find the workflow instance from cache manager, event: {}", stateEvent);
writeResponse(stateEvent, ExecutionStatus.FAILURE);
return;
}
@@ -152,7 +165,7 @@ public class StateEventResponseService {
workflowExecuteThreadPool.submitStateEvent(stateEvent);
writeResponse(stateEvent, ExecutionStatus.SUCCESS);
} catch (Exception e) {
- logger.error("persist event queue error, event: {}", stateEvent, e);
+ logger.error("Persist event queue error, event: {}", stateEvent, e);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index 593567c6eb..3bf249ed3e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
@@ -71,11 +72,13 @@ public class TaskExecuteRunnable implements Runnable {
while (!this.events.isEmpty()) {
TaskEvent event = this.events.peek();
try {
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
persist(event);
} catch (Exception e) {
logger.error("persist error, event:{}, error: {}", event, e);
} finally {
this.events.remove(event);
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index 323ea86411..68e4511c42 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -115,11 +115,11 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override
public void onFailure(Throwable ex) {
Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
- logger.error("persist event failed processInstanceId: {}", processInstanceId, ex);
+ logger.error("[WorkflowInstance-{}] persist event failed", processInstanceId, ex);
if (!processInstanceExecCacheManager.contains(processInstanceId)) {
taskExecuteThreadMap.remove(processInstanceId);
- logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
- processInstanceId);
+ logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap",
+ processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}
@@ -127,11 +127,11 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override
public void onSuccess(Object result) {
Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
- logger.info("persist events succeeded, processInstanceId: {}", processInstanceId);
+ logger.info("[WorkflowInstance-{}] persist events succeeded", processInstanceId);
if (!processInstanceExecCacheManager.contains(processInstanceId)) {
taskExecuteThreadMap.remove(processInstanceId);
- logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
- processInstanceId);
+ logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap",
+ processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 5cf1c945af..771bac4a7a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -99,7 +99,6 @@ public class MasterRegistryClient {
registryClient));
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
} catch (Exception e) {
- logger.error("master start up exception", e);
throw new RegistryException("Master registry client start up error", e);
}
}
@@ -186,7 +185,7 @@ public class MasterRegistryClient {
* Registry the current master server itself to registry.
*/
void registry() {
- logger.info("master node : {} registering to registry center...", masterAddress);
+ logger.info("Master node : {} registering to registry center", masterAddress);
String localNodePath = getCurrentNodePath();
int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
@@ -201,7 +200,7 @@ public class MasterRegistryClient {
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
- logger.warn("The current master server node:{} cannot find in registry....", NetUtils.getHost());
+ logger.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
@@ -212,9 +211,7 @@ public class MasterRegistryClient {
registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS);
- logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s",
- masterAddress,
- masterHeartbeatInterval);
+ logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval);
}
@@ -223,12 +220,12 @@ public class MasterRegistryClient {
String address = getLocalAddress();
String localNodePath = getCurrentNodePath();
registryClient.remove(localNodePath);
- logger.info("master node : {} unRegistry to register center.", address);
+ logger.info("Master node : {} unRegistry to register center.", address);
heartBeatExecutor.shutdown();
- logger.info("heartbeat executor shutdown");
+ logger.info("MasterServer heartbeat executor shutdown");
registryClient.close();
} catch (Exception e) {
- logger.error("remove registry path exception ", e);
+ logger.error("MasterServer remove registry path exception ", e);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index 97c67d8493..3ce7d4e240 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import java.util.concurrent.TimeUnit;
@@ -49,25 +50,36 @@ public class EventExecuteService extends BaseDaemonThread {
@Override
public synchronized void start() {
+ logger.info("Master Event execute service starting");
super.start();
+ logger.info("Master Event execute service started");
}
@Override
public void run() {
- logger.info("Event service started");
while (Stopper.isRunning()) {
try {
eventHandler();
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
+ } catch (InterruptedException interruptedException) {
+ logger.warn("Master event service interrupted, will exit this loop", interruptedException);
+ Thread.currentThread().interrupt();
+ break;
} catch (Exception e) {
- logger.error("Event service thread error", e);
+ logger.error("Master event execute service error", e);
}
}
}
private void eventHandler() {
for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
- workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
+ try {
+ LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
+ workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
+
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
+ }
}
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 1c6ea144e4..d6f3937f4b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -49,7 +49,9 @@ public class FailoverExecuteThread extends BaseDaemonThread {
@Override
public synchronized void start() {
+ logger.info("Master failover thread starting");
super.start();
+ logger.info("Master failover thread started");
}
@Override
@@ -57,14 +59,13 @@ public class FailoverExecuteThread extends BaseDaemonThread {
// when startup, wait 10s for ready
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10);
- logger.info("failover execute thread started");
while (Stopper.isRunning()) {
try {
// todo: DO we need to schedule a task to do this kind of check
// This kind of check may only need to be executed when a master server start
failoverService.checkMasterFailover();
} catch (Exception e) {
- logger.error("failover execute error", e);
+ logger.error("Master failover thread execute error", e);
} finally {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60);
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 94991a50c3..023d871355 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.SlotCheckState;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
@@ -111,20 +112,23 @@ public class MasterSchedulerService extends BaseDaemonThread {
* constructor of MasterSchedulerService
*/
public void init() {
- this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Pre-Exec-Thread", masterConfig.getPreExecThreads());
+ this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
@Override
public synchronized void start() {
+ logger.info("Master schedule service starting..");
this.stateWheelExecuteThread.start();
super.start();
+ logger.info("Master schedule service started...");
}
public void close() {
+ logger.info("Master schedule service stopping...");
nettyRemotingClient.close();
- logger.info("master schedule service stopped...");
+ logger.info("Master schedule service stopped...");
}
/**
@@ -132,7 +136,6 @@ public class MasterSchedulerService extends BaseDaemonThread {
*/
@Override
public void run() {
- logger.info("master scheduler started");
while (Stopper.isRunning()) {
try {
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
@@ -142,8 +145,12 @@ public class MasterSchedulerService extends BaseDaemonThread {
continue;
}
scheduleProcess();
+ } catch (InterruptedException interruptedException) {
+ logger.warn("Master schedule service interrupted, close the loop", interruptedException);
+ Thread.currentThread().interrupt();
+ break;
} catch (Exception e) {
- logger.error("master scheduler thread error", e);
+ logger.error("Master schedule service loop command error", e);
}
}
}
@@ -152,7 +159,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
* 1. get command by slot
* 2. donot handle command if slot is empty
*/
- private void scheduleProcess() throws Exception {
+ private void scheduleProcess() throws InterruptedException {
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
//indicate that no command ,sleep for 1s
@@ -167,8 +174,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
MasterServerMetrics.incMasterConsumeCommand(commands.size());
for (ProcessInstance processInstance : processInstances) {
-
- WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
+ try {
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+ logger.info("Master schedule service starting workflow instance");
+ WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
processInstance
, processService
, nettyExecutorManager
@@ -176,15 +185,21 @@ public class MasterSchedulerService extends BaseDaemonThread {
, masterConfig
, stateWheelExecuteThread);
- this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
- if (processInstance.getTimeout() > 0) {
- stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
+ this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
+ if (processInstance.getTimeout() > 0) {
+ stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
+ }
+ workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable);
+ logger.info("Master schedule service started workflow instance");
+
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
}
- workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable);
}
}
- private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {
+ private List<ProcessInstance> command2ProcessInstance(List<Command> commands) throws InterruptedException {
+ logger.info("Master schedule service transforming command to ProcessInstance, commandSize: {}", commands.size());
List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
CountDownLatch latch = new CountDownLatch(commands.size());
for (final Command command : commands) {
@@ -193,7 +208,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
// slot check again
SlotCheckState slotCheckState = slotCheck(command);
if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
- logger.info("handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
+ logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
return;
}
ProcessInstance processInstance = processService.handleCommand(logger,
@@ -201,10 +216,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
command);
if (processInstance != null) {
processInstances.add(processInstance);
- logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
+ logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId());
}
} catch (Exception e) {
- logger.error("handle command {} error ", command.getId(), e);
+ logger.error("Master handle command {} error ", command.getId(), e);
processService.moveToErrorCommand(command, e.toString());
} finally {
latch.countDown();
@@ -212,13 +227,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
});
}
- try {
- // make sure to finish handling command each time before next scan
- latch.await();
- } catch (InterruptedException e) {
- logger.error("countDownLatch await error ", e);
- }
-
+ // make sure to finish handling command each time before next scan
+ latch.await();
+ logger.info("Master schedule service transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
+ commands.size(), processInstances.size());
return processInstances;
}
@@ -231,6 +243,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
int masterCount = ServerNodeManager.getMasterSize();
if (masterCount > 0) {
result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+ if (CollectionUtils.isNotEmpty(result)) {
+ logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
+ result.size(), thisMasterSlot, masterCount);
+ }
}
}
return result;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 12d404406c..53acb4ce83 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -43,6 +44,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import lombok.NonNull;
+
/**
* Check thread
* 1. timeout task check
@@ -110,10 +113,16 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
processInstanceTimeoutCheckList.add(processInstance.getId());
+ logger.info("Success add workflow instance into timeout check list");
}
public void removeProcess4TimeoutCheck(ProcessInstance processInstance) {
- processInstanceTimeoutCheckList.remove(processInstance.getId());
+ boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstance.getId());
+ if (removeFlag) {
+ logger.info("Success remove workflow instance from timeout check list");
+ } else {
+ logger.warn("Failed to remove workflow instance from timeout check list");
+ }
}
private void checkProcess4Timeout() {
@@ -121,106 +130,95 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
return;
}
for (Integer processInstanceId : processInstanceTimeoutCheckList) {
- if (processInstanceId == null) {
- continue;
- }
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
- logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
+ logger.warn("Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list");
processInstanceTimeoutCheckList.remove(processInstanceId);
continue;
}
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance == null) {
+ logger.warn("Check workflow timeout failed, the workflowInstance is null");
continue;
}
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
+ logger.info("Workflow instance timeout, adding timeout event");
addProcessTimeoutEvent(processInstance);
processInstanceTimeoutCheckList.remove(processInstance.getId());
+ logger.info("Workflow instance timeout, added timeout event");
}
}
}
- public void addTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+ public void addTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
- if (taskInstanceKey == null) {
- logger.error("taskInstanceKey is null");
- return;
- }
+ logger.info("Adding task instance into timeout check list");
if (taskInstanceTimeoutCheckList.contains(taskInstanceKey)) {
+ logger.warn("Task instance is already in timeout check list");
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
- logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
+ logger.error("Failed to add task instance into timeout check list, taskDefinition is null");
return;
}
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
taskInstanceTimeoutCheckList.add(taskInstanceKey);
+ logger.info("Timeout flag is open, added task instance into timeout check list");
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
taskInstanceTimeoutCheckList.add(taskInstanceKey);
+ logger.info("task instance is dependTask or SubProcess, added task instance into timeout check list");
}
}
- public void removeTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+ public void removeTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
- if (taskInstanceKey == null) {
- logger.error("taskInstanceKey is null");
- return;
- }
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
+ logger.info("remove task instance from timeout check list");
}
- public void addTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+ public void addTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
+ logger.info("Adding task instance into retry check list");
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
- if (taskInstanceKey == null) {
- logger.error("taskInstanceKey is null");
- return;
- }
if (taskInstanceRetryCheckList.contains(taskInstanceKey)) {
+ logger.warn("Task instance is already in retry check list");
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
- logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
+ logger.error("Add task instance into retry check list error, taskDefinition is null");
return;
}
- logger.debug("addTask4RetryCheck, taskCode:{}, processInstanceId:{}", taskInstance.getTaskCode(), taskInstance.getProcessInstanceId());
taskInstanceRetryCheckList.add(taskInstanceKey);
+ logger.info("[WorkflowInstance-{}][TaskInstance-{}] Added task instance into retry check list",
+ processInstance.getId(), taskInstance.getId());
}
- public void removeTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+ public void removeTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
- if (taskInstanceKey == null) {
- logger.error("taskInstanceKey is null");
- return;
- }
taskInstanceRetryCheckList.remove(taskInstanceKey);
+ logger.info("remove task instance from retry check list");
}
- public void addTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+ public void addTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
+ logger.info("Adding task instance into state check list");
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
- if (taskInstanceKey == null) {
- logger.error("taskInstanceKey is null");
- return;
- }
if (taskInstanceStateCheckList.contains(taskInstanceKey)) {
+ logger.warn("Task instance is already in state check list");
return;
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
taskInstanceStateCheckList.add(taskInstanceKey);
+ logger.info("Added task instance into state check list");
}
}
- public void removeTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+ public void removeTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
- if (taskInstanceKey == null) {
- logger.error("taskInstanceKey is null");
- return;
- }
taskInstanceStateCheckList.remove(taskInstanceKey);
+ logger.info("Removed task instance from state check list");
}
private void checkTask4Timeout() {
@@ -228,30 +226,35 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
return;
}
for (TaskInstanceKey taskInstanceKey : taskInstanceTimeoutCheckList) {
- int processInstanceId = taskInstanceKey.getProcessInstanceId();
- long taskCode = taskInstanceKey.getTaskCode();
+ try {
+ int processInstanceId = taskInstanceKey.getProcessInstanceId();
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
+ long taskCode = taskInstanceKey.getTaskCode();
- WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
- if (workflowExecuteThread == null) {
- logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
- processInstanceId, taskCode);
- taskInstanceTimeoutCheckList.remove(taskInstanceKey);
- continue;
- }
- Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
- if (!taskInstanceOptional.isPresent()) {
- logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
- processInstanceId, taskCode);
- taskInstanceTimeoutCheckList.remove(taskInstanceKey);
- continue;
- }
- TaskInstance taskInstance = taskInstanceOptional.get();
- if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
- long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
- if (timeRemain < 0) {
- addTaskTimeoutEvent(taskInstance);
+ WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ if (workflowExecuteThread == null) {
+ logger.warn("Check task instance timeout failed, can not find workflowExecuteThread from cache manager, will remove this check task");
+ taskInstanceTimeoutCheckList.remove(taskInstanceKey);
+ continue;
+ }
+ Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
+ if (!taskInstanceOptional.isPresent()) {
+ logger.warn("Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}"
+ + ", will remove this check task", taskCode);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
+ continue;
+ }
+ TaskInstance taskInstance = taskInstanceOptional.get();
+ if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
+ long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
+ if (timeRemain < 0) {
+ logger.info("Task instance is timeout, adding task timeout event and remove the check");
+ addTaskTimeoutEvent(taskInstance);
+ taskInstanceTimeoutCheckList.remove(taskInstanceKey);
+ }
}
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
}
@@ -264,41 +267,46 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
+ try {
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
- WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
- if (workflowExecuteThread == null) {
- logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
- processInstanceId, taskCode);
- taskInstanceRetryCheckList.remove(taskInstanceKey);
- continue;
- }
+ if (workflowExecuteThread == null) {
+ logger.warn("Task instance retry check failed, can not find workflowExecuteThread from cache manager, "
+ + "will remove this check task");
+ taskInstanceRetryCheckList.remove(taskInstanceKey);
+ continue;
+ }
- Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
- ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
+ Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
+ ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
- if (processInstance.getState() == ExecutionStatus.READY_STOP) {
- addProcessStopEvent(processInstance);
- taskInstanceRetryCheckList.remove(taskInstanceKey);
- break;
- }
+ if (processInstance.getState() == ExecutionStatus.READY_STOP) {
+ logger.warn("The process instance is ready to stop, will send process stop event and remove the check task");
+ addProcessStopEvent(processInstance);
+ taskInstanceRetryCheckList.remove(taskInstanceKey);
+ break;
+ }
- if (!taskInstanceOptional.isPresent()) {
- logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
- processInstanceId, taskCode);
- taskInstanceRetryCheckList.remove(taskInstanceKey);
- continue;
- }
+ if (!taskInstanceOptional.isPresent()) {
+ logger.warn("Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check");
+ taskInstanceRetryCheckList.remove(taskInstanceKey);
+ continue;
+ }
- TaskInstance taskInstance = taskInstanceOptional.get();
- if (taskInstance.retryTaskIntervalOverTime()) {
- // reset taskInstance endTime and state
- // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
- taskInstance.setEndTime(null);
- taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+ TaskInstance taskInstance = taskInstanceOptional.get();
+ if (taskInstance.retryTaskIntervalOverTime()) {
+ // reset taskInstance endTime and state
+ // todo relative function: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
+ taskInstance.setEndTime(null);
+ taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
- addTaskRetryEvent(taskInstance);
- taskInstanceRetryCheckList.remove(taskInstanceKey);
+ addTaskRetryEvent(taskInstance);
+ taskInstanceRetryCheckList.remove(taskInstanceKey);
+ }
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
}
@@ -311,25 +319,29 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
- WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
- if (workflowExecuteThread == null) {
- logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
- processInstanceId, taskCode);
- taskInstanceStateCheckList.remove(taskInstanceKey);
- continue;
- }
- Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
- if (!taskInstanceOptional.isPresent()) {
- logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
- processInstanceId, taskCode);
- taskInstanceStateCheckList.remove(taskInstanceKey);
- continue;
- }
- TaskInstance taskInstance = taskInstanceOptional.get();
- if (taskInstance.getState().typeIsFinished()) {
- continue;
+ try {
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
+ WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ if (workflowExecuteThread == null) {
+ logger.warn("Task instance state check failed, can not find workflowExecuteThread from cache manager, will remove this check task");
+ taskInstanceStateCheckList.remove(taskInstanceKey);
+ continue;
+ }
+ Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
+ if (!taskInstanceOptional.isPresent()) {
+ logger.warn(
+ "Task instance state check failed, can not find taskInstance from workflowExecuteThread, will remove this check event");
+ taskInstanceStateCheckList.remove(taskInstanceKey);
+ continue;
+ }
+ TaskInstance taskInstance = taskInstanceOptional.get();
+ if (taskInstance.getState().typeIsFinished()) {
+ continue;
+ }
+ addTaskStateChangeEvent(taskInstance);
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
}
- addTaskStateChangeEvent(taskInstance);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 07e0f14c11..9c5f4062f7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
@@ -271,12 +272,16 @@ public class WorkflowExecuteRunnable implements Runnable {
while (!this.stateEvents.isEmpty()) {
try {
StateEvent stateEvent = this.stateEvents.peek();
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
if (stateEventHandler(stateEvent)) {
this.stateEvents.remove(stateEvent);
}
} catch (Exception e) {
logger.error("state handle error:", e);
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
+
}
}
@@ -775,7 +780,16 @@ public class WorkflowExecuteRunnable implements Runnable {
if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
}
- cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss", null));
+
+ if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
+ cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST,
+ cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)
+ .substring(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).indexOf(COMMA) + 1));
+ }
+
+ if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+ cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, YYYY_MM_DD_HH_MM_SS, null));
+ }
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setTaskDependType(processInstance.getTaskDependType());
command.setFailureStrategy(processInstance.getFailureStrategy());
@@ -962,8 +976,12 @@ public class WorkflowExecuteRunnable implements Runnable {
// reset global params while there are start parameters
setGlobalParamIfCommanded(processDefinition, cmdParam);
- Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
- Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+ Date start = null;
+ Date end = null;
+ if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){
+ start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
+ end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+ }
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
if (complementListDate.isEmpty() && needComplementProcess()) {
complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 92ff7c04f1..031f8986ea 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -48,6 +49,8 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
import com.google.common.base.Strings;
+import lombok.NonNull;
+
/**
* Used to execute {@link WorkflowExecuteRunnable}, when
*/
@@ -79,7 +82,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@PostConstruct
private void init() {
this.setDaemon(true);
- this.setThreadNamePrefix("Workflow-Execute-Thread-");
+ this.setThreadNamePrefix("WorkflowExecuteThread-"); // name prefix applied to this executor's threads
this.setMaxPoolSize(masterConfig.getExecThreads());
this.setCorePoolSize(masterConfig.getExecThreads()); // core and max size both come from masterConfig.getExecThreads()
}
@@ -90,10 +93,11 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
public void submitStateEvent(StateEvent stateEvent) {
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
if (workflowExecuteThread == null) {
- logger.warn("workflowExecuteThread is null, stateEvent:{}", stateEvent);
+ // no runnable is cached for this process instance id, so the event is dropped after the warning
+ logger.warn("Submit state event error, cannot find workflowExecuteThread from cache manager, stateEvent:{}", stateEvent);
return;
}
workflowExecuteThread.addStateEvent(stateEvent);
+ logger.info("Submit state event success, stateEvent: {}", stateEvent); // logged only after the event was handed to the runnable
}
/**
@@ -112,7 +116,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
return;
}
if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
- logger.warn("The workflow:{} has been executed by another thread", workflowExecuteThread.getKey());
+ logger.warn("The workflow has been executed by another thread");
return;
}
multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
@@ -121,24 +125,31 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
future.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
- logger.error("handle events {} failed", processInstanceId, ex);
- multiThreadFilterMap.remove(workflowExecuteThread.getKey());
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
+ try {
+ logger.error("Workflow instance events handle failed", ex);
+ multiThreadFilterMap.remove(workflowExecuteThread.getKey());
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
+ }
}
@Override
public void onSuccess(Object result) {
try {
+ LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
if (workflowExecuteThread.workFlowFinish()) {
stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
notifyProcessChanged(workflowExecuteThread.getProcessInstance());
- logger.info("process instance {} finished.", processInstanceId);
+ logger.info("Workflow instance is finished.");
}
} catch (Exception e) {
- logger.error("handle events {} success, but notify changed error", processInstanceId, e);
+ logger.error("Workflow instance is finished, but notify changed error", e);
} finally {
// make sure the process has been removed from multiThreadFilterMap
multiThreadFilterMap.remove(workflowExecuteThread.getKey());
+ LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
});
@@ -167,9 +178,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* notify myself
*/
- private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) {
- logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId());
+ private void notifyMyself(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
if (!processInstanceExecCacheManager.contains(processInstance.getId())) {
+ logger.warn("The execute cache manager doesn't contains this workflow instance");
return;
}
StateEvent stateEvent = new StateEvent();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 41c2bd56d3..4c732a0325 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -30,9 +30,12 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import lombok.experimental.UtilityClass;
+
/**
* the factory to create task processor
*/
+@UtilityClass
public final class TaskProcessorFactory {
private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class);
@@ -46,7 +49,7 @@ public final class TaskProcessorFactory {
try {
PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor());
} catch (NoSuchMethodException e) {
- throw new IllegalArgumentException("The task processor should has a no args constructor");
+ throw new IllegalArgumentException("The task processor should has a no args constructor", e);
}
}
}
@@ -57,7 +60,6 @@ public final class TaskProcessorFactory {
}
Constructor<ITaskProcessor> iTaskProcessorConstructor = PROCESS_MAP.get(type);
if (iTaskProcessorConstructor == null) {
- logger.warn("ITaskProcessor could not found for taskType: {}", type);
iTaskProcessorConstructor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
}
@@ -74,7 +76,4 @@ public final class TaskProcessorFactory {
return PROCESS_MAP.containsKey(type);
}
- private TaskProcessorFactory() {
- throw new UnsupportedOperationException("TaskProcessorFactory cannot be instantiated");
- }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index 3cc92fb905..94f33ccd83 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -17,8 +17,7 @@
package org.apache.dolphinscheduler.server.master.service;
-import io.micrometer.core.annotation.Counted;
-import io.micrometer.core.annotation.Timed;
+import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
@@ -55,6 +54,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import io.micrometer.core.annotation.Counted;
+import io.micrometer.core.annotation.Timed;
+
/**
* failover service
*/
@@ -66,12 +68,14 @@ public class FailoverService {
private final ProcessService processService;
private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
- public FailoverService(RegistryClient registryClient, MasterConfig masterConfig, ProcessService processService,
+ public FailoverService(RegistryClient registryClient,
+ MasterConfig masterConfig,
+ ProcessService processService,
WorkflowExecuteThreadPool workflowExecuteThreadPool) {
- this.registryClient = registryClient;
- this.masterConfig = masterConfig;
- this.processService = processService;
- this.workflowExecuteThreadPool = workflowExecuteThreadPool;
+ this.registryClient = checkNotNull(registryClient); // checkNotNull is the statically imported Guava Preconditions check: a null dependency is rejected here
+ this.masterConfig = checkNotNull(masterConfig);
+ this.processService = checkNotNull(processService);
+ this.workflowExecuteThreadPool = checkNotNull(workflowExecuteThreadPool);
}
/**
@@ -84,7 +88,7 @@ public class FailoverService {
if (CollectionUtils.isEmpty(hosts)) {
return;
}
- LOGGER.info("{} begin to failover hosts:{}", getLocalAddress(), hosts);
+ LOGGER.info("Master failover service {} begin to failover hosts:{}", getLocalAddress(), hosts);
for (String host : hosts) {
failoverMasterWithLock(host);
@@ -274,7 +278,7 @@ public class FailoverService {
while (iterator.hasNext()) {
String host = iterator.next();
if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
- if (!host.equals(getLocalAddress())) {
+ if (!getLocalAddress().equals(host)) {
iterator.remove();
}
}
@@ -294,7 +298,7 @@ public class FailoverService {
boolean taskNeedFailover = true;
if (taskInstance == null) {
- LOGGER.error("failover task instance error, taskInstance is null");
+ LOGGER.error("Master failover task instance error, taskInstance is null");
return false;
}
diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml
index 2e9cb45ada..f1a2c1d51b 100644
--- a/dolphinscheduler-master/src/main/resources/logback-spring.xml
+++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml
@@ -21,7 +21,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
- [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
+ [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
@@ -57,7 +57,7 @@
</rollingPolicy>
<encoder>
<pattern>
- [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
+ [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
index f5d7f935b3..ce6ad57035 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +38,7 @@ public class AlertClientService implements AutoCloseable {
private final NettyRemotingClient client;
- private volatile boolean isRunning;
+ private final AtomicBoolean isRunning;
private String host;
@@ -53,16 +55,14 @@ public class AlertClientService implements AutoCloseable {
public AlertClientService() {
this.clientConfig = new NettyClientConfig();
this.client = new NettyRemotingClient(clientConfig);
- this.isRunning = true;
+ this.isRunning = new AtomicBoolean(true); // a freshly constructed service starts with the running flag set
}
/**
* alert client
*/
public AlertClientService(String host, int port) {
- this.clientConfig = new NettyClientConfig();
- this.client = new NettyRemotingClient(clientConfig);
- this.isRunning = true;
+ this(); // reuse the no-arg constructor for the config, client and running-flag setup
this.host = host;
this.port = port;
}
@@ -72,9 +72,14 @@ public class AlertClientService implements AutoCloseable {
*/
@Override
public void close() {
+ // compareAndSet(true, false) succeeds only for the first caller; a failed swap means the client was already closed
+ if (!isRunning.compareAndSet(true, false)) {
+ logger.warn("Alert client is already closed");
+ return;
+ }
+
+ logger.info("Alert client closing");
this.client.close();
- this.isRunning = false;
- logger.info("alter client closed");
+ logger.info("Alert client closed");
}
/**
@@ -116,6 +121,6 @@ public class AlertClientService implements AutoCloseable {
}
public boolean isRunning() {
- return isRunning;
+ return isRunning.get(); // current value of the AtomicBoolean running flag
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
index 7dffe9c204..289e8ddbfb 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
@@ -93,7 +94,7 @@ public class TaskPluginManager {
logger.info("Registering task plugin: {}", name);
if (!names.add(name)) {
- throw new IllegalStateException(format("Duplicate task plugins named '%s'", name));
+ throw new TaskPluginException(format("Duplicate task plugins named '%s'", name));
}
loadTaskChannel(factory);
@@ -106,7 +107,7 @@ public class TaskPluginManager {
PluginDefine pluginDefine = new PluginDefine(name, PluginType.TASK.getDesc(), paramsJson);
int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
if (count <= 0) {
- throw new RuntimeException("Failed to update task plugin: " + name);
+ throw new TaskPluginException("Failed to update task plugin: " + name);
}
});
}
diff --git a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml
index d6ef76fa9e..0cdeabde5e 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml
+++ b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml
@@ -22,7 +22,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
- [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
+ [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
@@ -63,7 +63,7 @@
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
- [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - %messsage%n
+ [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java
index d052075988..528dfa4058 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java
@@ -25,6 +25,8 @@ import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import lombok.NonNull;
+
public final class ProcessUtils {
private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
@@ -46,13 +48,13 @@ public final class ProcessUtils {
/**
* kill tasks according to different task types.
*/
- public static void kill(TaskExecutionContext request) {
+ public static boolean kill(@NonNull TaskExecutionContext request) {
try {
+ logger.info("Begin kill task instance, processId: {}", request.getProcessId());
int processId = request.getProcessId();
if (processId == 0) {
- logger.error("process kill failed, process id :{}, task id:{}",
- processId, request.getTaskInstanceId());
- return;
+ logger.error("Task instance kill failed, processId is not exist");
+ return false;
}
String cmd = String.format("kill -9 %s", getPidsStr(processId));
@@ -60,8 +62,11 @@ public final class ProcessUtils {
logger.info("process id:{}, cmd:{}", processId, cmd);
OSUtils.exeCmd(cmd);
+ logger.info("Success kill task instance, processId: {}", request.getProcessId());
+ return true;
} catch (Exception e) {
- logger.error("kill task failed", e);
+ logger.error("Kill task instance error, processId: {}", request.getProcessId(), e);
+ return false;
}
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginException.java
similarity index 60%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginException.java
index 67abde7e7a..246835260b 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginException.java
@@ -15,26 +15,11 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.thread;
+package org.apache.dolphinscheduler.plugin.task.api;
-import java.util.concurrent.atomic.AtomicBoolean;
+public class TaskPluginException extends RuntimeException { // unchecked; TaskPluginManager throws it for duplicate plugin names and failed plugin-define updates
-/**
- * if the process closes, a signal is placed as true, and all threads get this flag to stop working
- */
-public class Stopper {
-
- private static AtomicBoolean signal = new AtomicBoolean(false);
-
- public static final boolean isStopped() {
- return signal.get();
- }
-
- public static final boolean isRunning() {
- return !signal.get();
- }
-
- public static final void stop() {
- signal.set(true);
+ public TaskPluginException(String message) {
+ super(message); // message only; this class declares no constructor that accepts a cause
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 8ac528dee4..93dc6309db 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -21,18 +21,11 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
-import org.apache.dolphinscheduler.remote.NettyRemotingServer;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
-import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
+import org.apache.dolphinscheduler.server.worker.prc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@@ -65,17 +58,6 @@ public class WorkerServer implements IStoppable {
*/
private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
- /**
- * netty remote server
- */
- private NettyRemotingServer nettyRemotingServer;
-
- /**
- * worker config
- */
- @Autowired
- private WorkerConfig workerConfig;
-
/**
* spring application context
* only use it for initialization
@@ -105,22 +87,7 @@ public class WorkerServer implements IStoppable {
private TaskPluginManager taskPluginManager;
@Autowired
- private TaskExecuteProcessor taskExecuteProcessor;
-
- @Autowired
- private TaskKillProcessor taskKillProcessor;
-
- @Autowired
- private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
-
- @Autowired
- private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
-
- @Autowired
- private HostUpdateProcessor hostUpdateProcessor;
-
- @Autowired
- private LoggerRequestProcessor loggerRequestProcessor;
+ private WorkerRpcServer workerRpcServer;
/**
* worker server startup, not use web service
@@ -132,48 +99,19 @@ public class WorkerServer implements IStoppable {
SpringApplication.run(WorkerServer.class);
}
- /**
- * worker server run
- */
@PostConstruct
public void run() {
- // init remoting server
- NettyServerConfig serverConfig = new NettyServerConfig();
- serverConfig.setListenPort(workerConfig.getListenPort());
- this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
-
- // logger server
- this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
-
- this.nettyRemotingServer.start();
-
- // install task plugin
- this.taskPluginManager.installPlugin();
+ this.workerRpcServer.start();
- // worker registry
- try {
- this.workerRegistryClient.registry();
- this.workerRegistryClient.setRegistryStoppable(this);
- Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
+ this.taskPluginManager.installPlugin();
- this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- throw new RuntimeException(e);
- }
+ this.workerRegistryClient.registry();
+ this.workerRegistryClient.setRegistryStoppable(this);
+ Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
+ this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
- // task execute manager
this.workerManagerThread.start();
- // retry report task status
this.retryReportTaskStatusThread.start();
/*
@@ -181,7 +119,7 @@ public class WorkerServer implements IStoppable {
*/
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
- close("shutdownHook");
+ close("WorkerServer shutdown hook");
}
}));
}
@@ -189,24 +127,23 @@ public class WorkerServer implements IStoppable {
public void close(String cause) {
try {
// execute only once
- if (Stopper.isStopped()) {
+ // set stop signal is true
+ if (!Stopper.stop()) {
+ logger.warn("WorkerServer is already stopped, current cause: {}", cause);
return;
}
- logger.info("worker server is stopping ..., cause : {}", cause);
-
- // set stop signal is true
- Stopper.stop();
+ logger.info("Worker server is stopping, current cause : {}", cause);
try {
// thread sleep 3 seconds for thread quitely stop
- Thread.sleep(3000L);
+ Thread.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
} catch (Exception e) {
- logger.warn("thread sleep exception", e);
+ logger.warn("Worker server close wait error", e);
}
// close
- this.nettyRemotingServer.close();
+ this.workerRpcServer.close();
this.workerRegistryClient.unRegistry();
this.alertClientService.close();
@@ -215,8 +152,9 @@ public class WorkerServer implements IStoppable {
// close the application context
this.springApplicationContext.close();
+ logger.info("Worker server stopped, current cause: {}", cause);
} catch (Exception e) {
- logger.error("worker server stop exception ", e);
+ logger.error("Worker server stop failed, current cause: {}", cause, e);
}
}
@@ -230,15 +168,22 @@ public class WorkerServer implements IStoppable {
*/
public void killAllRunningTasks() {
Collection<TaskExecutionContext> taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList();
- logger.info("ready to kill all cache job, job size:{}", taskRequests.size());
-
if (CollectionUtils.isEmpty(taskRequests)) {
return;
}
-
+ logger.info("Worker begin to kill all cache task, task size: {}", taskRequests.size()); // size() is now read only after the empty check above
+ int killNumber = 0; // number of tasks for which ProcessUtils.kill returned true
for (TaskExecutionContext taskRequest : taskRequests) {
// kill task when it's not finished yet
- org.apache.dolphinscheduler.plugin.task.api.ProcessUtils.kill(taskRequest);
+ try {
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(), taskRequest.getTaskInstanceId()); // put this task's ids into the MDC before the kill call logs
+ if (ProcessUtils.kill(taskRequest)) {
+ killNumber++;
+ }
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); // always clear the MDC entries set above, even if kill throws
+ }
}
+ logger.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(), killNumber);
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/prc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/prc/WorkerRpcServer.java
new file mode 100644
index 0000000000..acb0ac6508
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/prc/WorkerRpcServer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.prc;
+
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskRecallAckProcessor;
+
+import java.io.Closeable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Spring-managed wrapper around the worker's {@link NettyRemotingServer}.
+ * {@link #start()} creates the server on the port from {@link WorkerConfig}, registers one processor
+ * per command type and starts it; {@link #close()} shuts the server down.
+ */
+@Service
+public class WorkerRpcServer implements Closeable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRpcServer.class);
+
+ @Autowired
+ private TaskExecuteProcessor taskExecuteProcessor;
+
+ @Autowired
+ private TaskKillProcessor taskKillProcessor;
+
+ @Autowired
+ private TaskRecallAckProcessor taskRecallAckProcessor;
+
+ @Autowired
+ private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
+
+ @Autowired
+ private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
+
+ @Autowired
+ private HostUpdateProcessor hostUpdateProcessor;
+
+ @Autowired
+ private LoggerRequestProcessor loggerRequestProcessor;
+
+ @Autowired
+ private WorkerConfig workerConfig;
+
+ // created in start(); stays null until start() has built the server
+ private NettyRemotingServer nettyRemotingServer;
+
+ /**
+ * Builds the Netty server for the configured listen port, registers every request processor and starts it.
+ */
+ public void start() {
+ LOGGER.info("Worker rpc server starting");
+ NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(workerConfig.getListenPort());
+ this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL_ACK, taskRecallAckProcessor);
+ // logger server
+ this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
+ this.nettyRemotingServer.start();
+ LOGGER.info("Worker rpc server started");
+ }
+
+ /**
+ * Closes the Netty server if {@link #start()} created one; otherwise only the two log lines are written.
+ */
+ @Override
+ public void close() {
+ LOGGER.info("Worker rpc server closing");
+ // guard against close() being reached before start() assigned the server, which would otherwise throw a NullPointerException
+ if (this.nettyRemotingServer != null) {
+ this.nettyRemotingServer.close();
+ }
+ LOGGER.info("Worker rpc server closed");
+ }
+
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 34382257d7..b355c6702b 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -18,10 +18,12 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -90,15 +92,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
@Autowired
private WorkerManagerThread workerManager;
- @Counted(value = "dolphinscheduler_task_execution_count", description = "task execute total count")
- @Timed(value = "dolphinscheduler_task_execution_timer", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
+ @Autowired(required = false)
+ private StorageOperate storageOperate;
+
+ @Counted(value = "ds.task.execution.count", description = "task execute total count")
+ @Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
- String.format("invalid command type : %s", command.getType()));
+ String.format("invalid command type : %s", command.getType()));
- TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(
- command.getBody(), TaskExecuteRequestCommand.class);
+ TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(command.getBody(),
+ TaskExecuteRequestCommand.class);
if (taskRequestCommand == null) {
logger.error("task execute request command is null");
@@ -113,70 +118,98 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.error("task execution context is null");
return;
}
- TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
-
- // set cache, it will be used when kill task
- TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
-
- // todo custom logger
-
- taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
- taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
-
- if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
- if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
- OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
- }
-
- // check if the OS user exists
- if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
- logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
- taskExecutionContext.getTenantCode(), taskExecutionContext.getTaskInstanceId());
- TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
- taskExecutionContext.setEndTime(new Date());
- taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
- return;
- }
-
- // local execute path
- String execLocalPath = getExecLocalPath(taskExecutionContext);
- logger.info("task instance local execute path : {}", execLocalPath);
- taskExecutionContext.setExecutePath(execLocalPath);
-
- try {
- FileUtils.createWorkDirIfAbsent(execLocalPath);
- } catch (Throwable ex) {
- logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", execLocalPath, taskExecutionContext.getTaskInstanceId());
- logger.error("create executeLocalPath fail", ex);
- TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
- taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
- return;
+ try {
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId());
+
+ TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
+
+ // set cache, it will be used when kill task
+ TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+
+ // todo custom logger
+
+ taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
+ taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
+
+ if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
+ boolean osUserExistFlag;
+ //if Using distributed is true and Currently supported systems are linux,Should not let it automatically
+ //create tenants,so TenantAutoCreate has no effect
+ if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
+ //use the id command to judge in linux
+ osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
+ } else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
+ // if not exists this user, then create
+ OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
+ osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
+ } else {
+ osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
+ }
+ if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
+ if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
+ OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
+ }
+
+ // check if the OS user exists
+ if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
+ logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
+ taskExecutionContext.getTenantCode(),
+ taskExecutionContext.getTaskInstanceId());
+ TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+ taskExecutionContext.setEndTime(new Date());
+ taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+ return;
+ }
+
+ // local execute path
+ String execLocalPath = getExecLocalPath(taskExecutionContext);
+ logger.info("task instance local execute path : {}", execLocalPath);
+ taskExecutionContext.setExecutePath(execLocalPath);
+
+ try {
+ FileUtils.createWorkDirIfAbsent(execLocalPath);
+ } catch (Throwable ex) {
+ logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
+ execLocalPath,
+ taskExecutionContext.getTaskInstanceId());
+ logger.error("create executeLocalPath fail", ex);
+ TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+ taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+ return;
+ }
+ }
+
+ taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
+ new NettyRemoteChannel(channel, command.getOpaque()));
+
+ // delay task process
+ long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
+ taskExecutionContext.getDelayTime() * 60L);
+ if (remainTime > 0) {
+ logger.info("delay the execution of task instance {}, delay time: {} s",
+ taskExecutionContext.getTaskInstanceId(),
+ remainTime);
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
+ taskExecutionContext.setStartTime(null);
+ taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
+ }
+
+ // submit task to manager
+ boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
+ taskCallbackService,
+ alertClientService,
+ taskPluginManager));
+ if (!offer) {
+ logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
+ workerManager.getDelayQueueSize(),
+ taskExecutionContext.getTaskInstanceId());
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+ taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+ }
}
- }
-
- taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
- new NettyRemoteChannel(channel, command.getOpaque()));
-
- // delay task process
- long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
- if (remainTime > 0) {
- logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
- taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
- taskExecutionContext.setStartTime(null);
- taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
- }
-
- // submit task to manager
- boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
- if (!offer) {
- logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
- workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId());
- taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
- taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
- }
- }
/**
* get execute local path
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
index 9d74dc5dcc..421e92a530 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -44,19 +45,23 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING_ACK == command.getType(),
- String.format("invalid command type : %s", command.getType()));
+ String.format("invalid command type : %s", command.getType()));
TaskExecuteRunningAckCommand runningAckCommand = JSONUtils.parseObject(
- command.getBody(), TaskExecuteRunningAckCommand.class);
-
+ command.getBody(), TaskExecuteRunningAckCommand.class);
if (runningAckCommand == null) {
logger.error("task execute running ack command is null");
return;
}
- logger.info("task execute running ack command : {}", runningAckCommand);
+ try {
+ LoggerUtils.setTaskInstanceIdMDC(runningAckCommand.getTaskInstanceId());
+ logger.info("task execute running ack command : {}", runningAckCommand);
- if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
- ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId());
+ if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
+ ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId());
+ }
+ } finally {
+ LoggerUtils.removeTaskInstanceIdMDC();
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index fc737ca1de..0e48465a07 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -47,9 +47,11 @@ public class RetryReportTaskStatusThread implements Runnable {
private TaskCallbackService taskCallbackService;
public void start() {
+ logger.info("Retry report task status thread starting");
Thread thread = new Thread(this, "RetryReportTaskStatusThread");
thread.setDaemon(true);
thread.start();
+ logger.info("Retry report task status thread started");
}
/**
@@ -83,7 +85,7 @@ public class RetryReportTaskStatusThread implements Runnable {
}
}
} catch (Exception e) {
- logger.warn("retry report task status error", e);
+ logger.warn("Retry report task status error", e);
}
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index e6ff2b5653..ae89e9e946 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -76,14 +76,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
*/
private TaskExecutionContext taskExecutionContext;
- public StorageOperate getStorageOperate() {
- return storageOperate;
- }
-
- public void setStorageOperate(StorageOperate storageOperate) {
- this.storageOperate = storageOperate;
- }
-
private StorageOperate storageOperate;
/**
@@ -107,24 +99,28 @@ public class TaskExecuteThread implements Runnable, Delayed {
* constructor
*
* @param taskExecutionContext taskExecutionContext
- * @param taskCallbackService taskCallbackService
+ * @param taskCallbackService taskCallbackService
*/
public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
TaskCallbackService taskCallbackService,
- AlertClientService alertClientService) {
+ AlertClientService alertClientService,
+ StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService;
this.alertClientService = alertClientService;
+ this.storageOperate = storageOperate;
}
public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
TaskCallbackService taskCallbackService,
AlertClientService alertClientService,
- TaskPluginManager taskPluginManager) {
+ TaskPluginManager taskPluginManager,
+ StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService;
this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager;
+ this.storageOperate = storageOperate;
}
@Override
@@ -139,6 +135,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
try {
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
logger.info("script path : {}", taskExecutionContext.getExecutePath());
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
@@ -151,7 +148,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
// copy hdfs/minio file to local
List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
- if (!fileDownloads.isEmpty()){
+ if (!fileDownloads.isEmpty()) {
downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
}
@@ -211,6 +208,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
clearTaskExecPath();
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
@@ -308,11 +306,12 @@ public class TaskExecuteThread implements Runnable, Delayed {
/**
* download resource check
+ *
* @param execLocalPath
* @param projectRes
* @return
*/
- public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes){
+ public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes) {
if (MapUtils.isEmpty(projectRes)) {
return Collections.emptyList();
}
@@ -320,13 +319,13 @@ public class TaskExecuteThread implements Runnable, Delayed {
projectRes.forEach((key, value) -> {
File resFile = new File(execLocalPath, key);
boolean notExist = !resFile.exists();
- if (notExist){
+ if (notExist) {
downloadFile.add(Pair.of(key, value));
- } else{
+ } else {
logger.info("file : {} exists ", resFile.getName());
}
});
- if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()){
+ if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()) {
throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
}
return downloadFile;
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 2702d4af42..ae735f4338 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -24,13 +24,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,9 +129,11 @@ public class WorkerManagerThread implements Runnable {
}
public void start() {
+ logger.info("Worker manager thread starting");
Thread thread = new Thread(this, this.getClass().getName());
thread.setDaemon(true);
thread.start();
+ logger.info("Worker manager thread started");
}
@Override
diff --git a/dolphinscheduler-worker/src/main/resources/logback-spring.xml b/dolphinscheduler-worker/src/main/resources/logback-spring.xml
index c6d8ee415d..571a3addd2 100644
--- a/dolphinscheduler-worker/src/main/resources/logback-spring.xml
+++ b/dolphinscheduler-worker/src/main/resources/logback-spring.xml
@@ -22,7 +22,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
- [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
+ [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
@@ -58,7 +58,7 @@
</rollingPolicy>
<encoder>
<pattern>
- [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
+ [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
index 2bb23cad6a..13c533f1cf 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -53,7 +54,7 @@ import org.slf4j.Logger;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
- JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
+ JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
@Ignore
public class TaskExecuteProcessorTest {
@@ -63,6 +64,8 @@ public class TaskExecuteProcessorTest {
private ExecutorService workerExecService;
+ private StorageOperate storageOperate;
+
private WorkerConfig workerConfig;
private Command command;
@@ -99,19 +102,23 @@ public class TaskExecuteProcessorTest {
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
- .thenReturn(taskCallbackService);
+ .thenReturn(taskCallbackService);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
- .thenReturn(workerConfig);
+ .thenReturn(workerConfig);
workerManager = PowerMockito.mock(WorkerManagerThread.class);
- PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService))).thenReturn(Boolean.TRUE);
+
+ storageOperate = PowerMockito.mock(StorageOperate.class);
+ PowerMockito.when(
+ workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, storageOperate)))
+ .thenReturn(Boolean.TRUE);
PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class))
- .thenReturn(workerManager);
+ .thenReturn(workerManager);
PowerMockito.mockStatic(ThreadUtils.class);
PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()))
- .thenReturn(workerExecService);
+ .thenReturn(workerExecService);
PowerMockito.mockStatic(JsonSerializer.class);
PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class))
@@ -125,16 +132,17 @@ public class TaskExecuteProcessorTest {
PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
- taskExecutionContext.getProcessDefineCode(),
- taskExecutionContext.getProcessDefineVersion(),
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId()))
- .thenReturn(taskExecutionContext.getExecutePath());
+ taskExecutionContext.getProcessDefineCode(),
+ taskExecutionContext.getProcessDefineVersion(),
+ taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId()))
+ .thenReturn(taskExecutionContext.getExecutePath());
PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
- SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(null, null, null, alertClientService);
+ SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(
+ null, null, null, alertClientService, storageOperate);
PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments()
- .thenReturn(simpleTaskExecuteThread);
+ .thenReturn(simpleTaskExecuteThread);
}
@Test
@@ -172,8 +180,12 @@ public class TaskExecuteProcessorTest {
private static class SimpleTaskExecuteThread extends TaskExecuteThread {
- public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, Logger taskLogger, AlertClientService alertClientService) {
- super(taskExecutionContext, taskCallbackService, alertClientService);
+ public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext,
+ TaskCallbackService taskCallbackService,
+ Logger taskLogger,
+ AlertClientService alertClientService,
+ StorageOperate storageOperate) {
+ super(taskExecutionContext, taskCallbackService, alertClientService, storageOperate);
}
@Override
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
index f847690a6c..90577111b9 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
@@ -17,12 +17,20 @@
package org.apache.dolphinscheduler.server.worker.runner;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -31,11 +39,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
@RunWith(PowerMockRunner.class)
public class TaskExecuteThreadTest {
@@ -50,20 +53,24 @@ public class TaskExecuteThreadTest {
@Mock
private AlertClientService alertClientService;
+ @Mock
+ private StorageOperate storageOperate;
+
@Mock
private TaskPluginManager taskPluginManager;
@Test
- public void checkTest(){
- TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager);
+ public void checkTest() {
+ TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService,
+ alertClientService, taskPluginManager, storageOperate);
String path = "/";
Map<String, String> projectRes = new HashMap<>();
projectRes.put("shell", "shell.sh");
List<Pair<String, String>> downloads = new ArrayList<>();
- try{
+ try {
downloads = taskExecuteThread.downloadCheck(path, projectRes);
- }catch (Exception e){
+ } catch (Exception e) {
Assert.assertNotNull(e);
}
downloads.add(Pair.of("shell", "shell.sh"));