You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/09 08:09:31 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: cancelTaskInstance set TaskExecutionContext host, logPath, executePath (#2126)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 20af8ff  cancelTaskInstance set TaskExecutionContext host,logPath,executePath (#2126)
20af8ff is described below

commit 20af8ff93e7d41485def1adaec90ba79065bbc46
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Mon Mar 9 16:09:25 2020 +0800

    cancelTaskInstance set TaskExecutionContext host,logPath,executePath (#2126)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1,worker TaskPros use TaskExecutionContext replace
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
    
    * host add host:port format
    
    * host add host:port format
    
    * TaskAckProcessor modify
    
    * TaskAckProcessor modify
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * TaskPriority refactor
    
    * remove logs
    
    * WorkerServer refactor
    
    * MasterSchedulerService modify
    
    * WorkerConfig listen port modify
    
    * modify master and worker listen port
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
 .../server/master/config/MasterConfig.java         |  2 +-
 .../master/runner/MasterSchedulerService.java      | 12 ++++--
 .../server/master/runner/MasterTaskExecThread.java |  3 ++
 .../server/utils/ProcessUtils.java                 |  8 ++--
 .../server/worker/config/WorkerConfig.java         |  2 +-
 .../server/worker/processor/TaskKillProcessor.java | 46 ++++++++++++----------
 6 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 7e6ae56..86dd1c9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -46,7 +46,7 @@ public class MasterConfig {
     @Value("${master.host.selector:lowerWeight}")
     private String hostSelector;
 
-    @Value("${master.listen.port:45678}")
+    @Value("${master.listen.port:5678}")
     private int listenPort;
 
     public int getListenPort() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 6949ada..c1925e0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
 import java.util.concurrent.ThreadPoolExecutor;
 
 /**
@@ -59,28 +60,33 @@ public class MasterSchedulerService extends Thread {
     @Autowired
     private ZKMasterClient zkMasterClient;
 
+    /**
+     * master config
+     */
     @Autowired
     private MasterConfig masterConfig;
 
     /**
      *  netty remoting client
      */
-    private final NettyRemotingClient nettyRemotingClient;
+    private NettyRemotingClient nettyRemotingClient;
 
     /**
      * master exec service
      */
-    private final ThreadPoolExecutor masterExecService;
+    private ThreadPoolExecutor masterExecService;
 
     /**
      * constructor of MasterSchedulerThread
      */
-    public MasterSchedulerService(){
+    @PostConstruct
+    public void init(){
         this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
         NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
     }
 
+    @Override
     public void start(){
         super.setName("MasterSchedulerThread");
         super.start();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 1197dc2..172794e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -184,6 +184,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
 
         TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
         taskExecutionContext.setTaskInstanceId(taskInstance.getId());
+        taskExecutionContext.setHost(taskInstance.getHost());
+        taskExecutionContext.setLogPath(taskInstance.getLogPath());
+        taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
         taskExecutionContext.setProcessId(taskInstance.getPid());
 
         ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index ee1f091..f29e7df 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -273,15 +273,15 @@ public class ProcessUtils {
    * @param appIds      app id list
    * @param logger      logger
    * @param tenantCode  tenant code
-   * @param workDir     work dir
+   * @param executePath     execute path
    * @throws IOException io exception
    */
-  public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode,String workDir)
+  public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode,String executePath)
           throws IOException {
     if (appIds.size() > 0) {
       String appid = appIds.get(appIds.size() - 1);
       String commandFile = String
-              .format("%s/%s.kill", workDir, appid);
+              .format("%s/%s.kill", executePath, appid);
       String cmd = "yarn application -kill " + appid;
       try {
         StringBuilder sb = new StringBuilder();
@@ -309,7 +309,7 @@ public class ProcessUtils {
 
         Runtime.getRuntime().exec(runCmd);
       } catch (Exception e) {
-        logger.error("kill application failed", e);
+        logger.error("kill application error", e);
       }
     }
   }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index f3e701b..792f922 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -40,7 +40,7 @@ public class WorkerConfig {
     @Value("${worker.group: default}")
     private String workerGroup;
 
-    @Value("${worker.listen.port: 12345}")
+    @Value("${worker.listen.port: 1234}")
     private int listenPort;
 
     public int getListenPort() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index c23f199..5a8c668 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
 import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
@@ -104,15 +105,24 @@ public class TaskKillProcessor implements NettyRequestProcessor {
 
 
             // find log and kill yarn job
-            killYarnJob(context.getHost(), context.getLogPath(), context.getExecutePath(), context.getTenantCode());
+            killYarnJob(Host.of(context.getHost()).getIp(),
+                    context.getLogPath(),
+                    context.getExecutePath(),
+                    context.getTenantCode());
 
             return true;
         } catch (Exception e) {
-            logger.error("kill task failed", e);
+            logger.error("kill task error", e);
             return false;
         }
     }
 
+    /**
+     * task kill process
+     *
+     * @param channel channel channel
+     * @param command command command
+     */
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
@@ -160,26 +170,18 @@ public class TaskKillProcessor implements NettyRequestProcessor {
      * @param executePath executePath
      * @param tenantCode tenantCode
      */
-    public void killYarnJob(String host, String logPath, String executePath, String tenantCode) {
-        List<String> appIds = null;
+    private void killYarnJob(String host, String logPath, String executePath, String tenantCode) {
+        LogClientService logClient = null;
         try {
-            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-            LogClientService logClient = null;
-            String log = null;
-            try {
-                logClient = new LogClientService();
-                logger.info("view log host : {},logPath : {}", host,logPath);
-                log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
-            } finally {
-                if(logClient != null){
-                    logClient.close();
-                }
-            }
+            logClient = new LogClientService();
+            logger.info("view log host : {},logPath : {}", host,logPath);
+            String log  = logClient.viewLog(host, Constants.RPC_PORT, logPath);
+
             if (StringUtils.isNotEmpty(log)) {
-                appIds = LoggerUtils.getAppIds(log, logger);
+                List<String> appIds = LoggerUtils.getAppIds(log, logger);
                 if (StringUtils.isEmpty(executePath)) {
-                    logger.error("task instance work dir is empty");
-                    throw new RuntimeException("task instance work dir is empty");
+                    logger.error("task instance execute path is empty");
+                    throw new RuntimeException("task instance execute path is empty");
                 }
                 if (appIds.size() > 0) {
                     ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
@@ -187,7 +189,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
             }
 
         } catch (Exception e) {
-            logger.error("kill yarn job failure",e);
+            logger.error("kill yarn job error",e);
+        } finally {
+            if(logClient != null){
+                logClient.close();
+            }
         }
     }