You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/12/16 08:14:53 UTC

[dolphinscheduler] branch dev updated: add task-logger config (#7423)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1cbc86b  add task-logger config (#7423)
1cbc86b is described below

commit 1cbc86b9e6c699487fc331799c43bb18787ac4d8
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Thu Dec 16 16:14:46 2021 +0800

    add task-logger config (#7423)
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../server/master/config/MasterConfig.java         |  9 +++++++++
 .../master/runner/WorkflowExecuteThread.java       |  2 +-
 .../master/runner/task/BaseTaskProcessor.java      | 23 ++++++++++------------
 .../master/runner/task/CommonTaskProcessor.java    |  6 +++---
 .../master/runner/task/ConditionTaskProcessor.java |  6 +++---
 .../master/runner/task/DependentTaskProcessor.java |  4 ++--
 .../server/master/runner/task/ITaskProcessor.java  |  2 +-
 .../master/runner/task/SubTaskProcessor.java       |  4 ++--
 .../master/runner/task/SwitchTaskProcessor.java    |  5 ++---
 .../src/main/resources/application.yaml            |  2 ++
 .../src/main/resources/application.yaml            |  2 ++
 11 files changed, 37 insertions(+), 28 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 88ecfbd..a76dd5e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -39,6 +39,7 @@ public class MasterConfig {
     private int stateWheelInterval;
     private double maxCpuLoadAvg;
     private double reservedMemory;
+    private boolean taskLogger;
 
     public int getListenPort() {
         return listenPort;
@@ -135,4 +136,12 @@ public class MasterConfig {
     public void setReservedMemory(double reservedMemory) {
         this.reservedMemory = reservedMemory;
     }
+
+    public boolean isTaskLogger() {
+        return taskLogger;
+    }
+
+    public void setTaskLogger(boolean taskLogger) {
+        this.taskLogger = taskLogger;
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index a974e6d..24d49e8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -825,7 +825,7 @@ public class WorkflowExecuteThread implements Runnable {
             // package task instance before submit
             processService.packageTaskInstance(taskInstance, processInstance);
 
-            boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval());
+            boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval(), masterConfig.isTaskLogger());
             if (!submit) {
                 logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
                         processInstance.getId(), processInstance.getName(),
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index b4951f4..ceefa26 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -85,22 +85,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
     /**
      * pause task, common tasks donot need this.
-     *
-     * @return
      */
     protected abstract boolean pauseTask();
 
     /**
      * kill task, all tasks need to realize this function
-     *
-     * @return
      */
     protected abstract boolean killTask();
 
     /**
      * task timeout process
-     *
-     * @return
      */
     protected abstract boolean taskTimeout();
 
@@ -134,7 +128,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
     }
 
     /**
-     * @return
+     *
      */
     protected boolean pause() {
         if (paused) {
@@ -227,7 +221,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
     /**
      * set master task running logger.
      */
-    public void setTaskExecutionLogger() {
+    public void setTaskExecutionLogger(boolean isTaskLogger) {
+        if (!isTaskLogger) {
+            return;
+        }
         logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                 taskInstance.getFirstSubmitTime(),
                 processInstance.getProcessDefinitionCode(),
@@ -240,7 +237,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
      * set procedure task relation
      *
      * @param procedureTaskExecutionContext procedureTaskExecutionContext
-     * @param taskInstance                  taskInstance
+     * @param taskInstance taskInstance
      */
     private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskInstance taskInstance) {
         ProcedureParameters procedureParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
@@ -253,7 +250,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
      * set datax task relation
      *
      * @param dataxTaskExecutionContext dataxTaskExecutionContext
-     * @param taskInstance              taskInstance
+     * @param taskInstance taskInstance
      */
     protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskInstance taskInstance) {
         DataxParameters dataxParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
@@ -278,7 +275,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
      * set sqoop task relation
      *
      * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
-     * @param taskInstance              taskInstance
+     * @param taskInstance taskInstance
      */
     private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskInstance taskInstance) {
         SqoopParameters sqoopParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
@@ -309,7 +306,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
      * set SQL task relation
      *
      * @param sqlTaskExecutionContext sqlTaskExecutionContext
-     * @param taskInstance            taskInstance
+     * @param taskInstance taskInstance
      */
     private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskInstance taskInstance) {
         SqlParameters sqlParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
@@ -345,7 +342,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
     /**
      * whehter tenant is null
      *
-     * @param tenant       tenant
+     * @param tenant tenant
      * @param taskInstance taskInstance
      * @return result
      */
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index d44315a..f7053ff 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -54,14 +54,14 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
     NettyExecutorManager nettyExecutorManager;
 
     @Override
-    public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
+    public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval, boolean isTaskLogger) {
         this.processInstance = processInstance;
         this.taskInstance = processService.submitTaskWithRetry(processInstance, task, maxRetryTimes, commitInterval);
 
         if (this.taskInstance == null) {
             return false;
         }
-        setTaskExecutionLogger();
+        setTaskExecutionLogger(isTaskLogger);
         int taskGroupId = task.getTaskGroupId();
         if (taskGroupId > 0) {
             boolean acquireTaskGroup = processService.acquireTaskGroup(task.getId(),
@@ -85,7 +85,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
 
     @Override
     public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) {
-        this.dispatchTask(taskInstance,processInstance);
+        this.dispatchTask(taskInstance, processInstance);
     }
 
     @Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index c3c65b3..bf2390a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -71,7 +71,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
     private TaskDefinition taskDefinition;
 
     @Override
-    public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
+    public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
         this.processInstance = processInstance;
         this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
 
@@ -82,7 +82,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
                 taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
         );
 
-        setTaskExecutionLogger();
+        setTaskExecutionLogger(isTaskLogger);
         String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
         Thread.currentThread().setName(threadLoggerInfoName);
         initTaskParameters();
@@ -141,7 +141,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
     }
 
     private void initTaskParameters() {
-        taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),processInstance.getProcessDefinitionCode(),
+        taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
                 processInstance.getProcessDefinitionVersion(),
                 taskInstance.getProcessInstanceId(),
                 taskInstance.getId()));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 28cd0e7..6d311ee 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -81,7 +81,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
     boolean allDependentItemFinished;
 
     @Override
-    public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
+    public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
         this.processInstance = processInstance;
         this.taskInstance = task;
         this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
@@ -97,7 +97,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
                 processInstance.getProcessDefinitionVersion(),
                 taskInstance.getProcessInstanceId(),
                 taskInstance.getId()));
-        setTaskExecutionLogger();
+        setTaskExecutionLogger(isTaskLogger);
         taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
         taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
         taskInstance.setStartTime(new Date());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
index 8825337..66e49f9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
@@ -32,7 +32,7 @@ public interface ITaskProcessor {
 
     String getType();
 
-    boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval);
+    boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger);
 
     ExecutionStatus taskState();
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 10d1b28..0f8ccbc 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -53,7 +53,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
     private StateEventCallbackService stateEventCallbackService;
 
     @Override
-    public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
+    public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
         this.processInstance = processInstance;
         taskDefinition = processService.findTaskDefinition(
                 task.getTaskCode(), task.getTaskDefinitionVersion()
@@ -63,7 +63,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
         if (this.taskInstance == null) {
             return false;
         }
-        setTaskExecutionLogger();
+        setTaskExecutionLogger(isTaskLogger);
         return true;
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 0fa6e5d..a4259ce 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -65,8 +65,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
     private DependResult conditionResult;
 
     @Override
-    public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
-
+    public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
         this.processInstance = processInstance;
         this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
 
@@ -80,7 +79,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
                 processInstance.getProcessDefinitionVersion(),
                 taskInstance.getProcessInstanceId(),
                 taskInstance.getId()));
-        setTaskExecutionLogger();
+        setTaskExecutionLogger(isTaskLogger);
         taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
         taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
         taskInstance.setStartTime(new Date());
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml
index 6800496..f043196 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -104,6 +104,8 @@ master:
   max-cpu-load-avg: -1
   # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
   reserved-memory: 0.3
+  # whether to use the task logger, default true; if true, a log is created for every task; if false, task logs are appended to the master log file
+  task-logger: true
 
 server:
   port: 5679
diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 4aacdba..bd98cd2 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -113,6 +113,8 @@ master:
   max-cpu-load-avg: -1
   # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
   reserved-memory: 0.3
+  # whether to use the task logger, default true; if true, a log is created for every task; if false, task logs are appended to the master log file
+  task-logger: true
 
 worker:
   # worker listener port