You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/12/16 08:14:53 UTC
[dolphinscheduler] branch dev updated: add task-logger config (#7423)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 1cbc86b add task-logger config (#7423)
1cbc86b is described below
commit 1cbc86b9e6c699487fc331799c43bb18787ac4d8
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Thu Dec 16 16:14:46 2021 +0800
add task-logger config (#7423)
Co-authored-by: caishunfeng <53...@qq.com>
---
.../server/master/config/MasterConfig.java | 9 +++++++++
.../master/runner/WorkflowExecuteThread.java | 2 +-
.../master/runner/task/BaseTaskProcessor.java | 23 ++++++++++------------
.../master/runner/task/CommonTaskProcessor.java | 6 +++---
.../master/runner/task/ConditionTaskProcessor.java | 6 +++---
.../master/runner/task/DependentTaskProcessor.java | 4 ++--
.../server/master/runner/task/ITaskProcessor.java | 2 +-
.../master/runner/task/SubTaskProcessor.java | 4 ++--
.../master/runner/task/SwitchTaskProcessor.java | 5 ++---
.../src/main/resources/application.yaml | 2 ++
.../src/main/resources/application.yaml | 2 ++
11 files changed, 37 insertions(+), 28 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 88ecfbd..a76dd5e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -39,6 +39,7 @@ public class MasterConfig {
private int stateWheelInterval;
private double maxCpuLoadAvg;
private double reservedMemory;
+ private boolean taskLogger;
public int getListenPort() {
return listenPort;
@@ -135,4 +136,12 @@ public class MasterConfig {
public void setReservedMemory(double reservedMemory) {
this.reservedMemory = reservedMemory;
}
+
+ public boolean isTaskLogger() {
+ return taskLogger;
+ }
+
+ public void setTaskLogger(boolean taskLogger) {
+ this.taskLogger = taskLogger;
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index a974e6d..24d49e8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -825,7 +825,7 @@ public class WorkflowExecuteThread implements Runnable {
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
- boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval());
+ boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval(), masterConfig.isTaskLogger());
if (!submit) {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
processInstance.getId(), processInstance.getName(),
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index b4951f4..ceefa26 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -85,22 +85,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
/**
* pause task, common tasks donot need this.
- *
- * @return
*/
protected abstract boolean pauseTask();
/**
* kill task, all tasks need to realize this function
- *
- * @return
*/
protected abstract boolean killTask();
/**
* task timeout process
- *
- * @return
*/
protected abstract boolean taskTimeout();
@@ -134,7 +128,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
}
/**
- * @return
+ *
*/
protected boolean pause() {
if (paused) {
@@ -227,7 +221,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
/**
* set master task running logger.
*/
- public void setTaskExecutionLogger() {
+ public void setTaskExecutionLogger(boolean isTaskLogger) {
+ if (!isTaskLogger) {
+ return;
+ }
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getFirstSubmitTime(),
processInstance.getProcessDefinitionCode(),
@@ -240,7 +237,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* set procedure task relation
*
* @param procedureTaskExecutionContext procedureTaskExecutionContext
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
*/
private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskInstance taskInstance) {
ProcedureParameters procedureParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
@@ -253,7 +250,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* set datax task relation
*
* @param dataxTaskExecutionContext dataxTaskExecutionContext
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
*/
protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskInstance taskInstance) {
DataxParameters dataxParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
@@ -278,7 +275,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* set sqoop task relation
*
* @param sqoopTaskExecutionContext sqoopTaskExecutionContext
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
*/
private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskInstance taskInstance) {
SqoopParameters sqoopParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
@@ -309,7 +306,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* set SQL task relation
*
* @param sqlTaskExecutionContext sqlTaskExecutionContext
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
*/
private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskInstance taskInstance) {
SqlParameters sqlParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
@@ -345,7 +342,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
/**
* whehter tenant is null
*
- * @param tenant tenant
+ * @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index d44315a..f7053ff 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -54,14 +54,14 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
NettyExecutorManager nettyExecutorManager;
@Override
- public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
+ public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval, boolean isTaskLogger) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
- setTaskExecutionLogger();
+ setTaskExecutionLogger(isTaskLogger);
int taskGroupId = task.getTaskGroupId();
if (taskGroupId > 0) {
boolean acquireTaskGroup = processService.acquireTaskGroup(task.getId(),
@@ -85,7 +85,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
@Override
public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) {
- this.dispatchTask(taskInstance,processInstance);
+ this.dispatchTask(taskInstance, processInstance);
}
@Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index c3c65b3..bf2390a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -71,7 +71,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
private TaskDefinition taskDefinition;
@Override
- public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
+ public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
@@ -82,7 +82,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
- setTaskExecutionLogger();
+ setTaskExecutionLogger(isTaskLogger);
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters();
@@ -141,7 +141,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
}
private void initTaskParameters() {
- taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),processInstance.getProcessDefinitionCode(),
+ taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 28cd0e7..6d311ee 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -81,7 +81,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
boolean allDependentItemFinished;
@Override
- public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
+ public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
this.processInstance = processInstance;
this.taskInstance = task;
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
@@ -97,7 +97,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
- setTaskExecutionLogger();
+ setTaskExecutionLogger(isTaskLogger);
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
index 8825337..66e49f9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
@@ -32,7 +32,7 @@ public interface ITaskProcessor {
String getType();
- boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval);
+ boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger);
ExecutionStatus taskState();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 10d1b28..0f8ccbc 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -53,7 +53,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
private StateEventCallbackService stateEventCallbackService;
@Override
- public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
+ public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
this.processInstance = processInstance;
taskDefinition = processService.findTaskDefinition(
task.getTaskCode(), task.getTaskDefinitionVersion()
@@ -63,7 +63,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
if (this.taskInstance == null) {
return false;
}
- setTaskExecutionLogger();
+ setTaskExecutionLogger(isTaskLogger);
return true;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 0fa6e5d..a4259ce 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -65,8 +65,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
private DependResult conditionResult;
@Override
- public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
-
+ public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
@@ -80,7 +79,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
- setTaskExecutionLogger();
+ setTaskExecutionLogger(isTaskLogger);
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml
index 6800496..f043196 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -104,6 +104,8 @@ master:
max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
reserved-memory: 0.3
+ # whether to use a per-task logger, default true; if true, a separate log is created for every task; if false, task logs are appended to the master log file
+ task-logger: true
server:
port: 5679
diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 4aacdba..bd98cd2 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -113,6 +113,8 @@ master:
max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
reserved-memory: 0.3
+ # whether to use a per-task logger, default true; if true, a separate log is created for every task; if false, task logs are appended to the master log file
+ task-logger: true
worker:
# worker listener port