You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/11/24 10:03:55 UTC

[GitHub] [dolphinscheduler] caishunfeng commented on a change in pull request #6722: [Feature-#6422] task group queue

caishunfeng commented on a change in pull request #6722:
URL: https://github.com/apache/dolphinscheduler/pull/6722#discussion_r749872573



##########
File path: dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
##########
@@ -2568,6 +2581,174 @@ private void addAuthorizedResources(List<Resource> ownResources, int userId) {
         return processTaskMap;
     }
 
+    /**
+     * @param taskId    task id
+     * @param taskName
+     * @param groupId
+     * @param processId
+     * @param priority
+     * @return
+     */
+    public boolean acquireTaskGroup(int taskId,
+                                    String taskName, int groupId,
+                                    int processId, int priority) {
+        TaskGroup taskGroup = taskGroupMapper.selectById(groupId);
+        if (taskGroup == null) {
+            return true;
+        }
+        // if task group is not applicable
+        if (taskGroup.getStatus() == Flag.NO.getCode()) {
+            return true;
+        }
+        TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId);
+        if (taskGroupQueue == null) {
+            taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE);
+        } else {
+            if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
+                return true;
+            }
+            taskGroupQueue.setInQueue(Flag.NO.getCode());
+            taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
+            this.taskGroupQueueMapper.updateById(taskGroupQueue);
+        }
+        //check priority
+        List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode());
+        if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
+            this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
+            return false;
+        }
+        //try to get taskGroup
+        int count = taskGroupMapper.selectAvailableCountById(groupId);
+        if (count == 1 && robTaskGroupResouce(taskGroupQueue)) {
+            return true;
+        }
+        this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
+        return false;
+    }
+
+    public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {

Review comment:
       what's the function name mean?

##########
File path: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.controller;
+
+import static org.apache.dolphinscheduler.api.enums.Status.CLOSE_TASK_GROUP_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_GROUP_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_LIST_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_QUEUE_LIST_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.START_TASK_GROUP_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_GROUP_ERROR;
+
+import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
+import org.apache.dolphinscheduler.api.service.TaskGroupService;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.Map;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+import springfox.documentation.annotations.ApiIgnore;
+
+
+/**
+ * task group controller
+ */
+@Api(tags = "task group")
+@RestController
+@RequestMapping("/task-group")
+public class TaskGroupController extends BaseController {
+
+    @Autowired
+    private TaskGroupService taskGroupService;
+
+    /**
+     * query task group list
+     *
+     * @param loginUser   login user
+     * @param name        name
+     * @param description description
+     * @param groupSize   group size
+     * @param name        project id
+     * @return result and msg code
+     */
+    @ApiOperation(value = "createTaskGroup", notes = "CREATE_TAKS_GROUP_NOTE")
+    @ApiImplicitParams({
+        @ApiImplicitParam(name = "name", value = "NAME", dataType = "String"),
+        @ApiImplicitParam(name = "description", value = "DESCRIPTION", dataType = "String"),
+        @ApiImplicitParam(name = "groupSize", value = "GROUPSIZE", dataType = "Int"),
+
+    })
+    @PostMapping(value = "/create")
+    @ResponseStatus(HttpStatus.CREATED)
+    @ApiException(CREATE_TASK_GROUP_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result createTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                  @RequestParam("name") String name,
+                                  @RequestParam("description") String description,
+                                  @RequestParam("groupSize") Integer groupSize) {
+        User user = new User();

Review comment:
       the user seems never use;

##########
File path: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
+/**
+ * task group queue service
+ */
+@Service
+public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGroupQueueService {
+
+    @Autowired
+    TaskGroupQueueMapper taskGroupQueueMapper;
+
+    @Autowired
+    private TaskInstanceMapper taskInstanceMapper;
+
+    private static final Logger logger = LoggerFactory.getLogger(TaskGroupQueueServiceImpl.class);
+
+    /**
+     * query tasks in task group queue by group id
+     *
+     * @param loginUser login user
+     * @param groupId   group id
+     * @param pageNo    page no
+     * @param pageSize  page size
+     * @return tasks list
+     */
+    @Override
+    public Map<String, Object> queryTasksByGroupId(User loginUser, Integer groupId, Integer pageNo, Integer pageSize) {
+        return this.doQuery(loginUser, pageNo, pageSize, null, groupId);
+    }
+
+    /**
+     * query tasks in task group queue by project id
+     *
+     * @param loginUser login user
+     * @param pageNo    page no
+     * @param pageSize  page size
+     * @param processId process id
+     * @return tasks list
+     */
+    @Override
+    public Map<String, Object> queryTasksByProcessId(User loginUser, Integer pageNo, Integer pageSize, Integer processId) {
+        return this.doQuery(loginUser, pageNo, pageSize, null, processId);
+    }
+
+    /**
+     * query all tasks in task group queue
+     *
+     * @param loginUser login user
+     * @param pageNo    page no
+     * @param pageSize  page size
+     * @return tasks list
+     */
+    @Override
+    public Map<String, Object> queryAllTasks(User loginUser, Integer pageNo, Integer pageSize) {
+        return this.doQuery(loginUser, pageNo, pageSize, null, null);
+    }
+
+    public Map<String, Object> doQuery(User loginUser, Integer pageNo, Integer pageSize,
+                                       Integer groupId,
+                                       Integer processId) {
+        Map<String, Object> result = new HashMap<>();
+        if (isNotAdmin(loginUser, result)) {
+            return result;
+        }
+        Page<TaskGroupQueue> page = new Page<>(pageNo, pageSize);
+        IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueuePaging(page, groupId);
+
+        PageInfo<TaskGroupQueue> pageInfo = new PageInfo<>(pageNo, pageSize);
+        pageInfo.setTotal((int) taskGroupQueue.getTotal());
+        pageInfo.setTotalList(taskGroupQueue.getRecords());
+
+        result.put(Constants.DATA_LIST, pageInfo);
+        logger.info("select result:{}", taskGroupQueue);

Review comment:
       Is this log for debug?

##########
File path: dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
##########
@@ -2568,6 +2581,174 @@ private void addAuthorizedResources(List<Resource> ownResources, int userId) {
         return processTaskMap;
     }
 
+    /**
+     * @param taskId    task id
+     * @param taskName
+     * @param groupId
+     * @param processId
+     * @param priority
+     * @return
+     */
+    public boolean acquireTaskGroup(int taskId,
+                                    String taskName, int groupId,
+                                    int processId, int priority) {
+        TaskGroup taskGroup = taskGroupMapper.selectById(groupId);
+        if (taskGroup == null) {
+            return true;
+        }
+        // if task group is not applicable
+        if (taskGroup.getStatus() == Flag.NO.getCode()) {
+            return true;
+        }
+        TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId);
+        if (taskGroupQueue == null) {
+            taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE);
+        } else {
+            if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
+                return true;
+            }
+            taskGroupQueue.setInQueue(Flag.NO.getCode());
+            taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
+            this.taskGroupQueueMapper.updateById(taskGroupQueue);
+        }
+        //check priority
+        List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode());
+        if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
+            this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
+            return false;
+        }
+        //try to get taskGroup
+        int count = taskGroupMapper.selectAvailableCountById(groupId);
+        if (count == 1 && robTaskGroupResouce(taskGroupQueue)) {
+            return true;
+        }
+        this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
+        return false;
+    }
+
+    public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {
+        TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
+        int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),taskGroupQueue.getId(),
+                TaskGroupQueueStatus.WAIT_QUEUE.getCode());
+        if (affectedCount > 0) {
+            taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+            this.taskGroupQueueMapper.updateById(taskGroupQueue);
+            this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
+            return true;
+        }
+        return false;
+    }
+
+    public boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue) {
+        return robTaskGroupResouce(taskGroupQueue);
+    }
+
+    public void releaseAllTaskGroup(int processInstanceId) {
+        List<TaskInstance> taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
+        for (TaskInstance info : taskInstances) {
+            releaseTaskGroup(info);
+        }
+    }
+
+    /**
+     * release the TGQ resource when the corresponding task is finished.
+     *
+     * @return the result code and msg
+     */
+    public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
+
+        TaskGroup taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
+        if (taskGroup == null) {
+            return null;
+        }
+        TaskGroupQueue thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
+        if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
+            return null;
+        }
+        try {
+            while (taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize()
+                    , thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) {
+                thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
+                if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
+                    return null;
+                }
+                taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
+            }
+        } catch (Exception e) {
+            logger.info("error:{}",e);

Review comment:
       should be logger.error

##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+
+/**
+ * handle state event received from master/api
+ */
+public class TaskEventProcessor implements NettyRequestProcessor {
+
+    private final Logger logger = LoggerFactory.getLogger(TaskEventProcessor.class);
+
+    private StateEventResponseService stateEventResponseService;
+
+    public TaskEventProcessor() {
+        stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class);
+    }
+
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.TASK_FORCE_STATE_EVENT_REQUEST == command.getType()
+                        || CommandType.TASK_WAKEUP_EVENT_REQUEST == command.getType()
+                , String.format("invalid command type: %s", command.getType()));
+
+        TaskEventChangeCommand taskEventChangeCommand = JSONUtils.parseObject(command.getBody(), TaskEventChangeCommand.class);
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setKey(taskEventChangeCommand.getKey());
+        stateEvent.setProcessInstanceId(taskEventChangeCommand.getProcessInstanceId());
+        stateEvent.setTaskInstanceId(taskEventChangeCommand.getTaskInstanceId());
+        stateEvent.setType(StateEventType.WAIT_TASK_GROUP);
+        logger.info("received command : {}", stateEvent);
+        stateEventResponseService.addEnent2WorkflowExecute(stateEvent);

Review comment:
       should be `addEvent2WorkflowExecute`?

##########
File path: dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
##########
@@ -2568,6 +2581,174 @@ private void addAuthorizedResources(List<Resource> ownResources, int userId) {
         return processTaskMap;
     }
 
+    /**
+     * @param taskId    task id
+     * @param taskName
+     * @param groupId
+     * @param processId
+     * @param priority
+     * @return
+     */
+    public boolean acquireTaskGroup(int taskId,
+                                    String taskName, int groupId,
+                                    int processId, int priority) {
+        TaskGroup taskGroup = taskGroupMapper.selectById(groupId);
+        if (taskGroup == null) {
+            return true;
+        }
+        // if task group is not applicable
+        if (taskGroup.getStatus() == Flag.NO.getCode()) {
+            return true;
+        }
+        TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId);
+        if (taskGroupQueue == null) {
+            taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE);
+        } else {
+            if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
+                return true;
+            }
+            taskGroupQueue.setInQueue(Flag.NO.getCode());
+            taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
+            this.taskGroupQueueMapper.updateById(taskGroupQueue);
+        }
+        //check priority
+        List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode());
+        if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
+            this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
+            return false;
+        }
+        //try to get taskGroup
+        int count = taskGroupMapper.selectAvailableCountById(groupId);
+        if (count == 1 && robTaskGroupResouce(taskGroupQueue)) {
+            return true;
+        }
+        this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
+        return false;
+    }
+
+    public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {
+        TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
+        int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),taskGroupQueue.getId(),
+                TaskGroupQueueStatus.WAIT_QUEUE.getCode());
+        if (affectedCount > 0) {
+            taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+            this.taskGroupQueueMapper.updateById(taskGroupQueue);
+            this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
+            return true;
+        }
+        return false;
+    }
+
+    public boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue) {

Review comment:
       Is it better to call the `robTaskGroupResouce` directly?

##########
File path: dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
##########
@@ -2568,6 +2581,174 @@ private void addAuthorizedResources(List<Resource> ownResources, int userId) {
         return processTaskMap;
     }
 
+    /**
+     * @param taskId    task id

Review comment:
       Please add some comments for what this function to do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org