You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/07/19 08:16:35 UTC
[dolphinscheduler] 03/03: [Feature-10871] add workflow executing data query (#10875)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit b9e3187ad5f561601bfe2a29dc227e6530fb2b22
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Mon Jul 11 16:55:55 2022 +0800
[Feature-10871] add workflow executing data query (#10875)
* add workflow executing data query
* fix sonar check for interrupted
---
.../api/controller/ExecutorController.java | 35 ++++-
.../api/controller/ProcessInstanceController.java | 5 +-
.../apache/dolphinscheduler/api/enums/Status.java | 1 +
.../api/service/ExecutorService.java | 8 ++
.../api/service/impl/ExecutorServiceImpl.java | 25 ++++
.../service/impl/ProcessInstanceServiceImpl.java | 1 -
.../controller/WorkflowExecuteController.java | 51 +++++++
.../WorkflowExecutingDataRequestProcessor.java | 65 +++++++++
.../server/master/rpc/MasterRPCServer.java | 5 +
.../master/runner/WorkflowExecuteThreadPool.java | 9 +-
.../master/runner/task/SubTaskProcessor.java | 5 +-
.../server/master/service/ExecutingService.java | 75 ++++++++++
.../remote/command/CommandType.java | 12 +-
.../WorkflowExecutingDataRequestCommand.java | 50 +++++++
.../WorkflowExecutingDataResponseCommand.java | 51 +++++++
.../remote/dto/TaskInstanceExecuteDto.java | 109 +++++++++++++++
.../remote/dto/WorkflowExecuteDto.java | 154 +++++++++++++++++++++
.../processor/StateEventCallbackService.java | 36 +++--
.../service/process/ProcessServiceImpl.java | 15 +-
19 files changed, 673 insertions(+), 39 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 0a6fed5072..6532dbda1c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.CHECK_PROCESS_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.EXECUTE_PROCESS_INSTANCE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.QUERY_EXECUTING_WORKFLOW_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANCE_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
@@ -37,9 +38,21 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
@@ -55,12 +68,6 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
/**
* executor controller
@@ -303,4 +310,20 @@ public class ExecutorController extends BaseController {
Map<String, Object> result = execService.startCheckByProcessDefinedCode(processDefinitionCode);
return returnDataList(result);
}
+
+    /**
+     * query execute data of processInstance from master
+     *
+     * @param processInstanceId id of the process instance, bound to the {@code id} request parameter
+     * @return a success result wrapping whatever the executor service returns for that instance
+     */
+    @ApiOperation(value = "queryExecutingWorkflow", notes = "QUERY_WORKFLOW_EXECUTE_DATA")
+    @ApiImplicitParams({
+        // the documented name must match the name used by @RequestParam below, otherwise the
+        // generated API docs describe a parameter that never binds
+        @ApiImplicitParam(name = "id", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
+    })
+    @GetMapping(value = "/query-executing-workflow")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(QUERY_EXECUTING_WORKFLOW_ERROR)
+    @AccessLogAnnotation
+    public Result queryExecutingWorkflow(@RequestParam("id") Integer processInstanceId) {
+        WorkflowExecuteDto workflowExecuteDto = execService.queryExecutingWorkflowByProcessInstanceId(processInstanceId);
+        return Result.success(workflowExecuteDto);
+    }
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index 2484d9ec7b..2000b80245 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -43,7 +43,10 @@ import org.apache.commons.lang.StringUtils;
import java.io.IOException;
import java.text.MessageFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 3315ef671e..7017fa6d95 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -216,6 +216,7 @@ public enum Status {
QUERY_AUTHORIZED_USER(10183, "query authorized user error", "查询拥有项目权限的用户错误"),
PROJECT_NOT_EXIST(10190, "This project was not found. Please refresh page.", "该项目不存在,请刷新页面"),
TASK_INSTANCE_HOST_IS_NULL(10191, "task instance host is null", "任务实例host为空"),
+ QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error", "查询运行的工作流实例错误"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index 2d5868b251..b5cf04bd5e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import java.util.Map;
@@ -111,4 +112,11 @@ public interface ExecutorService {
* @return
*/
Map<String, Object> forceStartTaskInstance(User loginUser, int queueId);
+
+    /**
+     * query executing workflow data in Master memory
+     *
+     * @param processInstanceId id of the process instance whose executing data is requested
+     * @return the workflow execute data; the implementation added in this change returns
+     *         {@code null} when the process instance is not found or the master sends no response
+     */
+    WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index b58cc9dbcd..da9f9a742e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -64,6 +64,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
+import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand;
+import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand;
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -851,4 +854,26 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return validDependentProcessDefinitionList;
}
+
+    /**
+     * query executing data of processInstance by master
+     *
+     * @param processInstanceId id of the process instance
+     * @return the workflow execute data carried by the master's response, or {@code null} when the
+     *         process instance does not exist, has no host recorded, the master sends no response,
+     *         or the response body cannot be turned into a response command
+     */
+    @Override
+    public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) {
+        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
+        if (processInstance == null) {
+            return null;
+        }
+        // no recorded host means there is no master to ask; treat it like the other "no data" cases
+        // instead of building a Host from a missing address
+        String hostAddress = processInstance.getHost();
+        if (hostAddress == null || hostAddress.trim().isEmpty()) {
+            return null;
+        }
+        Host host = new Host(hostAddress);
+        WorkflowExecutingDataRequestCommand requestCommand = new WorkflowExecutingDataRequestCommand();
+        requestCommand.setProcessInstanceId(processInstanceId);
+        org.apache.dolphinscheduler.remote.command.Command command = stateEventCallbackService.sendSync(host, requestCommand.convert2Command());
+        if (command == null) {
+            return null;
+        }
+        WorkflowExecutingDataResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), WorkflowExecutingDataResponseCommand.class);
+        // guard against a body that did not deserialize into a response command
+        if (responseCommand == null) {
+            return null;
+        }
+        return responseCommand.getWorkflowExecuteDto();
+    }
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index ae570bc725..dd17255fdb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -829,5 +829,4 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
public List<ProcessInstance> queryByProcessDefineCode(Long processDefinitionCode, int size) {
return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size);
}
-
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java
new file mode 100644
index 0000000000..793dc40439
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.controller;
+
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
+import org.apache.dolphinscheduler.server.master.service.ExecutingService;
+
+import java.util.Optional;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * REST endpoint that returns the workflow execute data provided by {@link ExecutingService}.
+ */
+@RestController
+@RequestMapping("/workflow/execute")
+public class WorkflowExecuteController {
+
+    @Autowired
+    private ExecutingService executingService;
+
+    /**
+     * query workflow execute data in memory
+     *
+     * @param processInstanceId id of the process instance, read from the {@code id} request parameter
+     * @return the execute data, or {@code null} when the service yields an empty result
+     */
+    @GetMapping("")
+    @ResponseStatus(HttpStatus.OK)
+    public WorkflowExecuteDto queryExecuteData(@RequestParam("id") int processInstanceId) {
+        return executingService.queryWorkflowExecutingData(processInstanceId).orElse(null);
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java
new file mode 100644
index 0000000000..c8f70d96d0
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand;
+import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand;
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.service.ExecutingService;
+
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+
+/**
+ * workflow executing data process from api/master
+ */
+@Component
+public class WorkflowExecutingDataRequestProcessor implements NettyRequestProcessor {
+
+    private final Logger logger = LoggerFactory.getLogger(WorkflowExecutingDataRequestProcessor.class);
+
+    @Autowired
+    private ExecutingService executingService;
+
+    /**
+     * Answer a workflow executing data request with a response command that reuses the
+     * request's opaque value.
+     *
+     * @param channel channel the response is written to
+     * @param command incoming command; its type must be {@code WORKFLOW_EXECUTING_DATA_REQUEST}
+     * @throws IllegalArgumentException when the command type is anything else
+     */
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType()));
+
+        WorkflowExecutingDataRequestCommand requestCommand = JSONUtils.parseObject(command.getBody(), WorkflowExecutingDataRequestCommand.class);
+        WorkflowExecutingDataResponseCommand responseCommand = new WorkflowExecutingDataResponseCommand();
+
+        if (requestCommand == null || requestCommand.getProcessInstanceId() == null) {
+            // still reply (with an empty payload) so the request is answered instead of failing
+            // here with a NullPointerException before anything is written to the channel
+            logger.error("received workflow executing data request without a readable processInstanceId, opaque:{}", command.getOpaque());
+        } else {
+            logger.info("received command, processInstanceId:{}", requestCommand.getProcessInstanceId());
+            Optional<WorkflowExecuteDto> workflowExecuteDtoOptional = executingService.queryWorkflowExecutingData(requestCommand.getProcessInstanceId());
+            workflowExecuteDtoOptional.ifPresent(responseCommand::setWorkflowExecuteDto);
+        }
+        channel.writeAndFlush(responseCommand.convert2Command(command.getOpaque()));
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index 24c3530462..12bbef36f9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
+import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
import javax.annotation.PostConstruct;
@@ -70,6 +71,9 @@ public class MasterRPCServer implements AutoCloseable {
@Autowired
private LoggerRequestProcessor loggerRequestProcessor;
+ @Autowired
+ private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor;
+
@PostConstruct
private void init() {
// init remoting server
@@ -83,6 +87,7 @@ public class MasterRPCServer implements AutoCloseable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST, workflowExecutingDataRequestProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index bb49d9dea1..e61643ab36 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
@@ -27,8 +26,10 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang.StringUtils;
@@ -46,8 +47,6 @@ import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
-import com.google.common.base.Strings;
-
import lombok.NonNull;
/**
@@ -191,11 +190,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
logger.error("process {} host is empty, cannot notify task {} now", processInstance.getId(), taskInstance.getId());
return;
}
- String address = host.split(":")[0];
- int port = Integer.parseInt(host.split(":")[1]);
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(), taskInstance.getId()
);
- stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
+ stateEventCallbackService.sendResult(new Host(host), stateEventChangeCommand.convert2Command());
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 747c3dd77a..6c27adae57 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -210,9 +211,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(), subProcessInstance.getId(), 0
);
- String address = subProcessInstance.getHost().split(":")[0];
- int port = Integer.parseInt(subProcessInstance.getHost().split(":")[1]);
- this.stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
+ this.stateEventCallbackService.sendResult(new Host(subProcessInstance.getHost()), stateEventChangeCommand.convert2Command());
}
@Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java
new file mode 100644
index 0000000000..fa85b66f82
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.service;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.dto.TaskInstanceExecuteDto;
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.controller.WorkflowExecuteController;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.compress.utils.Lists;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * executing service, to query executing data from memory, such as workflow instance
+ */
+@Component
+public class ExecutingService {
+
+    // log under this class's own name so messages are attributed to ExecutingService
+    private static final Logger logger = LoggerFactory.getLogger(ExecutingService.class);
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    /**
+     * Build a DTO snapshot of a workflow held in the process instance exec cache.
+     *
+     * @param processInstanceId id of the process instance to look up in the cache
+     * @return the workflow data with one entry per task instance, or an empty Optional when the
+     *         cache has no entry for the id or copying the bean properties fails
+     */
+    public Optional<WorkflowExecuteDto> queryWorkflowExecutingData(Integer processInstanceId) {
+        WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflowExecuteRunnable == null) {
+            logger.info("workflow execute data not found, maybe it has finished, workflow id:{}", processInstanceId);
+            return Optional.empty();
+        }
+        try {
+            WorkflowExecuteDto workflowExecuteDto = new WorkflowExecuteDto();
+            // BeanUtils.copyProperties takes the destination first, then the source
+            BeanUtils.copyProperties(workflowExecuteDto, workflowExecuteRunnable.getProcessInstance());
+            List<TaskInstanceExecuteDto> taskInstanceList = Lists.newArrayList();
+            if (CollectionUtils.isNotEmpty(workflowExecuteRunnable.getAllTaskInstances())) {
+                for (TaskInstance taskInstance : workflowExecuteRunnable.getAllTaskInstances()) {
+                    TaskInstanceExecuteDto taskInstanceExecuteDto = new TaskInstanceExecuteDto();
+                    BeanUtils.copyProperties(taskInstanceExecuteDto, taskInstance);
+                    taskInstanceList.add(taskInstanceExecuteDto);
+                }
+            }
+            workflowExecuteDto.setTaskInstances(taskInstanceList);
+            return Optional.of(workflowExecuteDto);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            logger.error("query workflow execute data fail, workflow id:{}", processInstanceId, e);
+        }
+        return Optional.empty();
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index ed4c9ab94c..62bc7a53fc 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -130,5 +130,15 @@ public enum CommandType {
/**
* task state event request
*/
- TASK_WAKEUP_EVENT_REQUEST;
+ TASK_WAKEUP_EVENT_REQUEST,
+
+ /**
+ * workflow executing data request, from api to master
+ */
+ WORKFLOW_EXECUTING_DATA_REQUEST,
+
+ /**
+ * workflow executing data response, from master to api
+ */
+ WORKFLOW_EXECUTING_DATA_RESPONSE;
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java
new file mode 100644
index 0000000000..6085322831
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.io.Serializable;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * workflow executing data request, from api to master
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowExecutingDataRequestCommand implements Serializable {
+
+    private Integer processInstanceId;
+
+    /**
+     * Wrap this request in a {@link Command} of type
+     * {@link CommandType#WORKFLOW_EXECUTING_DATA_REQUEST} whose body is this object as JSON bytes.
+     *
+     * @return command
+     */
+    public Command convert2Command() {
+        Command requestCommand = new Command();
+        requestCommand.setType(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST);
+        requestCommand.setBody(JSONUtils.toJsonByteArray(this));
+        return requestCommand;
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java
new file mode 100644
index 0000000000..2d2dfa20b0
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
+
+import java.io.Serializable;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * workflow executing data response, from master to api
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowExecutingDataResponseCommand implements Serializable {
+
+    private WorkflowExecuteDto workflowExecuteDto;
+
+    /**
+     * Wrap this response in a {@link Command} of type
+     * {@link CommandType#WORKFLOW_EXECUTING_DATA_RESPONSE} whose body is this object as JSON bytes.
+     *
+     * @param opaque opaque value passed to the {@link Command} constructor
+     * @return command
+     */
+    public Command convert2Command(long opaque) {
+        Command responseCommand = new Command(opaque);
+        responseCommand.setType(CommandType.WORKFLOW_EXECUTING_DATA_RESPONSE);
+        responseCommand.setBody(JSONUtils.toJsonByteArray(this));
+        return responseCommand;
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java
new file mode 100644
index 0000000000..4bfae1036d
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.dto;
+
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+
+import java.util.Date;
+import java.util.Map;
+
+import lombok.Data;
+
+/**
+ * Data transfer object describing one task instance.
+ *
+ * <p>The class holds plain fields only; Lombok's {@code @Data} generates the getters, setters,
+ * {@code equals}, {@code hashCode} and {@code toString}. {@code WorkflowExecuteDto} keeps a
+ * collection of these objects in its {@code taskInstances} field.
+ */
+@Data
+public class TaskInstanceExecuteDto {
+
+ private int id;
+
+ private String name;
+
+ private String taskType;
+
+ private int processInstanceId;
+
+ private long taskCode;
+
+ private int taskDefinitionVersion;
+
+ private String processInstanceName;
+
+ private int taskGroupPriority;
+
+ private ExecutionStatus state;
+
+ private Date firstSubmitTime;
+
+ private Date submitTime;
+
+ private Date startTime;
+
+ private Date endTime;
+
+ private String host;
+
+ private String executePath;
+
+ private String logPath;
+
+ private int retryTimes;
+
+ private Flag alertFlag;
+
+ private int pid;
+
+ private String appLink;
+
+ private Flag flag;
+
+ // NOTE(review): carried as a String; the format is not visible in this class - confirm with the producer
+ private String duration;
+
+ private int maxRetryTimes;
+
+ private int retryInterval;
+
+ private Priority taskInstancePriority;
+
+ private Priority processInstancePriority;
+
+ private String workerGroup;
+
+ // boxed type, so this value may be null (unlike the primitive fields of this class)
+ private Long environmentCode;
+
+ private String environmentConfig;
+
+ private int executorId;
+
+ private String varPool;
+
+ private String executorName;
+
+ private Map<String, String> resources;
+
+ private int delayTime;
+
+ private String taskParams;
+
+ // NOTE(review): an int rather than a boolean or Flag; presumably a 0/1 switch - confirm
+ private int dryRun;
+
+ private int taskGroupId;
+
+ // boxed type, so this value may be null
+ private Integer cpuQuota;
+
+ // boxed type, so this value may be null
+ private Integer memoryMax;
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java
new file mode 100644
index 0000000000..64d5542b94
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.dto;
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+
+import java.util.Collection;
+import java.util.Date;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Data transfer object describing one workflow (process instance) together with its task instances.
+ *
+ * <p>The class holds plain fields only; Lombok's {@code @Getter} and {@code @Setter} generate the
+ * accessors. No {@code equals}, {@code hashCode} or {@code toString} is generated or declared here.
+ */
+@Setter
+@Getter
+public class WorkflowExecuteDto {
+
+ private int id;
+
+ private String name;
+
+ private Long processDefinitionCode;
+
+ private int processDefinitionVersion;
+
+ private ExecutionStatus state;
+
+ /**
+ * recovery flag, used for failover
+ */
+ private Flag recovery;
+
+ private Date startTime;
+
+ private Date endTime;
+
+ private int runTimes;
+
+ private String host;
+
+ private CommandType commandType;
+
+ private String commandParam;
+
+ /**
+ * node depend type
+ */
+ private TaskDependType taskDependType;
+
+ private int maxTryTimes;
+
+ /**
+ * failure strategy applied when a task fails
+ */
+ private FailureStrategy failureStrategy;
+
+ /**
+ * warning type
+ */
+ private WarningType warningType;
+
+ private Integer warningGroupId;
+
+ private Date scheduleTime;
+
+ private Date commandStartTime;
+
+ /**
+ * user-defined parameters, carried as a string
+ */
+ private String globalParams;
+
+ /**
+ * id of the executor
+ */
+ private int executorId;
+
+ /**
+ * name of the executor
+ */
+ private String executorName;
+
+ /**
+ * tenant code
+ */
+ private String tenantCode;
+
+ /**
+ * queue
+ */
+ private String queue;
+
+ /**
+ * flag telling whether this process is a sub process
+ */
+ private Flag isSubProcess;
+
+ /**
+ * history command
+ */
+ private String historyCmd;
+
+ /**
+ * schedule times of the processes this one depends on, carried as a string
+ */
+ private String dependenceScheduleTimes;
+
+ // NOTE(review): carried as a String; the format is not visible in this class - confirm with the producer
+ private String duration;
+
+ private Priority processInstancePriority;
+
+ private String workerGroup;
+
+ private Long environmentCode;
+
+ private int timeout;
+
+ private int tenantId;
+
+ /**
+ * varPool, carried as a string
+ */
+ private String varPool;
+
+ private int nextProcessInstanceId;
+
+ private int dryRun;
+
+ private Date restartTime;
+
+ // Lombok strips the "is" prefix of a primitive boolean field when naming accessors:
+ // the generated methods are isBlocked() and setBlocked(boolean), i.e. the bean property is "blocked".
+ private boolean isBlocked;
+
+ /**
+ * task instances attached to this workflow
+ */
+ private Collection<TaskInstanceExecuteDto> taskInstances;
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
index be564261fb..32a0f4673d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
@@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.remote.processor;
+import static org.apache.dolphinscheduler.common.Constants.HTTP_CONNECTION_REQUEST_TIMEOUT;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.Optional;
@@ -110,17 +112,6 @@ public class StateEventCallbackService {
REMOTE_CHANNELS.remove(host);
}
- /**
- * Send the command to target address, this method doesn't guarantee the command send success.
- *
- * @param command command need tp send
- */
- public void sendResult(String address, int port, Command command) {
- logger.info("send result, host:{}, command:{}", address, command.toString());
- Host host = new Host(address, port);
- sendResult(host, command);
- }
-
/**
* Send the command to target host, this method doesn't guarantee the command send success.
*
@@ -133,4 +124,27 @@ public class StateEventCallbackService {
nettyRemoteChannel.writeAndFlush(command);
});
}
+
+ /**
+  * Send the command to the target host through the netty client's {@code sendSync} and hand back
+  * the response command.
+  *
+  * <p>Neither {@link InterruptedException} nor {@link RemotingException} reaches the caller: both are
+  * logged and turned into a {@code null} result, and the interrupt flag is restored when the thread was
+  * interrupted. The channel to the host is closed after every call, whether it succeeded or not.
+  *
+  * @param host target host
+  * @param requestCommand command to send
+  * @return the response command, or {@code null} when the send failed or was interrupted
+  */
+ public Command sendSync(Host host, Command requestCommand) {
+     Command response = null;
+     try {
+         // HTTP_CONNECTION_REQUEST_TIMEOUT is passed as the timeout argument of the sync call
+         response = this.nettyRemotingClient.sendSync(host, requestCommand, HTTP_CONNECTION_REQUEST_TIMEOUT);
+     } catch (InterruptedException e) {
+         logger.error("send sync fail, host:{}, command:{}", host, requestCommand, e);
+         // keep the interruption visible to the code further up the stack
+         Thread.currentThread().interrupt();
+     } catch (RemotingException e) {
+         logger.error("send sync fail, host:{}, command:{}", host, requestCommand, e);
+     } finally {
+         this.nettyRemotingClient.closeChannel(host);
+     }
+     return response;
+ }
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 9cce9152f2..2f0e1c077a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.process;
+import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@@ -30,8 +31,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
-import static java.util.stream.Collectors.toSet;
-
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -334,14 +333,12 @@ public class ProcessServiceImpl implements ProcessService {
int update = updateProcessInstance(info);
// determine whether the process is normal
if (update > 0) {
- String host = info.getHost();
- String address = host.split(":")[0];
- int port = Integer.parseInt(host.split(":")[1]);
+ Host host = new Host(info.getHost());
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
info.getId(), 0, info.getState(), info.getId(), 0
);
try {
- stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
+ stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
} catch (Exception e) {
logger.error("sendResultError");
}
@@ -3035,13 +3032,11 @@ public class ProcessServiceImpl implements ProcessService {
@Override
public void sendStartTask2Master(ProcessInstance processInstance, int taskId,
org.apache.dolphinscheduler.remote.command.CommandType taskType) {
- String host = processInstance.getHost();
- String address = host.split(":")[0];
- int port = Integer.parseInt(host.split(":")[1]);
+ Host host = new Host(processInstance.getHost());
TaskEventChangeCommand taskEventChangeCommand = new TaskEventChangeCommand(
processInstance.getId(), taskId
);
- stateEventCallbackService.sendResult(address, port, taskEventChangeCommand.convert2Command(taskType));
+ stateEventCallbackService.sendResult(host, taskEventChangeCommand.convert2Command(taskType));
}
@Override