You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/10/02 05:09:05 UTC

[dolphinscheduler] branch dev updated: [Improvement#6438][Worker]remove meaningless DB query (#6440)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ea2a8d2  [Improvement#6438][Worker]remove meaningless DB query (#6440)
ea2a8d2 is described below

commit ea2a8d26a0d42683309b8bda9ef24f4daaab266b
Author: Kirs <ac...@163.com>
AuthorDate: Sat Oct 2 13:09:00 2021 +0800

    [Improvement#6438][Worker]remove meaningless DB query (#6440)
    
    org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread
    When killing a task, the worker queried the DB to fetch the task instance object, but the check on that result served no purpose. The relevant information is already available in the task execution context, so there is no need to query the DB.
---
 .../server/builder/TaskExecutionContextBuilder.java    |  1 +
 .../server/worker/runner/TaskExecuteThread.java        | 18 ++----------------
 .../service/queue/entity/TaskExecutionContext.java     | 14 ++++++++++++++
 3 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index afc9b9a..807c521 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -61,6 +61,7 @@ public class TaskExecutionContextBuilder {
         taskExecutionContext.setResources(taskInstance.getResources());
         taskExecutionContext.setDelayTime(taskInstance.getDelayTime());
         taskExecutionContext.setVarPool(taskInstance.getVarPool());
+        taskExecutionContext.setDryRun(taskInstance.getDryRun());
         return this;
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index eaa7cc4..d1f1c39 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.RetryerUtils;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -41,8 +40,6 @@ import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
 import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
 import org.apache.dolphinscheduler.spi.task.AbstractTask;
@@ -114,11 +111,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
     private TaskPluginManager taskPluginManager;
 
     /**
-     * process database access
-     */
-    protected ProcessService processService;
-
-    /**
      * constructor
      *
      * @param taskExecutionContext taskExecutionContext
@@ -130,7 +122,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
         this.taskExecutionContext = taskExecutionContext;
         this.taskCallbackService = taskCallbackService;
         this.alertClientService = alertClientService;
-        this.processService = SpringApplicationContext.getBean(ProcessService.class);
     }
 
     public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
@@ -141,7 +132,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
         this.taskCallbackService = taskCallbackService;
         this.alertClientService = alertClientService;
         this.taskPluginManager = taskPluginManager;
-        this.processService = SpringApplicationContext.getBean(ProcessService.class);
     }
 
     @Override
@@ -167,8 +157,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
             }
             logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
 
-            TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
-            int dryRun = taskInstance.getDryRun();
+            int dryRun = taskExecutionContext.getDryRun();
             // copy hdfs/minio file to local
             if (dryRun == Constants.DRY_RUN_FLAG_NO) {
                 downloadResource(taskExecutionContext.getExecutePath(),
@@ -294,10 +283,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
         if (task != null) {
             try {
                 task.cancelApplication(true);
-                TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
-                if (taskInstance != null) {
-                    ProcessUtils.killYarnJob(taskExecutionContext);
-                }
+                ProcessUtils.killYarnJob(taskExecutionContext);
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java
index 9f73d82..609566a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java
@@ -233,6 +233,11 @@ public class TaskExecutionContext implements Serializable {
     private String varPool;
 
     /**
+     * dry run flag
+     */
+    private int dryRun;
+
+    /**
      * business param
      */
     private Map<String, Property> paramsMap;
@@ -552,6 +557,14 @@ public class TaskExecutionContext implements Serializable {
         this.sqoopTaskExecutionContext = sqoopTaskExecutionContext;
     }
 
+    public int getDryRun() {
+        return dryRun;
+    }
+
+    public void setDryRun(int dryRun) {
+        this.dryRun = dryRun;
+    }
+
     @Override
     public String toString() {
         return "TaskExecutionContext{"
@@ -579,6 +592,7 @@ public class TaskExecutionContext implements Serializable {
                 + ", projectCode=" + projectCode
                 + ", taskParams='" + taskParams + '\''
                 + ", envFile='" + envFile + '\''
+                + ", dryRun='" + dryRun + '\''
                 + ", definedParams=" + definedParams
                 + ", taskAppId='" + taskAppId + '\''
                 + ", taskTimeoutStrategy=" + taskTimeoutStrategy