You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/02/18 10:03:31 UTC

[GitHub] [incubator-dolphinscheduler] chengshiwen commented on a change in pull request #4765: [Improvement-3369][api] Introduce monitor, processinstance and queue service interface for clear code

chengshiwen commented on a change in pull request #4765:
URL: https://github.com/apache/incubator-dolphinscheduler/pull/4765#discussion_r578284848



##########
File path: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
##########
@@ -0,0 +1,741 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
+import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
+import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
+import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
+
+import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
+import org.apache.dolphinscheduler.api.dto.gantt.Task;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
+import org.apache.dolphinscheduler.api.service.LoggerService;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService;
+import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.UsersService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.DependResult;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessData;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
+/**
+ * process instance service impl
+ */
+@Service
+public class ProcessInstanceServiceImpl extends BaseService implements ProcessInstanceService {
+
+    @Autowired
+    ProjectMapper projectMapper;
+
+    @Autowired
+    ProjectService projectService;
+
+    @Autowired
+    ProcessService processService;
+
+    @Autowired
+    ProcessInstanceMapper processInstanceMapper;
+
+    @Autowired
+    ProcessDefinitionMapper processDefineMapper;
+
+    @Autowired
+    ProcessDefinitionService processDefinitionService;
+
+    @Autowired
+    ProcessDefinitionVersionService processDefinitionVersionService;
+
+    @Autowired
+    ExecutorService execService;
+
+    @Autowired
+    TaskInstanceMapper taskInstanceMapper;
+
+    @Autowired
+    LoggerService loggerService;
+
+
+    @Autowired
+    UsersService usersService;
+
+    /**
+     * return top n SUCCESS process instance order by running time which started between startTime and endTime
+     */
+    public Map<String, Object> queryTopNLongestRunningProcessInstance(User loginUser, String projectName, int size, String startTime, String endTime) {
+        Map<String, Object> result = new HashMap<>();
+
+        Project project = projectMapper.queryByName(projectName);
+        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
+        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
+        if (resultEnum != Status.SUCCESS) {
+            return checkResult;
+        }
+
+        if (0 > size) {
+            putMsg(result, Status.NEGTIVE_SIZE_NUMBER_ERROR, size);
+            return result;
+        }
+        if (Objects.isNull(startTime)) {
+            putMsg(result, Status.DATA_IS_NULL, Constants.START_TIME);
+            return result;
+        }
+        Date start = DateUtils.stringToDate(startTime);
+        if (Objects.isNull(endTime)) {
+            putMsg(result, Status.DATA_IS_NULL, Constants.END_TIME);
+            return result;
+        }
+        Date end = DateUtils.stringToDate(endTime);
+        if (start == null || end == null) {
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate");
+            return result;
+        }
+        if (start.getTime() > end.getTime()) {
+            putMsg(result, Status.START_TIME_BIGGER_THAN_END_TIME_ERROR, startTime, endTime);
+            return result;
+        }
+
+        List<ProcessInstance> processInstances = processInstanceMapper.queryTopNProcessInstance(size, start, end, ExecutionStatus.SUCCESS);
+        result.put(DATA_LIST, processInstances);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * query process instance by id
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param processId process instance id
+     * @return process instance detail
+     */
+    public Map<String, Object> queryProcessInstanceById(User loginUser, String projectName, Integer processId) {
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByName(projectName);
+
+        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
+        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
+        if (resultEnum != Status.SUCCESS) {
+            return checkResult;
+        }
+        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
+
+        ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
+        processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
+        result.put(DATA_LIST, processInstance);
+        putMsg(result, Status.SUCCESS);
+
+        return result;
+    }
+
+    /**
+     * paging query process instance list, filtering according to project, process definition, time range, keyword, process status
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param pageNo page number
+     * @param pageSize page size
+     * @param processDefineId process definition id
+     * @param searchVal search value
+     * @param stateType state type
+     * @param host host
+     * @param startDate start time
+     * @param endDate end time
+     * @return process instance list
+     */
+    public Map<String, Object> queryProcessInstanceList(User loginUser, String projectName, Integer processDefineId,
+                                                        String startDate, String endDate,
+                                                        String searchVal, String executorName, ExecutionStatus stateType, String host,
+                                                        Integer pageNo, Integer pageSize) {
+
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByName(projectName);
+
+        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
+        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
+        if (resultEnum != Status.SUCCESS) {
+            return checkResult;
+        }
+
+        int[] statusArray = null;
+        // filter by state
+        if (stateType != null) {
+            statusArray = new int[]{stateType.ordinal()};
+        }
+
+        Date start = null;
+        Date end = null;
+        try {
+            if (StringUtils.isNotEmpty(startDate)) {
+                start = DateUtils.getScheduleDate(startDate);
+            }
+            if (StringUtils.isNotEmpty(endDate)) {
+                end = DateUtils.getScheduleDate(endDate);
+            }
+        } catch (Exception e) {
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate");
+            return result;
+        }
+
+        Page<ProcessInstance> page = new Page<>(pageNo, pageSize);
+        PageInfo<ProcessInstance> pageInfo = new PageInfo<>(pageNo, pageSize);
+        int executorId = usersService.getUserIdByName(executorName);
+
+        IPage<ProcessInstance> processInstanceList =
+                processInstanceMapper.queryProcessInstanceListPaging(page,
+                        project.getId(), processDefineId, searchVal, executorId, statusArray, host, start, end);
+
+        List<ProcessInstance> processInstances = processInstanceList.getRecords();
+
+        for (ProcessInstance processInstance : processInstances) {
+            processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()));
+            User executor = usersService.queryUser(processInstance.getExecutorId());
+            if (null != executor) {
+                processInstance.setExecutorName(executor.getUserName());
+            }
+        }
+
+        pageInfo.setTotalCount((int) processInstanceList.getTotal());
+        pageInfo.setLists(processInstances);
+        result.put(DATA_LIST, pageInfo);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * query task list by process instance id
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param processId process instance id
+     * @return task list for the process instance
+     * @throws IOException io exception
+     */
+    public Map<String, Object> queryTaskListByProcessId(User loginUser, String projectName, Integer processId) throws IOException {
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByName(projectName);
+
+        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
+        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
+        if (resultEnum != Status.SUCCESS) {
+            return checkResult;
+        }
+        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
+        List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId);
+        addDependResultForTaskList(taskInstanceList);
+        Map<String, Object> resultMap = new HashMap<>();
+        resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
+        resultMap.put(TASK_LIST, taskInstanceList);
+        result.put(DATA_LIST, resultMap);
+
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * add dependent result for dependent task
+     */
+    private void addDependResultForTaskList(List<TaskInstance> taskInstanceList) throws IOException {
+        for (TaskInstance taskInstance : taskInstanceList) {
+            if (taskInstance.getTaskType().equalsIgnoreCase(TaskType.DEPENDENT.toString())) {
+                Result<String> logResult = loggerService.queryLog(
+                        taskInstance.getId(), 0, 4098);

Review comment:
       Done

##########
File path: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
##########
@@ -0,0 +1,741 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
+import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
+import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
+import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
+
+import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
+import org.apache.dolphinscheduler.api.dto.gantt.Task;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
+import org.apache.dolphinscheduler.api.service.LoggerService;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService;
+import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.UsersService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.DependResult;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessData;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
+/**
+ * process instance service impl
+ */
+@Service
+public class ProcessInstanceServiceImpl extends BaseService implements ProcessInstanceService {
+
+    @Autowired
+    ProjectMapper projectMapper;
+
+    @Autowired
+    ProjectService projectService;
+
+    @Autowired
+    ProcessService processService;
+
+    @Autowired
+    ProcessInstanceMapper processInstanceMapper;
+
+    @Autowired
+    ProcessDefinitionMapper processDefineMapper;
+
+    @Autowired
+    ProcessDefinitionService processDefinitionService;
+
+    @Autowired
+    ProcessDefinitionVersionService processDefinitionVersionService;
+
+    @Autowired
+    ExecutorService execService;
+
+    @Autowired
+    TaskInstanceMapper taskInstanceMapper;
+
+    @Autowired
+    LoggerService loggerService;
+
+
+    @Autowired
+    UsersService usersService;
+
+    /**
+     * return top n SUCCESS process instance order by running time which started between startTime and endTime
+     */
+    public Map<String, Object> queryTopNLongestRunningProcessInstance(User loginUser, String projectName, int size, String startTime, String endTime) {
+        Map<String, Object> result = new HashMap<>();
+
+        Project project = projectMapper.queryByName(projectName);
+        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
+        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
+        if (resultEnum != Status.SUCCESS) {
+            return checkResult;
+        }
+
+        if (0 > size) {
+            putMsg(result, Status.NEGTIVE_SIZE_NUMBER_ERROR, size);
+            return result;
+        }
+        if (Objects.isNull(startTime)) {
+            putMsg(result, Status.DATA_IS_NULL, Constants.START_TIME);
+            return result;
+        }
+        Date start = DateUtils.stringToDate(startTime);
+        if (Objects.isNull(endTime)) {
+            putMsg(result, Status.DATA_IS_NULL, Constants.END_TIME);
+            return result;
+        }
+        Date end = DateUtils.stringToDate(endTime);
+        if (start == null || end == null) {
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate");
+            return result;
+        }
+        if (start.getTime() > end.getTime()) {
+            putMsg(result, Status.START_TIME_BIGGER_THAN_END_TIME_ERROR, startTime, endTime);

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org