You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/02/21 12:18:12 UTC
[incubator-dolphinscheduler] branch refactor-worker updated: 1,
master persistent task 2. extract master and worker communication
model (#1992)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new ff86dc7 1, master persistent task 2. extract master and worker communication model (#1992)
ff86dc7 is described below
commit ff86dc7d57e42d62c8237a1bb44b16d473d23475
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Fri Feb 21 20:18:02 2020 +0800
1, master persistent task 2. extract master and worker communication model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
---
.../dolphinscheduler/common/enums/CommandType.java | 9 +
.../dolphinscheduler/dao/entity/TaskInstance.java | 5 +-
.../dao/mapper/DataSourceMapper.java | 2 +
.../remote/command/ExecuteTaskRequestCommand.java | 2 +-
.../dolphinscheduler/remote/command/TaskInfo.java | 250 +++++++++++++++++++++
.../server/master/MasterServer.java | 1 +
.../master/processor/TaskResponseProcessor.java | 7 +
.../master/runner/MasterBaseTaskExecThread.java | 104 ++++++++-
.../worker/processor/WorkerRequestProcessor.java | 64 ++----
.../server/worker/runner/TaskScheduleThread.java | 120 ++++------
10 files changed, 436 insertions(+), 128 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index 1ee7915..56fdd07 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -65,4 +65,13 @@ public enum CommandType {
public String getDescp() {
return descp;
}
+
+ public static CommandType of(Integer status){
+ for(CommandType cmdType : values()){
+ if(cmdType.getCode() == status){
+ return cmdType;
+ }
+ }
+ throw new IllegalArgumentException("invalid status : " + status);
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 2db1eda..c9481ba 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -27,13 +27,14 @@ import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
+import java.io.Serializable;
import java.util.Date;
/**
* task instance
*/
@TableName("t_ds_task_instance")
-public class TaskInstance {
+public class TaskInstance implements Serializable {
/**
* id
@@ -198,7 +199,7 @@ public class TaskInstance {
- public void init(String host,Date startTime,String executePath){
+ public void init(String host,Date startTime,String executePath){
this.host = host;
this.startTime = startTime;
this.executePath = executePath;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
index f95fbc7..0c3238a 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
@@ -79,8 +79,10 @@ public interface DataSourceMapper extends BaseMapper<DataSource> {
/**
* list authorized UDF function
+ *
* @param userId userId
* @param dataSourceIds data source id array
+ * @param <T> T
* @return UDF function list
*/
<T> List<DataSource> listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
index 4d01142..e1556f3 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* execute task request command
*/
pub
lic class ExecuteTaskRequestCommand implements Serializable {
/**
* task instance json
*/
private String taskInstanceJson;
public String getTaskInstanceJson() {
return taskInstanceJson;
}
public void setTaskInstanceJson(String taskInstanceJson) {
this.taskInstanceJson = taskInstanceJson;
}
public ExecuteTaskRequestCommand() {
}
public ExecuteTaskRequestCommand(String taskInstanceJson) {
this.taskInstanceJson = taskInstanceJson;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskRequestCommand{" +
"taskInstanceJson='" + taskInstanceJson
+ '\'' +
'}';
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* execute task request command
*/
pub
lic class ExecuteTaskRequestCommand implements Serializable {
/**
* task instance json
*/
private String taskInfoJson;
public String getTaskInfoJson() {
return taskInfoJson;
}
public void setTaskInfoJson(String taskInfoJson) {
this.taskInfoJson = taskInfoJson;
}
public ExecuteTaskRequestCommand() {
}
public ExecuteTaskRequestCommand(String taskInfoJson) {
this.taskInfoJson = taskInfoJson;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskRequestCommand{" +
"taskInfoJson='" + taskInfoJson + '\'' +
'}';
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
new file mode 100644
index 0000000..3fb58fe
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * master/worker task transport
+ */
+public class TaskInfo implements Serializable{
+
+ /**
+ * task instance id
+ */
+ private Integer taskId;
+
+
+ /**
+ * taks name
+ */
+ private String taskName;
+
+ /**
+ * task start time
+ */
+ private Date startTime;
+
+ /**
+ * task type
+ */
+ private String taskType;
+
+ /**
+ * task execute path
+ */
+ private String executePath;
+
+ /**
+ * task json
+ */
+ private String taskJson;
+
+
+ /**
+ * process instance id
+ */
+ private Integer processInstanceId;
+
+
+ /**
+ * process instance schedule time
+ */
+ private Date scheduleTime;
+
+ /**
+ * process instance global parameters
+ */
+ private String globalParams;
+
+
+ /**
+ * execute user id
+ */
+ private Integer executorId;
+
+
+ /**
+ * command type if complement
+ */
+ private Integer cmdTypeIfComplement;
+
+
+ /**
+ * tenant code
+ */
+ private String tenantCode;
+
+ /**
+ * task queue
+ */
+ private String queue;
+
+
+ /**
+ * process define id
+ */
+ private Integer processDefineId;
+
+ /**
+ * project id
+ */
+ private Integer projectId;
+
+ public Integer getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(Integer taskId) {
+ this.taskId = taskId;
+ }
+
+ public String getTaskName() {
+ return taskName;
+ }
+
+ public void setTaskName(String taskName) {
+ this.taskName = taskName;
+ }
+
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(Date startTime) {
+ this.startTime = startTime;
+ }
+
+ public String getTaskType() {
+ return taskType;
+ }
+
+ public void setTaskType(String taskType) {
+ this.taskType = taskType;
+ }
+
+ public String getExecutePath() {
+ return executePath;
+ }
+
+ public void setExecutePath(String executePath) {
+ this.executePath = executePath;
+ }
+
+ public String getTaskJson() {
+ return taskJson;
+ }
+
+ public void setTaskJson(String taskJson) {
+ this.taskJson = taskJson;
+ }
+
+ public Integer getProcessInstanceId() {
+ return processInstanceId;
+ }
+
+ public void setProcessInstanceId(Integer processInstanceId) {
+ this.processInstanceId = processInstanceId;
+ }
+
+ public Date getScheduleTime() {
+ return scheduleTime;
+ }
+
+ public void setScheduleTime(Date scheduleTime) {
+ this.scheduleTime = scheduleTime;
+ }
+
+ public String getGlobalParams() {
+ return globalParams;
+ }
+
+ public void setGlobalParams(String globalParams) {
+ this.globalParams = globalParams;
+ }
+
+ public String getTenantCode() {
+ return tenantCode;
+ }
+
+ public void setTenantCode(String tenantCode) {
+ this.tenantCode = tenantCode;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+
+ public Integer getProcessDefineId() {
+ return processDefineId;
+ }
+
+ public void setProcessDefineId(Integer processDefineId) {
+ this.processDefineId = processDefineId;
+ }
+
+ public Integer getProjectId() {
+ return projectId;
+ }
+
+ public void setProjectId(Integer projectId) {
+ this.projectId = projectId;
+ }
+
+ public Integer getExecutorId() {
+ return executorId;
+ }
+
+ public void setExecutorId(Integer executorId) {
+ this.executorId = executorId;
+ }
+
+ public Integer getCmdTypeIfComplement() {
+ return cmdTypeIfComplement;
+ }
+
+ public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) {
+ this.cmdTypeIfComplement = cmdTypeIfComplement;
+ }
+
+ @Override
+ public String toString() {
+ return "TaskInfo{" +
+ "taskId=" + taskId +
+ ", taskName='" + taskName + '\'' +
+ ", startTime=" + startTime +
+ ", taskType='" + taskType + '\'' +
+ ", executePath='" + executePath + '\'' +
+ ", taskJson='" + taskJson + '\'' +
+ ", processInstanceId=" + processInstanceId +
+ ", scheduleTime=" + scheduleTime +
+ ", globalParams='" + globalParams + '\'' +
+ ", executorId=" + executorId +
+ ", cmdTypeIfComplement=" + cmdTypeIfComplement +
+ ", tenantCode='" + tenantCode + '\'' +
+ ", queue='" + queue + '\'' +
+ ", processDefineId=" + processDefineId +
+ ", projectId=" + projectId +
+ '}';
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 0a153e7..d0c7bc2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -118,6 +118,7 @@ public class MasterServer implements IStoppable {
//
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(45678);
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor(processService));
this.nettyRemotingServer.start();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 0dd45f0..c3b6a05 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -46,6 +46,13 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
this.processService = processService;
}
+ /**
+ * task final result response
+ * need master process , state persistence
+ *
+ * @param channel channel
+ * @param command command
+ */
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index a261b34..4ae057a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -17,14 +17,18 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.utils.BeanContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskInfo;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
@@ -124,11 +128,23 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
// TODO send task to worker
public void sendToWorker(TaskInstance taskInstance){
final Address address = new Address("127.0.0.1", 12346);
- ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(FastJsonSerializer.serializeToString(taskInstance));
+
+ /**
+ * set taskInstance relation
+ */
+ TaskInstance destTaskInstance = setTaskInstanceRelation(taskInstance);
+
+ ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(
+ FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance)));
try {
- Command responseCommand = nettyRemotingClient.sendSync(address, taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
- ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(responseCommand.getBody(), ExecuteTaskAckCommand.class);
+ Command responseCommand = nettyRemotingClient.sendSync(address,
+ taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
+
+ ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
+ responseCommand.getBody(), ExecuteTaskAckCommand.class);
+
logger.info("taskAckCommand : {}",taskAckCommand);
+
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(),
taskAckCommand.getHost(),
@@ -141,6 +157,88 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
}
+
+ /**
+ * set task instance relation
+ *
+ * @param taskInstance taskInstance
+ */
+ private TaskInstance setTaskInstanceRelation(TaskInstance taskInstance){
+ taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
+
+ int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
+ Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
+ // verify tenant is null
+ if (verifyTenantIsNull(tenant, taskInstance)) {
+ processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId());
+ return null;
+ }
+ // set queue for process instance, user-specified queue takes precedence over tenant queue
+ String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
+ taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
+ taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+
+ return taskInstance;
+ }
+
+
+ /**
+ * whehter tenant is null
+ * @param tenant tenant
+ * @param taskInstance taskInstance
+ * @return result
+ */
+ private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
+ if(tenant == null){
+ logger.error("tenant not exists,process instance id : {},task instance id : {}",
+ taskInstance.getProcessInstance().getId(),
+ taskInstance.getId());
+ return true;
+ }
+ return false;
+ }
+
+
+ /**
+ * taskInstance convert to taskInfo
+ *
+ * @param taskInstance taskInstance
+ * @return taskInfo
+ */
+ private TaskInfo convertToTaskInfo(TaskInstance taskInstance){
+ TaskInfo taskInfo = new TaskInfo();
+ taskInfo.setTaskId(taskInstance.getId());
+ taskInfo.setTaskName(taskInstance.getName());
+ taskInfo.setStartTime(taskInstance.getStartTime());
+ taskInfo.setTaskType(taskInstance.getTaskType());
+ taskInfo.setExecutePath(getExecLocalPath(taskInstance));
+ taskInfo.setTaskJson(taskInstance.getTaskJson());
+ taskInfo.setProcessInstanceId(taskInstance.getProcessInstance().getId());
+ taskInfo.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime());
+ taskInfo.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams());
+ taskInfo.setExecutorId(taskInstance.getProcessInstance().getExecutorId());
+ taskInfo.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode());
+ taskInfo.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
+ taskInfo.setQueue(taskInstance.getProcessInstance().getQueue());
+ taskInfo.setProcessDefineId(taskInstance.getProcessDefine().getId());
+ taskInfo.setProjectId(taskInstance.getProcessDefine().getProjectId());
+
+ return taskInfo;
+ }
+
+
+ /**
+ * get execute local path
+ *
+ * @return execute local path
+ */
+ private String getExecLocalPath(TaskInstance taskInstance){
+ return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
+ taskInstance.getProcessDefine().getId(),
+ taskInstance.getProcessInstance().getId(),
+ taskInstance.getId());
+ }
+
/**
* submit master base task exec thread
* @return TaskInstance
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
index 8ea9ccb..ba21494 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
@@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskInfo;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -85,71 +85,39 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), ExecuteTaskRequestCommand.class);
- String taskInstanceJson = taskRequestCommand.getTaskInstanceJson();
+ String taskInstanceJson = taskRequestCommand.getTaskInfoJson();
- TaskInstance taskInstance = JSONObject.parseObject(taskInstanceJson, TaskInstance.class);
-
- taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
-
-
- //TODO this logic need add to master
- int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
- Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
- // verify tenant is null
- if (verifyTenantIsNull(tenant, taskInstance)) {
- processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId());
- return;
- }
- // set queue for process instance, user-specified queue takes precedence over tenant queue
- String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
- taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
- taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
- //TODO end
+ TaskInfo taskInfo = JSONObject.parseObject(taskInstanceJson, TaskInfo.class);
// local execute path
- String execLocalPath = getExecLocalPath(taskInstance);
+ String execLocalPath = getExecLocalPath(taskInfo);
logger.info("task instance local execute path : {} ", execLocalPath);
- // init task
- taskInstance.init(OSUtils.getHost(), new Date(), execLocalPath);
+
try {
- FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenant.getTenantCode());
+ FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskInfo.getTenantCode());
} catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
}
- taskCallbackService.addCallbackChannel(taskInstance.getId(),
+ taskCallbackService.addCallbackChannel(taskInfo.getTaskId(),
new CallbackChannel(channel, command.getOpaque()));
// submit task
- workerExecService.submit(new TaskScheduleThread(taskInstance,
+ workerExecService.submit(new TaskScheduleThread(taskInfo,
processService, taskCallbackService));
}
- /**
- * whehter tenant is null
- * @param tenant tenant
- * @param taskInstance taskInstance
- * @return result
- */
- private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
- if(tenant == null){
- logger.error("tenant not exists,process instance id : {},task instance id : {}",
- taskInstance.getProcessInstance().getId(),
- taskInstance.getId());
- return true;
- }
- return false;
- }
/**
- * get execute local path
- * @param taskInstance taskInstance
+ * get execute local path
+ *
+ * @param taskInfo taskInfo
* @return execute local path
*/
- private String getExecLocalPath(TaskInstance taskInstance){
- return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
- taskInstance.getProcessDefine().getId(),
- taskInstance.getProcessInstance().getId(),
- taskInstance.getId());
+ private String getExecLocalPath(TaskInfo taskInfo){
+ return FileUtils.getProcessExecDir(taskInfo.getProjectId(),
+ taskInfo.getProcessDefineId(),
+ taskInfo.getProcessInstanceId(),
+ taskInfo.getTaskId());
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index 349e762..0de8ea3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -22,6 +22,7 @@ import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
+import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
@@ -29,14 +30,12 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
-import org.apache.dolphinscheduler.common.utils.HadoopUtils;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
+import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskInfo;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
@@ -64,7 +63,7 @@ public class TaskScheduleThread implements Runnable {
/**
* task instance
*/
- private TaskInstance taskInstance;
+ private TaskInfo taskInfo;
/**
* process service
@@ -82,72 +81,69 @@ public class TaskScheduleThread implements Runnable {
private TaskCallbackService taskInstanceCallbackService;
/**
- * constructor
+ * constructor
*
- * @param taskInstance task instance
- * @param processService process dao
+ * @param taskInfo taskInfo
+ * @param processService processService
+ * @param taskInstanceCallbackService taskInstanceCallbackService
*/
- public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskCallbackService taskInstanceCallbackService){
+ public TaskScheduleThread(TaskInfo taskInfo, ProcessService processService, TaskCallbackService taskInstanceCallbackService){
this.processService = processService;
- this.taskInstance = taskInstance;
+ this.taskInfo = taskInfo;
this.taskInstanceCallbackService = taskInstanceCallbackService;
}
@Override
public void run() {
- ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
+ ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInfo.getTaskId());
try {
// tell master that task is in executing
- ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType());
- taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand);
+ ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInfo.getTaskType());
+ taskInstanceCallbackService.sendAck(taskInfo.getTaskId(), ackCommand);
- logger.info("script path : {}", taskInstance.getExecutePath());
+ logger.info("script path : {}", taskInfo.getExecutePath());
// task node
- TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
+ TaskNode taskNode = JSONObject.parseObject(taskInfo.getTaskJson(), TaskNode.class);
// get resource files
List<String> resourceFiles = createProjectResFiles(taskNode);
// copy hdfs/minio file to local
downloadResource(
- taskInstance.getExecutePath(),
+ taskInfo.getExecutePath(),
resourceFiles,
logger);
-
- // get process instance according to tak instance
- ProcessInstance processInstance = taskInstance.getProcessInstance();
-
// set task props
TaskProps taskProps = new TaskProps(taskNode.getParams(),
- taskInstance.getExecutePath(),
- processInstance.getScheduleTime(),
- taskInstance.getName(),
- taskInstance.getTaskType(),
- taskInstance.getId(),
+ taskInfo.getExecutePath(),
+ taskInfo.getScheduleTime(),
+ taskInfo.getTaskName(),
+ taskInfo.getTaskType(),
+ taskInfo.getTaskId(),
CommonUtils.getSystemEnvPath(),
- processInstance.getTenantCode(),
- processInstance.getQueue(),
- taskInstance.getStartTime(),
+ taskInfo.getTenantCode(),
+ taskInfo.getQueue(),
+ taskInfo.getStartTime(),
getGlobalParamsMap(),
- taskInstance.getDependency(),
- processInstance.getCmdTypeIfComplement());
+ null,
+ CommandType.of(taskInfo.getCmdTypeIfComplement()));
// set task timeout
setTaskTimeout(taskProps, taskNode);
taskProps.setTaskAppId(String.format("%s_%s_%s",
- taskInstance.getProcessDefine().getId(),
- taskInstance.getProcessInstance().getId(),
- taskInstance.getId()));
+ taskInfo.getProcessDefineId(),
+ taskInfo.getProcessInstanceId(),
+ taskInfo.getTaskId()));
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
- taskInstance.getProcessDefine().getId(),
- taskInstance.getProcessInstance().getId(),
- taskInstance.getId()));
+ taskInfo.getProcessDefineId(),
+ taskInfo.getProcessInstanceId(),
+ taskInfo.getTaskId()));
- task = TaskManager.newTask(taskInstance.getTaskType(),
+ task = TaskManager.newTask(taskInfo.getTaskType(),
taskProps,
taskLogger);
@@ -163,14 +159,14 @@ public class TaskScheduleThread implements Runnable {
//
responseCommand.setStatus(task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
- logger.info("task instance id : {},task final status : {}", taskInstance.getId(), task.getExitStatus());
+ logger.info("task instance id : {},task final status : {}", taskInfo.getTaskId(), task.getExitStatus());
}catch (Exception e){
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
} finally {
- taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand);
+ taskInstanceCallbackService.sendResult(taskInfo.getTaskId(), responseCommand);
}
}
@@ -182,7 +178,7 @@ public class TaskScheduleThread implements Runnable {
Map<String,String> globalParamsMap = new HashMap<>(16);
// global params string
- String globalParamsStr = taskInstance.getProcessInstance().getGlobalParams();
+ String globalParamsStr = taskInfo.getGlobalParams();
if (globalParamsStr != null) {
List<Property> globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class);
@@ -192,18 +188,18 @@ public class TaskScheduleThread implements Runnable {
}
/**
* build ack command
- * @param taskType
+ * @param taskType taskType
*/
private ExecuteTaskAckCommand buildAckCommand(String taskType) {
ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath());
- ackCommand.setHost("localhost");
+ ackCommand.setHost(OSUtils.getHost());
ackCommand.setStartTime(new Date());
if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);
}else{
- ackCommand.setExecutePath(taskInstance.getExecutePath());
+ ackCommand.setExecutePath(taskInfo.getExecutePath());
}
return ackCommand;
}
@@ -219,15 +215,15 @@ public class TaskScheduleThread implements Runnable {
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
return baseLog + Constants.SINGLE_SLASH +
- taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH +
- taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH +
- taskInstance.getId() + ".log";
+ taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH +
+ taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH +
+ taskInfo.getTaskId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
- taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH +
- taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH +
- taskInstance.getId() + ".log";
+ taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH +
+ taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH +
+ taskInfo.getTaskId() + ".log";
}
/**
@@ -333,33 +329,9 @@ public class TaskScheduleThread implements Runnable {
* @throws Exception exception
*/
private void checkDownloadPermission(List<String> projectRes) throws Exception {
- int userId = taskInstance.getProcessInstance().getExecutorId();
+ int userId = taskInfo.getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
permissionCheck.checkPermission();
}
-
- /**
- * update task state according to task type
- * @param taskType
- */
- private void updateTaskState(String taskType) {
- // update task status is running
- if(taskType.equals(TaskType.SQL.name()) ||
- taskType.equals(TaskType.PROCEDURE.name())){
- processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
- taskInstance.getStartTime(),
- taskInstance.getHost(),
- null,
- getTaskLogPath(),
- taskInstance.getId());
- }else{
- processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
- taskInstance.getStartTime(),
- taskInstance.getHost(),
- taskInstance.getExecutePath(),
- getTaskLogPath(),
- taskInstance.getId());
- }
- }
}
\ No newline at end of file