You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/03 14:18:57 UTC
[incubator-dolphinscheduler] branch refactor-worker updated: host
add host:port format (#2070)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 72e8f8d host add host:port format (#2070)
72e8f8d is described below
commit 72e8f8d195ebd47869600017bc220f19088930fb
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Tue Mar 3 22:18:51 2020 +0800
host add host:port format (#2070)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null, need to load from db
* taskInstanceCache is null, need to load from db
* taskInstanceCache is null, need to load from db
* 1, worker TaskProps use TaskExecutionContext replace
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove Chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
---
.../api/service/LoggerService.java | 5 +-
.../dolphinscheduler/dao/entity/TaskInstance.java | 10 --
.../builder/TaskExecutionContextBuilder.java | 1 -
.../server/entity/TaskExecutionContext.java | 139 ++++++++++-----------
.../server/master/dispatch/ExecutorDispatcher.java | 1 +
.../server/master/runner/MasterTaskExecThread.java | 4 +-
.../worker/processor/TaskExecuteProcessor.java | 6 +-
7 files changed, 77 insertions(+), 89 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
index 1f65208..91316af 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@@ -68,7 +69,7 @@ public class LoggerService {
return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
}
- String host = taskInstance.getHost();
+ String host = Host.of(taskInstance.getHost()).getIp();
if(StringUtils.isEmpty(host)){
return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
}
@@ -94,7 +95,7 @@ public class LoggerService {
if (taskInstance == null){
throw new RuntimeException("task instance is null");
}
- String host = taskInstance.getHost();
+ String host = Host.of(taskInstance.getHost()).getIp();
return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath());
}
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 92cb3af..e444ad2 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -186,21 +186,11 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
private String dependentResult;
-
/**
* worker group id
*/
private int workerGroupId;
-
-
- public void init(String host,Date startTime,String executePath){
- this.host = host;
- this.startTime = startTime;
- this.executePath = executePath;
- }
-
-
public ProcessInstance getProcessInstance() {
return processInstance;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 1388e79..34d96aa 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -48,7 +48,6 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
- taskExecutionContext.setHost(taskInstance.getHost());
taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
return this;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index fb3aab9..2348b47 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -29,7 +29,7 @@ public class TaskExecutionContext implements Serializable{
/**
* task id
*/
- private Integer taskInstanceId;
+ private int taskInstanceId;
/**
@@ -51,7 +51,7 @@ public class TaskExecutionContext implements Serializable{
* host
*/
private String host;
-
+
/**
* task execute path
*/
@@ -70,7 +70,7 @@ public class TaskExecutionContext implements Serializable{
/**
* processId
*/
- private Integer processId;
+ private int processId;
/**
* appIds
@@ -80,7 +80,7 @@ public class TaskExecutionContext implements Serializable{
/**
* process instance id
*/
- private Integer processInstanceId;
+ private int processInstanceId;
/**
@@ -97,13 +97,13 @@ public class TaskExecutionContext implements Serializable{
/**
* execute user id
*/
- private Integer executorId;
+ private int executorId;
/**
* command type if complement
*/
- private Integer cmdTypeIfComplement;
+ private int cmdTypeIfComplement;
/**
@@ -120,12 +120,12 @@ public class TaskExecutionContext implements Serializable{
/**
* process define id
*/
- private Integer processDefineId;
+ private int processDefineId;
/**
* project id
*/
- private Integer projectId;
+ private int projectId;
/**
* taskParams
@@ -173,22 +173,11 @@ public class TaskExecutionContext implements Serializable{
*/
private DataxTaskExecutionContext dataxTaskExecutionContext;
-
-
-
- public String getWorkerGroup() {
- return workerGroup;
- }
-
- public void setWorkerGroup(String workerGroup) {
- this.workerGroup = workerGroup;
- }
-
- public Integer getTaskInstanceId() {
+ public int getTaskInstanceId() {
return taskInstanceId;
}
- public void setTaskInstanceId(Integer taskInstanceId) {
+ public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
@@ -216,6 +205,14 @@ public class TaskExecutionContext implements Serializable{
this.taskType = taskType;
}
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
public String getExecutePath() {
return executePath;
}
@@ -224,6 +221,14 @@ public class TaskExecutionContext implements Serializable{
this.executePath = executePath;
}
+ public String getLogPath() {
+ return logPath;
+ }
+
+ public void setLogPath(String logPath) {
+ this.logPath = logPath;
+ }
+
public String getTaskJson() {
return taskJson;
}
@@ -232,11 +237,27 @@ public class TaskExecutionContext implements Serializable{
this.taskJson = taskJson;
}
- public Integer getProcessInstanceId() {
+ public int getProcessId() {
+ return processId;
+ }
+
+ public void setProcessId(int processId) {
+ this.processId = processId;
+ }
+
+ public String getAppIds() {
+ return appIds;
+ }
+
+ public void setAppIds(String appIds) {
+ this.appIds = appIds;
+ }
+
+ public int getProcessInstanceId() {
return processInstanceId;
}
- public void setProcessInstanceId(Integer processInstanceId) {
+ public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
@@ -256,6 +277,22 @@ public class TaskExecutionContext implements Serializable{
this.globalParams = globalParams;
}
+ public int getExecutorId() {
+ return executorId;
+ }
+
+ public void setExecutorId(int executorId) {
+ this.executorId = executorId;
+ }
+
+ public int getCmdTypeIfComplement() {
+ return cmdTypeIfComplement;
+ }
+
+ public void setCmdTypeIfComplement(int cmdTypeIfComplement) {
+ this.cmdTypeIfComplement = cmdTypeIfComplement;
+ }
+
public String getTenantCode() {
return tenantCode;
}
@@ -272,46 +309,22 @@ public class TaskExecutionContext implements Serializable{
this.queue = queue;
}
- public Integer getProcessDefineId() {
+ public int getProcessDefineId() {
return processDefineId;
}
- public void setProcessDefineId(Integer processDefineId) {
+ public void setProcessDefineId(int processDefineId) {
this.processDefineId = processDefineId;
}
- public Integer getProjectId() {
+ public int getProjectId() {
return projectId;
}
- public void setProjectId(Integer projectId) {
+ public void setProjectId(int projectId) {
this.projectId = projectId;
}
- public Integer getExecutorId() {
- return executorId;
- }
-
- public void setExecutorId(Integer executorId) {
- this.executorId = executorId;
- }
-
- public Integer getCmdTypeIfComplement() {
- return cmdTypeIfComplement;
- }
-
- public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) {
- this.cmdTypeIfComplement = cmdTypeIfComplement;
- }
-
- public String getLogPath() {
- return logPath;
- }
-
- public void setLogPath(String logPath) {
- this.logPath = logPath;
- }
-
public String getTaskParams() {
return taskParams;
}
@@ -360,28 +373,12 @@ public class TaskExecutionContext implements Serializable{
this.taskTimeout = taskTimeout;
}
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public Integer getProcessId() {
- return processId;
- }
-
- public void setProcessId(Integer processId) {
- this.processId = processId;
- }
-
- public String getAppIds() {
- return appIds;
+ public String getWorkerGroup() {
+ return workerGroup;
}
- public void setAppIds(String appIds) {
- this.appIds = appIds;
+ public void setWorkerGroup(String workerGroup) {
+ this.workerGroup = workerGroup;
}
public SQLTaskExecutionContext getSqlTaskExecutionContext() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index df563a6..97b489e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -90,6 +90,7 @@ public class ExecutorDispatcher implements InitializingBean {
throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext()));
}
context.setHost(host);
+ context.getContext().setHost(host.getAddress());
executorManager.beforeExecute(context);
try {
/**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 51a4f44..a196832 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -186,9 +186,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance);
ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER);
- Host host = new Host();
- host.setIp(taskInstance.getHost());
- host.setPort(12346);
+ Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);
nettyExecutorManager.executeDirectly(executionContext);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index b53ef17..5042c97 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -78,10 +78,12 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
- logger.info("received command : {}", command);
+
TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), TaskExecuteRequestCommand.class);
+ logger.info("received command : {}", taskRequestCommand);
+
String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
@@ -141,7 +143,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
- ackCommand.setHost(OSUtils.getHost());
+ ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(new Date());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);