You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/09 08:09:31 UTC
[incubator-dolphinscheduler] branch refactor-worker updated:
cancelTaskInstance set TaskExecutionContext host, logPath,
executePath (#2126)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 20af8ff cancelTaskInstance set TaskExecutionContext host,logPath,executePath (#2126)
20af8ff is described below
commit 20af8ff93e7d41485def1adaec90ba79065bbc46
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Mon Mar 9 16:09:25 2020 +0800
cancelTaskInstance set TaskExecutionContext host,logPath,executePath (#2126)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskProps replaced by TaskExecutionContext
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task priority refactor
* remove ITaskQueue
* task priority refactor
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
.../server/master/config/MasterConfig.java | 2 +-
.../master/runner/MasterSchedulerService.java | 12 ++++--
.../server/master/runner/MasterTaskExecThread.java | 3 ++
.../server/utils/ProcessUtils.java | 8 ++--
.../server/worker/config/WorkerConfig.java | 2 +-
.../server/worker/processor/TaskKillProcessor.java | 46 ++++++++++++----------
6 files changed, 44 insertions(+), 29 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 7e6ae56..86dd1c9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -46,7 +46,7 @@ public class MasterConfig {
@Value("${master.host.selector:lowerWeight}")
private String hostSelector;
- @Value("${master.listen.port:45678}")
+ @Value("${master.listen.port:5678}")
private int listenPort;
public int getListenPort() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 6949ada..c1925e0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import javax.annotation.PostConstruct;
import java.util.concurrent.ThreadPoolExecutor;
/**
@@ -59,28 +60,33 @@ public class MasterSchedulerService extends Thread {
@Autowired
private ZKMasterClient zkMasterClient;
+ /**
+ * master config
+ */
@Autowired
private MasterConfig masterConfig;
/**
* netty remoting client
*/
- private final NettyRemotingClient nettyRemotingClient;
+ private NettyRemotingClient nettyRemotingClient;
/**
* master exec service
*/
- private final ThreadPoolExecutor masterExecService;
+ private ThreadPoolExecutor masterExecService;
/**
* constructor of MasterSchedulerThread
*/
- public MasterSchedulerService(){
+ @PostConstruct
+ public void init(){
this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
+ @Override
public void start(){
super.setName("MasterSchedulerThread");
super.start();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 1197dc2..172794e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -184,6 +184,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(taskInstance.getId());
+ taskExecutionContext.setHost(taskInstance.getHost());
+ taskExecutionContext.setLogPath(taskInstance.getLogPath());
+ taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setProcessId(taskInstance.getPid());
ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index ee1f091..f29e7df 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -273,15 +273,15 @@ public class ProcessUtils {
* @param appIds app id list
* @param logger logger
* @param tenantCode tenant code
- * @param workDir work dir
+ * @param executePath execute path
* @throws IOException io exception
*/
- public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode,String workDir)
+ public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode,String executePath)
throws IOException {
if (appIds.size() > 0) {
String appid = appIds.get(appIds.size() - 1);
String commandFile = String
- .format("%s/%s.kill", workDir, appid);
+ .format("%s/%s.kill", executePath, appid);
String cmd = "yarn application -kill " + appid;
try {
StringBuilder sb = new StringBuilder();
@@ -309,7 +309,7 @@ public class ProcessUtils {
Runtime.getRuntime().exec(runCmd);
} catch (Exception e) {
- logger.error("kill application failed", e);
+ logger.error("kill application error", e);
}
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index f3e701b..792f922 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -40,7 +40,7 @@ public class WorkerConfig {
@Value("${worker.group: default}")
private String workerGroup;
- @Value("${worker.listen.port: 12345}")
+ @Value("${worker.listen.port: 1234}")
private int listenPort;
public int getListenPort() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index c23f199..5a8c668 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
@@ -104,15 +105,24 @@ public class TaskKillProcessor implements NettyRequestProcessor {
// find log and kill yarn job
- killYarnJob(context.getHost(), context.getLogPath(), context.getExecutePath(), context.getTenantCode());
+ killYarnJob(Host.of(context.getHost()).getIp(),
+ context.getLogPath(),
+ context.getExecutePath(),
+ context.getTenantCode());
return true;
} catch (Exception e) {
- logger.error("kill task failed", e);
+ logger.error("kill task error", e);
return false;
}
}
+ /**
+ * task kill process
+ *
+ * @param channel channel channel
+ * @param command command command
+ */
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
@@ -160,26 +170,18 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @param executePath executePath
* @param tenantCode tenantCode
*/
- public void killYarnJob(String host, String logPath, String executePath, String tenantCode) {
- List<String> appIds = null;
+ private void killYarnJob(String host, String logPath, String executePath, String tenantCode) {
+ LogClientService logClient = null;
try {
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- LogClientService logClient = null;
- String log = null;
- try {
- logClient = new LogClientService();
- logger.info("view log host : {},logPath : {}", host,logPath);
- log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
- } finally {
- if(logClient != null){
- logClient.close();
- }
- }
+ logClient = new LogClientService();
+ logger.info("view log host : {},logPath : {}", host,logPath);
+ String log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
+
if (StringUtils.isNotEmpty(log)) {
- appIds = LoggerUtils.getAppIds(log, logger);
+ List<String> appIds = LoggerUtils.getAppIds(log, logger);
if (StringUtils.isEmpty(executePath)) {
- logger.error("task instance work dir is empty");
- throw new RuntimeException("task instance work dir is empty");
+ logger.error("task instance execute path is empty");
+ throw new RuntimeException("task instance execute path is empty");
}
if (appIds.size() > 0) {
ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
@@ -187,7 +189,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
}
} catch (Exception e) {
- logger.error("kill yarn job failure",e);
+ logger.error("kill yarn job error",e);
+ } finally {
+ if(logClient != null){
+ logClient.close();
+ }
}
}