You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/23 06:45:56 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: Refactor worker (#2279)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 92fd647  Refactor worker (#2279)
92fd647 is described below

commit 92fd6479a948883e47757833a77842e6bfbec101
Author: Tboy <gu...@immomo.com>
AuthorDate: Mon Mar 23 14:45:07 2020 +0800

    Refactor worker (#2279)
    
    * let quartz use the same datasource
    
    * move master/worker config from dao.properties to each config
    add master/worker registry test
    
    * move mybatis config from application.properties to SpringConnectionFactory
    
    * move mybatis-plus config from application.properties to SpringConnectionFactory
    
    * refactor TaskCallbackService
    
    * add ZookeeperNodeManagerTest
    
    * add NettyExecutorManagerTest
    
    * refactor TaskKillProcessor
---
 .../remote/command/TaskKillRequestCommand.java     |   2 +-
 .../server/entity/TaskExecutionContext.java        |   7 --
 .../server/master/runner/MasterTaskExecThread.java |  16 +--
 .../server/worker/processor/TaskKillProcessor.java | 116 ++++++++++-----------
 4 files changed, 60 insertions(+), 81 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
index e5c756a..b8e02dd 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  kill task request command
 */
public
  class TaskKillRequestCommand implements Serializable {

    /**
     *  task execution context
     */
    private String taskExecutionContext;

    public String getTaskExecutionContext() {
        return taskExecutionContext;
    }

    public void setTaskExecutionContext(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    public TaskKillRequestCommand() {
    }

    public TaskKillRequestCommand(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_KILL_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskKillRequestCommand{" +
                "taskExecutio
 nContext='" + taskExecutionContext + '\'' +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  kill task request command
 *
 *  Sent to a worker to kill one task instance; the worker resolves the running
 *  task from {@link #taskInstanceId} via its local task execution context cache.
 */
public class TaskKillRequestCommand implements Serializable {

    /**
     *  id of the task instance to kill
     */
    private int taskInstanceId;

    /**
     *  no-arg constructor.
     *
     *  Kept explicit because declaring any other constructor suppresses the
     *  implicit default one, and this class is deserialized reflectively
     *  (FastJsonSerializer.deserialize) on the worker side.
     */
    public TaskKillRequestCommand() {
    }

    /**
     * @param taskInstanceId id of the task instance to kill
     */
    public TaskKillRequestCommand(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     * @return id of the task instance to kill
     */
    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    /**
     * @param taskInstanceId id of the task instance to kill
     */
    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     *  package request command
     *
     *  Wraps this object in a {@link Command} of type TASK_KILL_REQUEST whose
     *  body is this object serialized with FastJsonSerializer.
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_KILL_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskKillRequestCommand{" +
                "taskInstanceId=" + taskInstanceId +
                '}';
    }
}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index cdaa0f0..0d88d6a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.entity;
 
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
-import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 
 import java.io.Serializable;
@@ -421,12 +420,6 @@ public class TaskExecutionContext implements Serializable{
         return requestCommand.convert2Command();
     }
 
-    public Command toKillCommand(){
-        TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
-        requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
-        return requestCommand.convert2Command();
-    }
-
     @Override
     public String toString() {
         return "TaskExecutionContext{" +
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 172794e..f220a09 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dolphinscheduler.server.master.runner;
 
+import com.alibaba.fastjson.JSONObject;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
@@ -25,9 +26,8 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -172,8 +172,6 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
 
 
     /**
-     *  TODO Kill TASK
-     *
      *  task instance add queue , waiting worker to kill
      */
     private void cancelTaskInstance() throws Exception{
@@ -182,14 +180,10 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
         }
         alreadyKilled = true;
 
-        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
-        taskExecutionContext.setTaskInstanceId(taskInstance.getId());
-        taskExecutionContext.setHost(taskInstance.getHost());
-        taskExecutionContext.setLogPath(taskInstance.getLogPath());
-        taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
-        taskExecutionContext.setProcessId(taskInstance.getPid());
+        TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
+        killCommand.setTaskInstanceId(taskInstance.getId());
 
-        ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup());
+        ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER);
 
         Host host = Host.of(taskInstance.getHost());
         executionContext.setHost(host);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 5a8c668..b6f5827 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
-import com.alibaba.fastjson.JSONObject;
 import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -32,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.remote.utils.Pair;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
@@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -66,11 +67,6 @@ public class TaskKillProcessor implements NettyRequestProcessor {
      */
     private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
 
-    /**
-     * appIds
-     */
-    private List<String> appIds;
-
 
     public TaskKillProcessor(){
         this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
@@ -79,86 +75,80 @@ public class TaskKillProcessor implements NettyRequestProcessor {
     }
 
     /**
-     * kill task logic
+     * task kill process
      *
-     * @param context context
-     * @return execute result
+     * @param channel channel channel
+     * @param command command command
      */
-    private Boolean doKill(TaskExecutionContext context){
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
+        TaskKillRequestCommand killCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
+        logger.info("received kill command : {}", killCommand);
+
+        Pair<Boolean, List<String>> result = doKill(killCommand);
+
+        taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
+                new NettyRemoteChannel(channel, command.getOpaque()));
+
+        TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand,result);
+        taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
+    }
+
+    /**
+     *  do kill
+     * @param killCommand
+     * @return kill result
+     */
+    private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand){
+        List<String> appIds = Collections.EMPTY_LIST;
         try {
-            TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(context.getTaskInstanceId());
-            context.setProcessId(taskExecutionContext.getProcessId());
+            TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
 
             Integer processId = taskExecutionContext.getProcessId();
 
             if (processId == null || processId.equals(0)){
-                logger.error("process kill failed, process id :{}, task id:{}", processId, context.getTaskInstanceId());
-                return false;
+                logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId());
+                return Pair.of(false, appIds);
             }
 
+            String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()));
 
-            String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(context.getProcessId()));
-
-            logger.info("process id:{}, cmd:{}", context.getProcessId(), cmd);
+            logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
 
             OSUtils.exeCmd(cmd);
 
-
             // find log and kill yarn job
-            killYarnJob(Host.of(context.getHost()).getIp(),
-                    context.getLogPath(),
-                    context.getExecutePath(),
-                    context.getTenantCode());
+            appIds = killYarnJob(Host.of(taskExecutionContext.getHost()).getIp(),
+                    taskExecutionContext.getLogPath(),
+                    taskExecutionContext.getExecutePath(),
+                    taskExecutionContext.getTenantCode());
 
-            return true;
+            return Pair.of(true, appIds);
         } catch (Exception e) {
             logger.error("kill task error", e);
-            return false;
         }
-    }
-
-    /**
-     * task kill process
-     *
-     * @param channel channel channel
-     * @param command command command
-     */
-    @Override
-    public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
-        TaskKillRequestCommand taskKillRequestCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
-        logger.info("received command : {}", taskKillRequestCommand);
-
-
-        String contextJson = taskKillRequestCommand.getTaskExecutionContext();
-
-        TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
-
-        Boolean killStatus = doKill(taskExecutionContext);
-
-        taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
-                new NettyRemoteChannel(channel, command.getOpaque()));
-
-        TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
-        taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
+        return Pair.of(false, appIds);
     }
 
     /**
      * build TaskKillResponseCommand
      *
-     * @param taskExecutionContext taskExecutionContext
-     * @param killStatus killStatus
+     * @param killCommand  kill command
+     * @param result exe result
      * @return build TaskKillResponseCommand
      */
-    private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
-                                                                 Boolean killStatus) {
+    private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand,
+                                                                 Pair<Boolean, List<String>> result) {
         TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
-        taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        taskKillResponseCommand.setHost(taskExecutionContext.getHost());
-        taskKillResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
-        taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
-        taskKillResponseCommand.setAppIds(appIds);
-
+        taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
+        taskKillResponseCommand.setAppIds(result.getRight());
+        TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
+        if(taskExecutionContext != null){
+            taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+            taskKillResponseCommand.setHost(taskExecutionContext.getHost());
+            taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
+        }
         return taskKillResponseCommand;
     }
 
@@ -169,8 +159,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
      * @param logPath logPath
      * @param executePath executePath
      * @param tenantCode tenantCode
+     * @return List<String> appIds
      */
-    private void killYarnJob(String host, String logPath, String executePath, String tenantCode) {
+    private List<String> killYarnJob(String host, String logPath, String executePath, String tenantCode) {
         LogClientService logClient = null;
         try {
             logClient = new LogClientService();
@@ -185,9 +176,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
                 }
                 if (appIds.size() > 0) {
                     ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
+                    return appIds;
                 }
             }
-
         } catch (Exception e) {
             logger.error("kill yarn job error",e);
         } finally {
@@ -195,6 +186,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
                 logClient.close();
             }
         }
+        return Collections.EMPTY_LIST;
     }
 
 }