You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/23 14:25:31 UTC

[dolphinscheduler] branch dev updated: [Feature][API] New restful API for workflow and schedule (#11912)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 82ddd72e4a [Feature][API] New restful API for workflow and schedule  (#11912)
82ddd72e4a is described below

commit 82ddd72e4a2fae8d8926e5ff8ec35600d0483fbc
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Fri Sep 23 22:25:19 2022 +0800

    [Feature][API] New restful API for workflow and schedule  (#11912)
    
    * [feat] New restful API for workflow and schedule
    
    CURD for workflow and schedule, different with exists
    API, this new restful api only operate single resource
    in each request, and return the latest. For example,
    previous workflow should also need to post tasks definition
    and tasks relation definition, but this patch will allow
    you to create workflow without task relate information
    
    * use checkProjectAndAuthThrowException, and fix CI error
    
    * Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ScheduleV2Controller.java
    
    * change method name
    from createProcessDefinitionV2 to createSingleProcessDefinition
    from updateProcessDefinitionV2 to updateSingleProcessDefinition
    
    Co-authored-by: caishunfeng <ca...@gmail.com>
---
 .../controller/ProcessDefinitionController.java    |  15 +-
 .../api/controller/ScheduleV2Controller.java       | 162 +++++
 .../api/controller/SchedulerController.java        |  13 +-
 .../api/controller/WorkflowV2Controller.java       | 165 ++++++
 .../dolphinscheduler/api/dto/PageQueryDto.java     |   8 +-
 .../api/dto/schedule/ScheduleCreateRequest.java    | 120 ++++
 .../api/dto/schedule/ScheduleFilterRequest.java    |  63 ++
 .../ScheduleParam.java}                            |  32 +-
 .../api/dto/schedule/ScheduleUpdateRequest.java    | 138 +++++
 .../api/dto/workflow/WorkflowCreateRequest.java    |  89 +++
 .../api/dto/workflow/WorkflowFilterRequest.java    |  63 ++
 .../api/dto/workflow/WorkflowUpdateRequest.java    |  99 ++++
 .../apache/dolphinscheduler/api/enums/Status.java  | 157 +++--
 .../api/service/ExecutorService.java               |  19 +-
 .../api/service/ProcessDefinitionService.java      |  51 +-
 .../api/service/SchedulerService.java              |  52 +-
 .../api/service/impl/ExecutorServiceImpl.java      |  49 +-
 .../service/impl/ProcessDefinitionServiceImpl.java | 262 +++++++--
 .../api/service/impl/SchedulerServiceImpl.java     | 255 ++++++--
 .../apache/dolphinscheduler/api/utils/Result.java  |   7 +-
 .../src/main/resources/i18n/messages.properties    |   4 +-
 .../main/resources/i18n/messages_en_US.properties  |   4 +-
 .../main/resources/i18n/messages_zh_CN.properties  |   4 +-
 .../ProcessDefinitionControllerTest.java           |  13 +-
 .../api/controller/SchedulerControllerTest.java    |  91 +--
 .../api/controller/WorkflowV2ControllerTest.java   | 168 ++++++
 .../api/service/BaseServiceTest.java               |  61 +-
 .../api/service/BaseServiceTestTool.java}          |  30 +-
 .../api/service/ProcessDefinitionServiceTest.java  | 654 +++++++++++++--------
 .../api/service/SchedulerServiceTest.java          | 373 +++++++++++-
 .../dao/mapper/ProcessDefinitionMapper.java        |  10 +
 .../dao/mapper/ScheduleMapper.java                 |  10 +
 .../dao/mapper/ProcessDefinitionMapper.xml         |  20 +
 .../dolphinscheduler/dao/mapper/ScheduleMapper.xml |  25 +
 34 files changed, 2654 insertions(+), 632 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
index 03b6fc8359..c6c1cab8d2 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
@@ -38,6 +38,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.VERIFY_PROCESS_DEFINI
 import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
@@ -660,9 +661,8 @@ public class ProcessDefinitionController extends BaseController {
     public Result deleteProcessDefinitionByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                                 @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                                                 @PathVariable("code") long code) {
-        Map<String, Object> result =
-                processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, code);
-        return returnDataList(result);
+        processDefinitionService.deleteProcessDefinitionByCode(loginUser, code);
+        return new Result(Status.SUCCESS);
     }
 
     /**
@@ -691,13 +691,8 @@ public class ProcessDefinitionController extends BaseController {
             for (String strProcessDefinitionCode : processDefinitionCodeArray) {
                 long code = Long.parseLong(strProcessDefinitionCode);
                 try {
-                    Map<String, Object> deleteResult =
-                            processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, code);
-                    if (!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))) {
-                        deleteFailedCodeSet.add((String) deleteResult.get(Constants.MSG));
-                        logger.error((String) deleteResult.get(Constants.MSG));
-                    }
-                } catch (Exception e) {
+                    processDefinitionService.deleteProcessDefinitionByCode(loginUser, code);
+                } catch (ServiceException e) {
                     deleteFailedCodeSet.add(MessageFormat.format(Status.DELETE_PROCESS_DEFINE_BY_CODES_ERROR.getMsg(),
                             strProcessDefinitionCode));
                 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ScheduleV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ScheduleV2Controller.java
new file mode 100644
index 0000000000..ba26cfa227
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ScheduleV2Controller.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.controller;
+
+import static org.apache.dolphinscheduler.api.enums.Status.CREATE_SCHEDULE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.DELETE_SCHEDULE_BY_ID_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.QUERY_SCHEDULE_LIST_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.QUERY_SCHEDULE_LIST_PAGING_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_SCHEDULE_ERROR;
+
+import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.dto.schedule.ScheduleCreateRequest;
+import org.apache.dolphinscheduler.api.dto.schedule.ScheduleFilterRequest;
+import org.apache.dolphinscheduler.api.dto.schedule.ScheduleUpdateRequest;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.service.SchedulerService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import springfox.documentation.annotations.ApiIgnore;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+
+/**
+ * schedule controller
+ */
+@Api(tags = "SCHEDULER_TAG")
+@RestController
+@RequestMapping("/v2/schedules")
+public class ScheduleV2Controller extends BaseController {
+
+    @Autowired
+    private SchedulerService schedulerService;
+
+    /**
+     * Create resource schedule
+     *
+     * @param loginUser             login user
+     * @param scheduleCreateRequest the new schedule object will be created
+     * @return ResourceResponse object created
+     */
+    @ApiOperation(value = "create", notes = "CREATE_SCHEDULE_NOTES")
+    @PostMapping(consumes = {"application/json"})
+    @ResponseStatus(HttpStatus.CREATED)
+    @ApiException(CREATE_SCHEDULE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Schedule> createSchedule(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                           @RequestBody ScheduleCreateRequest scheduleCreateRequest) {
+        Schedule schedule = schedulerService.createSchedulesV2(loginUser, scheduleCreateRequest);
+        return Result.success(schedule);
+    }
+
+    /**
+     * Delete schedule by id
+     *
+     * @param loginUser login user
+     * @param id        schedule object id
+     */
+    @ApiOperation(value = "delete", notes = "DELETE_SCHEDULE_NOTES")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "id", value = "SCHEDULE_ID", dataTypeClass = long.class, example = "123456", required = true)
+    })
+    @DeleteMapping(value = "/{id}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(DELETE_SCHEDULE_BY_ID_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result deleteSchedule(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                 @PathVariable("id") Integer id) {
+        schedulerService.deleteSchedulesById(loginUser, id);
+        return Result.success();
+    }
+
+    /**
+     * Update resource schedule
+     *
+     * @param loginUser        login user
+     * @param id               schedule object id
+     * @param scheduleUpdateRequest the schedule object will be updated
+     * @return result Result
+     */
+    @ApiOperation(value = "update", notes = "UPDATE_SCHEDULE_NOTES")
+    @PutMapping(value = "/{id}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(UPDATE_SCHEDULE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Schedule> updateSchedule(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                           @PathVariable("id") Integer id,
+                                           @RequestBody ScheduleUpdateRequest scheduleUpdateRequest) {
+        Schedule schedule = schedulerService.updateSchedulesV2(loginUser, id, scheduleUpdateRequest);
+        return Result.success(schedule);
+    }
+
+    /**
+     * Get resource schedule by id
+     *
+     * @param loginUser        login user
+     * @param id               schedule object id
+     * @return result Result
+     */
+    @ApiOperation(value = "get", notes = "GET_SCHEDULE_BY_ID_NOTES")
+    @GetMapping(value = "/{id}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(QUERY_SCHEDULE_LIST_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Schedule> getSchedule(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                        @PathVariable("id") Integer id) {
+        Schedule schedule = schedulerService.getSchedule(loginUser, id);
+        return Result.success(schedule);
+    }
+
+    /**
+     * Get resource schedule according to query parameter
+     *
+     * @param loginUser        login user
+     * @
+     * @return result Result
+     */
+    @ApiOperation(value = "get", notes = "QUERY_SCHEDULE_LIST_PAGING_NOTES")
+    @GetMapping(consumes = {"application/json"})
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(QUERY_SCHEDULE_LIST_PAGING_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<PageInfo<Schedule>> filterSchedule(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                     @RequestBody ScheduleFilterRequest scheduleFilterRequest) {
+        PageInfo<Schedule> schedules = schedulerService.filterSchedules(loginUser, scheduleFilterRequest);
+        return Result.success(schedules);
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
index e2a6429f8e..cc01db15a7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
@@ -18,7 +18,7 @@
 package org.apache.dolphinscheduler.api.controller;
 
 import static org.apache.dolphinscheduler.api.enums.Status.CREATE_SCHEDULE_ERROR;
-import static org.apache.dolphinscheduler.api.enums.Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.DELETE_SCHEDULE_BY_ID_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.OFFLINE_SCHEDULE_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.PREVIEW_SCHEDULE_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.PUBLISH_SCHEDULE_ONLINE_ERROR;
@@ -28,6 +28,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_SCHEDULE_ERROR
 import static org.apache.dolphinscheduler.common.Constants.SESSION_USER;
 
 import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ApiException;
 import org.apache.dolphinscheduler.api.service.SchedulerService;
 import org.apache.dolphinscheduler.api.utils.Result;
@@ -64,7 +65,7 @@ import io.swagger.annotations.ApiParam;
 /**
  * scheduler controller
  */
-@Api(tags = "SCHEDULER_TAG")
+@Api(tags = "SCHEDULE_TAG")
 @RestController
 @RequestMapping("/projects/{projectCode}/schedules")
 public class SchedulerController extends BaseController {
@@ -260,19 +261,19 @@ public class SchedulerController extends BaseController {
      * @param id scheule id
      * @return delete result code
      */
-    @ApiOperation(value = "deleteScheduleById", notes = "OFFLINE_SCHEDULE_NOTES")
+    @ApiOperation(value = "deleteScheduleById", notes = "DELETE_SCHEDULE_NOTES")
     @ApiImplicitParams({
             @ApiImplicitParam(name = "id", value = "SCHEDULE_ID", required = true, dataTypeClass = int.class, example = "100")
     })
     @DeleteMapping(value = "/{id}")
     @ResponseStatus(HttpStatus.OK)
-    @ApiException(DELETE_SCHEDULE_CRON_BY_ID_ERROR)
+    @ApiException(DELETE_SCHEDULE_BY_ID_ERROR)
     @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
     public Result deleteScheduleById(@RequestAttribute(value = SESSION_USER) User loginUser,
                                      @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                                      @PathVariable("id") Integer id) {
-        Map<String, Object> result = schedulerService.deleteScheduleById(loginUser, projectCode, id);
-        return returnDataList(result);
+        schedulerService.deleteSchedulesById(loginUser, id);
+        return new Result(Status.SUCCESS);
     }
 
     /**
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowV2Controller.java
new file mode 100644
index 0000000000..1270f01a68
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowV2Controller.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.controller;
+
+import static org.apache.dolphinscheduler.api.enums.Status.CREATE_PROCESS_DEFINITION_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_LIST;
+import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_PROCESS_DEFINITION_ERROR;
+
+import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest;
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowFilterRequest;
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import springfox.documentation.annotations.ApiIgnore;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+
+/**
+ * workflow controller
+ */
+@Api(tags = "WORKFLOW_TAG")
+@RestController
+@RequestMapping("/v2/workflows")
+public class WorkflowV2Controller extends BaseController {
+
+    @Autowired
+    private ProcessDefinitionService processDefinitionService;
+
+    /**
+     * Create resource workflow
+     *
+     * @param loginUser             login user
+     * @param workflowCreateRequest the new workflow object will be created
+     * @return ResourceResponse object created
+     */
+    @ApiOperation(value = "create", notes = "CREATE_WORKFLOWS_NOTES")
+    @PostMapping(consumes = {"application/json"})
+    @ResponseStatus(HttpStatus.CREATED)
+    @ApiException(CREATE_PROCESS_DEFINITION_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<ProcessDefinition> createWorkflow(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                    @RequestBody WorkflowCreateRequest workflowCreateRequest) {
+        ProcessDefinition processDefinition =
+                processDefinitionService.createSingleProcessDefinition(loginUser, workflowCreateRequest);
+        return Result.success(processDefinition);
+    }
+
+    /**
+     * Delete workflow by code
+     *
+     * @param loginUser login user
+     * @param code      process definition code
+     * @return Result result object delete
+     */
+    @ApiOperation(value = "delete", notes = "DELETE_WORKFLOWS_NOTES")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "code", value = "WORKFLOW_CODE", dataTypeClass = long.class, example = "123456", required = true)
+    })
+    @DeleteMapping(value = "/{code}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(DELETE_PROCESS_DEFINE_BY_CODE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result deleteWorkflow(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                 @PathVariable("code") Long code) {
+        processDefinitionService.deleteProcessDefinitionByCode(loginUser, code);
+        return Result.success();
+    }
+
+    /**
+     * Update resource workflow
+     *
+     * @param loginUser        login user
+     * @param code             workflow resource code you want to update
+     * @param workflowUpdateRequest workflowUpdateRequest
+     * @return ResourceResponse object updated
+     */
+    @ApiOperation(value = "update", notes = "UPDATE_WORKFLOWS_NOTES")
+    @PutMapping(value = "/{code}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(UPDATE_PROCESS_DEFINITION_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<ProcessDefinition> updateWorkflow(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                    @PathVariable("code") Long code,
+                                                    @RequestBody WorkflowUpdateRequest workflowUpdateRequest) {
+        ProcessDefinition processDefinition =
+                processDefinitionService.updateSingleProcessDefinition(loginUser, code, workflowUpdateRequest);
+        return Result.success(processDefinition);
+    }
+
+    /**
+     * Get resource workflow
+     *
+     * @param loginUser        login user
+     * @param code             workflow resource code you want to update
+     * @return ResourceResponse object get from condition
+     */
+    @ApiOperation(value = "get", notes = "GET_WORKFLOWS_NOTES")
+    @GetMapping(value = "/{code}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(QUERY_PROCESS_DEFINITION_LIST)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<ProcessDefinition> getWorkflow(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                 @PathVariable("code") Long code) {
+        ProcessDefinition processDefinition = processDefinitionService.getProcessDefinition(loginUser, code);
+        return Result.success(processDefinition);
+    }
+
+    /**
+     * Get resource workflows according to query parameter
+     *
+     * @param loginUser        login user
+     * @param workflowFilterRequest workflowFilterRequest
+     * @return PageResourceResponse from condition
+     */
+    @ApiOperation(value = "get", notes = "FILTER_WORKFLOWS_NOTES")
+    @GetMapping(consumes = {"application/json"})
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(QUERY_PROCESS_DEFINITION_LIST)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<PageInfo<ProcessDefinition>> filterWorkflows(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                               @RequestBody WorkflowFilterRequest workflowFilterRequest) {
+        PageInfo<ProcessDefinition> processDefinitions =
+                processDefinitionService.filterProcessDefinition(loginUser, workflowFilterRequest);
+        return Result.success(processDefinitions);
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/PageQueryDto.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/PageQueryDto.java
index 8afd131e79..d527c9fd5f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/PageQueryDto.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/PageQueryDto.java
@@ -17,9 +17,9 @@
 
 package org.apache.dolphinscheduler.api.dto;
 
+import lombok.Data;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
 
 /**
  * page query dto
@@ -29,8 +29,8 @@ import lombok.Data;
 public class PageQueryDto {
 
     @ApiModelProperty(example = "10", required = true)
-    private Integer pageSize;
+    private Integer pageSize = 10;
 
     @ApiModelProperty(example = "1", required = true)
-    private Integer pageNo;
-}
\ No newline at end of file
+    private Integer pageNo = 1;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/schedule/ScheduleCreateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/schedule/ScheduleCreateRequest.java
new file mode 100644
index 0000000000..ff42d75845
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/schedule/ScheduleCreateRequest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.dto.schedule;
+
+import static org.apache.dolphinscheduler.common.utils.DateUtils.stringToDate;
+
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+
+import java.util.Date;
+
+import lombok.Data;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.swagger.annotations.ApiModelProperty;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * schedule create request
+ */
+@Data
+public class ScheduleCreateRequest {
+
+    @ApiModelProperty(example = "1234567890123", required = true)
+    private long processDefinitionCode;
+
+    @ApiModelProperty(example = "schedule timezone", required = true)
+    private String crontab;
+
+    @ApiModelProperty(example = "2021-01-01 10:00:00", required = true)
+    private String startTime;
+
+    @ApiModelProperty(example = "2022-01-01 12:00:00", required = true)
+    private String endTime;
+
+    @ApiModelProperty(example = "Asia/Shanghai", required = true)
+    private String timezoneId;
+
+    @ApiModelProperty(allowableValues = "CONTINUE / END", example = "CONTINUE", notes = "default CONTINUE if value not provide.")
+    private String failureStrategy;
+
+    @ApiModelProperty(allowableValues = "ONLINE / OFFLINE", example = "OFFLINE", notes = "default OFFLINE if value not provide.")
+    private String releaseState;
+
+    @ApiModelProperty(allowableValues = "NONE / SUCCESS / FAILURE / ALL", example = "SUCCESS", notes = "default NONE if value not provide.")
+    private String warningType;
+
+    @ApiModelProperty(example = "2", notes = "default 0 if value not provide.")
+    private int warningGroupId;
+
+    @ApiModelProperty(allowableValues = "HIGHEST / HIGH / MEDIUM / LOW / LOWEST", example = "MEDIUM", notes = "default MEDIUM if value not provide.")
+    private String processInstancePriority;
+
+    @ApiModelProperty(example = "worker-group-name")
+    private String workerGroup;
+
+    @ApiModelProperty(example = "environment-code")
+    private long environmentCode;
+
+    public String getScheduleParam() {
+        Gson gson = new GsonBuilder().serializeNulls().create();
+        ScheduleParam scheduleParam = new ScheduleParam(this.startTime, this.endTime, this.crontab, this.timezoneId);
+        return gson.toJson(scheduleParam);
+    }
+
+    public Schedule convert2Schedule() {
+        Schedule schedule = new Schedule();
+
+        schedule.setProcessDefinitionCode(this.processDefinitionCode);
+        schedule.setCrontab(this.crontab);
+        schedule.setStartTime(stringToDate(this.startTime));
+        schedule.setEndTime(stringToDate(this.endTime));
+        schedule.setTimezoneId(this.timezoneId);
+        schedule.setWarningGroupId(this.warningGroupId);
+        schedule.setWorkerGroup(this.workerGroup);
+        schedule.setEnvironmentCode(this.environmentCode);
+
+        FailureStrategy newFailureStrategy =
+                this.failureStrategy == null ? FailureStrategy.CONTINUE : FailureStrategy.valueOf(this.failureStrategy);
+        schedule.setFailureStrategy(newFailureStrategy);
+
+        ReleaseState newReleaseState =
+                this.releaseState == null ? ReleaseState.OFFLINE : ReleaseState.valueOf(this.releaseState);
+        schedule.setReleaseState(newReleaseState);
+
+        WarningType newWarningType =
+                this.warningType == null ? WarningType.NONE : WarningType.valueOf(this.warningType);
+        schedule.setWarningType(newWarningType);
+
+        Priority newPriority =
+                this.processInstancePriority == null ? Priority.MEDIUM : Priority.valueOf(this.processInstancePriority);
+        schedule.setProcessInstancePriority(newPriority);
+
+        Date date = new Date();
+        schedule.setCreateTime(date);
+        schedule.setUpdateTime(date);
+        return schedule;
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/schedule/ScheduleFilterRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/schedule/ScheduleFilterRequest.java
new file mode 100644
index 0000000000..120e773e0f
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/schedule/ScheduleFilterRequest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.dto.schedule;
+
+import org.apache.dolphinscheduler.api.dto.PageQueryDto;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+
+import lombok.Data;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+/**
+ * schedule query request
+ */
+@ApiModel("SCHEDULE-QUERY")
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+public class ScheduleFilterRequest extends PageQueryDto {
+
+    @ApiModelProperty(example = "project-name")
+    private String projectName;
+
+    @ApiModelProperty(example = "process-definition-name")
+    private String processDefinitionName;
+
+    @ApiModelProperty(allowableValues = "ONLINE / OFFLINE", example = "OFFLINE", notes = "default OFFLINE if value not provide.")
+    private String releaseState;
+
+    public Schedule convert2Schedule() {
+        Schedule schedule = new Schedule();
+        if (this.projectName != null) {
+            schedule.setProjectName(this.projectName);
+        }
+        if (this.processDefinitionName != null) {
+            schedule.setProcessDefinitionName(this.processDefinitionName);
+        }
+        if (this.releaseState != null) {
+            schedule.setReleaseState(ReleaseState.valueOf(this.releaseState));
+        }
+        return schedule;
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/PageQueryDto.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/schedule/ScheduleParam.java
similarity index 61%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/PageQueryDto.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/schedule/ScheduleParam.java
index 8afd131e79..b50e006965 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/PageQueryDto.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/schedule/ScheduleParam.java
@@ -15,22 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.api.dto;
+package org.apache.dolphinscheduler.api.dto.schedule;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
 
-/**
- * page query dto
- */
-@ApiModel("QUERY-PAGE-INFO")
-@Data
-public class PageQueryDto {
+@Getter
+@Setter
+public class ScheduleParam {
 
-    @ApiModelProperty(example = "10", required = true)
-    private Integer pageSize;
+    private String startTime;
+    private String endTime;
+    private String crontab;
+    private String timezoneId;
 
-    @ApiModelProperty(example = "1", required = true)
-    private Integer pageNo;
-}
\ No newline at end of file
+    public ScheduleParam(String startTime, String endTime, String crontab, String timezoneId) {
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.crontab = crontab;
+        this.timezoneId = timezoneId;
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/schedule/ScheduleUpdateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/schedule/ScheduleUpdateRequest.java
new file mode 100644
index 0000000000..3991af21b7
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/schedule/ScheduleUpdateRequest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.dto.schedule;
+
+import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
+import static org.apache.dolphinscheduler.common.utils.DateUtils.format;
+import static org.apache.dolphinscheduler.common.utils.DateUtils.stringToDate;
+
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Date;
+
+import lombok.Data;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.swagger.annotations.ApiModelProperty;
+
+/**
+ * schedule update request
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+public class ScheduleUpdateRequest {
+
+    @ApiModelProperty(example = "schedule timezone", required = true)
+    private String crontab;
+
+    @ApiModelProperty(example = "2021-01-01 10:00:00", required = true)
+    private String startTime;
+
+    @ApiModelProperty(example = "2022-01-01 12:00:00", required = true)
+    private String endTime;
+
+    @ApiModelProperty(example = "Asia/Shanghai", required = true)
+    private String timezoneId;
+
+    @ApiModelProperty(allowableValues = "CONTINUE / END", example = "CONTINUE", notes = "default CONTINUE if value not provide.")
+    private String failureStrategy;
+
+    @ApiModelProperty(allowableValues = "ONLINE / OFFLINE", example = "OFFLINE", notes = "default OFFLINE if value not provide.")
+    private String releaseState;
+
+    @ApiModelProperty(allowableValues = "NONE / SUCCESS / FAILURE / ALL", example = "SUCCESS", notes = "default NONE if value not provide.")
+    private String warningType;
+
+    @ApiModelProperty(example = "2", notes = "default 0 if value not provide.")
+    private int warningGroupId;
+
+    @ApiModelProperty(allowableValues = "HIGHEST / HIGH / MEDIUM / LOW / LOWEST", example = "MEDIUM", notes = "default MEDIUM if value not provide.")
+    private String processInstancePriority;
+
+    @ApiModelProperty(example = "worker-group-name")
+    private String workerGroup;
+
+    @ApiModelProperty(example = "environment-code")
+    private long environmentCode;
+
+    public String updateScheduleParam(Schedule schedule) throws InvocationTargetException, IllegalAccessException, InstantiationException, NoSuchMethodException {
+        Schedule scheduleUpdate = this.mergeIntoSchedule(schedule);
+
+        String startTimeUpdate = scheduleUpdate.getStartTime() == null ? null
+                : format(scheduleUpdate.getStartTime(), YYYY_MM_DD_HH_MM_SS, schedule.getTimezoneId());
+        String endTimeUpdate = scheduleUpdate.getEndTime() == null ? null
+                : format(scheduleUpdate.getEndTime(), YYYY_MM_DD_HH_MM_SS, schedule.getTimezoneId());
+        ScheduleParam scheduleParam = new ScheduleParam(startTimeUpdate, endTimeUpdate, scheduleUpdate.getCrontab(),
+                scheduleUpdate.getTimezoneId());
+
+        Gson gson = new GsonBuilder().serializeNulls().create();
+        return gson.toJson(scheduleParam);
+    }
+
+    public Schedule mergeIntoSchedule(Schedule schedule) throws InvocationTargetException, IllegalAccessException, InstantiationException, NoSuchMethodException {
+        Schedule scheduleDeepCopy = (Schedule) BeanUtils.cloneBean(schedule);;
+        assert scheduleDeepCopy != null;
+        if (this.crontab != null) {
+            scheduleDeepCopy.setCrontab(this.crontab);
+        }
+        if (this.startTime != null) {
+            scheduleDeepCopy.setStartTime(stringToDate(this.startTime));
+        }
+        if (this.endTime != null) {
+            scheduleDeepCopy.setEndTime(stringToDate(this.endTime));
+        }
+        if (this.timezoneId != null) {
+            scheduleDeepCopy.setTimezoneId(this.timezoneId);
+        }
+        if (this.failureStrategy != null) {
+            scheduleDeepCopy.setFailureStrategy(FailureStrategy.valueOf(this.failureStrategy));
+        }
+        if (this.releaseState != null) {
+            scheduleDeepCopy.setReleaseState(ReleaseState.valueOf(this.releaseState));
+        }
+        if (this.warningType != null) {
+            scheduleDeepCopy.setWarningType(WarningType.valueOf(this.warningType));
+        }
+        if (this.warningGroupId != 0) {
+            scheduleDeepCopy.setWarningGroupId(this.warningGroupId);
+        }
+        if (this.processInstancePriority != null) {
+            scheduleDeepCopy.setProcessInstancePriority(Priority.valueOf(this.processInstancePriority));
+        }
+        if (this.workerGroup != null) {
+            scheduleDeepCopy.setWorkerGroup(this.workerGroup);
+        }
+        if (this.environmentCode != 0L) {
+            scheduleDeepCopy.setEnvironmentCode(this.environmentCode);
+        }
+
+        scheduleDeepCopy.setUpdateTime(new Date());
+        return scheduleDeepCopy;
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java
new file mode 100644
index 0000000000..e6d78a176b
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.dto.workflow;
+
+import static org.apache.dolphinscheduler.common.Constants.VERSION_FIRST;
+
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+
+import java.util.Date;
+
+import lombok.Data;
+import io.swagger.annotations.ApiModelProperty;
+
+/**
+ * workflow create request
+ */
+@Data
+public class WorkflowCreateRequest {
+
+    @ApiModelProperty(example = "workflow name", required = true)
+    private String name;
+
+    @ApiModelProperty(example = "workflow's description")
+    private String description;
+
+    @ApiModelProperty(example = "12345", required = true)
+    private long projectCode;
+
+    @ApiModelProperty(allowableValues = "ONLINE / OFFLINE", example = "OFFLINE", notes = "default OFFLINE if not provide.")
+    private String releaseState;
+
+    @ApiModelProperty(example = "[{\"prop\":\"key\",\"value\":\"value\",\"direct\":\"IN\",\"type\":\"VARCHAR\"}]")
+    private String globalParams;
+
+    @ApiModelProperty(example = "2")
+    private int warningGroupId;
+
+    @ApiModelProperty(example = "60")
+    private int timeout;
+
+    @ApiModelProperty(example = "tenant1", required = true)
+    private String tenantCode;
+
+    @ApiModelProperty(allowableValues = "PARALLEL / SERIAL_WAIT / SERIAL_DISCARD / SERIAL_PRIORITY", example = "PARALLEL", notes = "default PARALLEL if not provide.")
+    private String executionType;
+
+    public ProcessDefinition convert2ProcessDefinition() {
+        ProcessDefinition processDefinition = new ProcessDefinition();
+
+        processDefinition.setName(this.name);
+        processDefinition.setDescription(this.description);
+        processDefinition.setProjectCode(this.projectCode);
+        processDefinition.setGlobalParams(this.globalParams);
+        processDefinition.setWarningGroupId(this.warningGroupId);
+        processDefinition.setTimeout(this.timeout);
+        processDefinition.setTenantCode(this.tenantCode);
+
+        ReleaseState pdReleaseState =
+                this.releaseState == null ? ReleaseState.OFFLINE : ReleaseState.valueOf(this.releaseState);
+        processDefinition.setReleaseState(pdReleaseState);
+        ProcessExecutionTypeEnum processExecutionTypeEnum =
+                this.executionType == null ? ProcessExecutionTypeEnum.PARALLEL
+                        : ProcessExecutionTypeEnum.valueOf(this.executionType);
+        processDefinition.setExecutionType(processExecutionTypeEnum);
+
+        processDefinition.setVersion(VERSION_FIRST);
+        Date date = new Date();
+        processDefinition.setCreateTime(date);
+        processDefinition.setUpdateTime(date);
+        return processDefinition;
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowFilterRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowFilterRequest.java
new file mode 100644
index 0000000000..5a5681754c
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowFilterRequest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.dto.workflow;
+
+import org.apache.dolphinscheduler.api.dto.PageQueryDto;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+
+import lombok.Data;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+/**
+ * workflow query response
+ */
+@ApiModel("WORKFLOW-QUERY")
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+public class WorkflowFilterRequest extends PageQueryDto {
+
+    @ApiModelProperty(example = "project-name")
+    private String projectName;
+
+    @ApiModelProperty(example = "workflow-name")
+    private String workflowName;
+
+    @ApiModelProperty(example = "ONLINE / OFFLINE")
+    private String releaseState;
+
+    @ApiModelProperty(example = "ONLINE / OFFLINE")
+    private String scheduleReleaseState;
+
+    public ProcessDefinition convert2ProcessDefinition() {
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        if (this.workflowName != null) {
+            processDefinition.setName(this.workflowName);
+        }
+        if (this.releaseState != null) {
+            processDefinition.setReleaseState(ReleaseState.valueOf(this.releaseState));
+        }
+        return processDefinition;
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java
new file mode 100644
index 0000000000..44aa86df67
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.dto.workflow;
+
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+
+import java.util.Date;
+
+import lombok.Data;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.swagger.annotations.ApiModelProperty;
+
+/**
+ * workflow update request
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+public class WorkflowUpdateRequest {
+
+    @ApiModelProperty(example = "workflow's name")
+    private String name;
+
+    @ApiModelProperty(example = "workflow's description")
+    private String description;
+
+    @ApiModelProperty(allowableValues = "ONLINE / OFFLINE", example = "OFFLINE")
+    private String releaseState;
+
+    @ApiModelProperty(example = "[{\"prop\":\"key\",\"value\":\"value\",\"direct\":\"IN\",\"type\":\"VARCHAR\"}]")
+    private String globalParams;
+
+    @ApiModelProperty(example = "2")
+    private int warningGroupId;
+
+    @ApiModelProperty(example = "60")
+    private int timeout;
+
+    @ApiModelProperty(example = "tenantCode1")
+    private String tenantCode;
+
+    @ApiModelProperty(allowableValues = "PARALLEL / SERIAL_WAIT / SERIAL_DISCARD / SERIAL_PRIORITY", example = "PARALLEL", notes = "default PARALLEL if not provide.")
+    private String executionType;
+
+    public ProcessDefinition mergeIntoProcessDefinition(ProcessDefinition processDefinition) {
+        ProcessDefinition processDefinitionDeepCopy =
+                JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
+        assert processDefinitionDeepCopy != null;
+        if (this.name != null) {
+            processDefinitionDeepCopy.setName(this.name);
+        }
+        if (this.description != null) {
+            processDefinitionDeepCopy.setDescription(this.description);
+        }
+        if (this.releaseState != null) {
+            processDefinitionDeepCopy.setReleaseState(ReleaseState.valueOf(this.releaseState));
+        }
+        if (this.globalParams != null) {
+            processDefinitionDeepCopy.setGlobalParams(this.globalParams);
+        }
+        if (this.warningGroupId != 0) {
+            processDefinitionDeepCopy.setWarningGroupId(this.warningGroupId);
+        }
+        if (this.timeout != 0) {
+            processDefinitionDeepCopy.setTimeout(this.timeout);
+        }
+        if (this.tenantCode != null) {
+            processDefinitionDeepCopy.setTenantCode(this.tenantCode);
+        }
+        if (this.executionType != null) {
+            processDefinitionDeepCopy.setExecutionType(ProcessExecutionTypeEnum.valueOf(this.executionType));
+        }
+
+        int version = processDefinitionDeepCopy.getVersion() + 1;
+        processDefinitionDeepCopy.setVersion(version);
+        processDefinitionDeepCopy.setUpdateTime(new Date());
+        return processDefinitionDeepCopy;
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index a34ca157d9..231a9b5d08 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -17,11 +17,11 @@
 
 package org.apache.dolphinscheduler.api.enums;
 
-import org.springframework.context.i18n.LocaleContextHolder;
-
 import java.util.Locale;
 import java.util.Optional;
 
+import org.springframework.context.i18n.LocaleContextHolder;
+
 /**
  * status enum      // todo #4855 One category one interval
  */
@@ -140,16 +140,20 @@ public enum Status {
     QUERY_DETAIL_OF_PROCESS_DEFINITION_ERROR(10109, "query detail of process definition error", "查询工作流详细信息错误"),
     QUERY_PROCESS_DEFINITION_LIST(10110, "query process definition list", "查询工作流列表错误"),
     ENCAPSULATION_TREEVIEW_STRUCTURE_ERROR(10111, "encapsulation treeview structure error", "查询工作流树形图数据错误"),
-    GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR(10112, "get tasks list by process definition id error", "查询工作流定义节点信息错误"),
+    GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR(10112, "get tasks list by process definition id error",
+            "查询工作流定义节点信息错误"),
     QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR(10113, "query process instance list paging error", "分页查询工作流实例列表错误"),
     QUERY_TASK_LIST_BY_PROCESS_INSTANCE_ID_ERROR(10114, "query task list by process instance id error", "查询任务实例列表错误"),
     UPDATE_PROCESS_INSTANCE_ERROR(10115, "update process instance error", "更新工作流实例错误"),
     QUERY_PROCESS_INSTANCE_BY_ID_ERROR(10116, "query process instance by id error", "查询工作流实例错误"),
     DELETE_PROCESS_INSTANCE_BY_ID_ERROR(10117, "delete process instance by id error", "删除工作流实例错误"),
-    QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR(10118, "query sub process instance detail info by task id error", "查询子流程任务实例错误"),
-    QUERY_PARENT_PROCESS_INSTANCE_DETAIL_INFO_BY_SUB_PROCESS_INSTANCE_ID_ERROR(10119, "query parent process instance detail info by sub process instance id error", "查询子流程该工作流实例错误"),
+    QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR(10118,
+            "query sub process instance detail info by task id error", "查询子流程任务实例错误"),
+    QUERY_PARENT_PROCESS_INSTANCE_DETAIL_INFO_BY_SUB_PROCESS_INSTANCE_ID_ERROR(10119,
+            "query parent process instance detail info by sub process instance id error", "查询子流程该工作流实例错误"),
     QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR(10120, "query process instance all variables error", "查询工作流自定义变量信息错误"),
-    ENCAPSULATION_PROCESS_INSTANCE_GANTT_STRUCTURE_ERROR(10121, "encapsulation process instance gantt structure error", "查询工作流实例甘特图数据错误"),
+    ENCAPSULATION_PROCESS_INSTANCE_GANTT_STRUCTURE_ERROR(10121, "encapsulation process instance gantt structure error",
+            "查询工作流实例甘特图数据错误"),
     QUERY_PROCESS_DEFINITION_LIST_PAGING_ERROR(10122, "query process definition list paging error", "分页查询工作流定义列表错误"),
     SIGN_OUT_ERROR(10123, "sign out error", "退出错误"),
     OS_TENANT_CODE_HAS_ALREADY_EXISTS(10124, "os tenant code has already exists", "操作系统租户已存在"),
@@ -165,25 +169,37 @@ public enum Status {
     NAME_NULL(10134, "name must be not null", "名称不能为空"),
     NAME_EXIST(10135, "name {0} already exists", "名称[{0}]已存在"),
     SAVE_ERROR(10136, "save error", "保存错误"),
-    DELETE_PROJECT_ERROR_DEFINES_NOT_NULL(10137, "please delete the process definitions in project first!", "请先删除全部工作流定义"),
-    BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR(10117, "batch delete process instance by ids {0} error", "批量删除工作流实例错误: {0}"),
+    DELETE_PROJECT_ERROR_DEFINES_NOT_NULL(10137, "please delete the process definitions in project first!",
+            "请先删除全部工作流定义"),
+    BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR(10117, "batch delete process instance by ids {0} error",
+            "批量删除工作流实例错误: {0}"),
     PREVIEW_SCHEDULE_ERROR(10139, "preview schedule error", "预览调度配置错误"),
     PARSE_TO_CRON_EXPRESSION_ERROR(10140, "parse cron to cron expression error", "解析调度表达式错误"),
     SCHEDULE_START_TIME_END_TIME_SAME(10141, "The start time must not be the same as the end", "开始时间不能和结束时间一样"),
-    DELETE_TENANT_BY_ID_FAIL(10142, "delete tenant by id fail, for there are {0} process instances in executing using it", "删除租户失败,有[{0}]个运行中的工作流实例正在使用"),
-    DELETE_TENANT_BY_ID_FAIL_DEFINES(10143, "delete tenant by id fail, for there are {0} process definitions using it", "删除租户失败,有[{0}]个工作流定义正在使用"),
-    DELETE_TENANT_BY_ID_FAIL_USERS(10144, "delete tenant by id fail, for there are {0} users using it", "删除租户失败,有[{0}]个用户正在使用"),
-    DELETE_WORKER_GROUP_BY_ID_FAIL(10145, "delete worker group by id fail, for there are {0} process instances in executing using it", "删除Worker分组失败,有[{0}]个运行中的工作流实例正在使用"),
+    DELETE_TENANT_BY_ID_FAIL(10142,
+            "delete tenant by id fail, for there are {0} process instances in executing using it",
+            "删除租户失败,有[{0}]个运行中的工作流实例正在使用"),
+    DELETE_TENANT_BY_ID_FAIL_DEFINES(10143, "delete tenant by id fail, for there are {0} process definitions using it",
+            "删除租户失败,有[{0}]个工作流定义正在使用"),
+    DELETE_TENANT_BY_ID_FAIL_USERS(10144, "delete tenant by id fail, for there are {0} users using it",
+            "删除租户失败,有[{0}]个用户正在使用"),
+    DELETE_WORKER_GROUP_BY_ID_FAIL(10145,
+            "delete worker group by id fail, for there are {0} process instances in executing using it",
+            "删除Worker分组失败,有[{0}]个运行中的工作流实例正在使用"),
     QUERY_WORKER_GROUP_FAIL(10146, "query worker group fail ", "查询worker分组失败"),
     DELETE_WORKER_GROUP_FAIL(10147, "delete worker group fail ", "删除worker分组失败"),
     USER_DISABLED(10148, "The current user is disabled", "当前用户已停用"),
-    COPY_PROCESS_DEFINITION_ERROR(10149, "copy process definition from {0} to {1} error : {2}", "从{0}复制工作流到{1}错误 : {2}"),
-    MOVE_PROCESS_DEFINITION_ERROR(10150, "move process definition from {0} to {1} error : {2}", "从{0}移动工作流到{1}错误 : {2}"),
+    COPY_PROCESS_DEFINITION_ERROR(10149, "copy process definition from {0} to {1} error : {2}",
+            "从{0}复制工作流到{1}错误 : {2}"),
+    MOVE_PROCESS_DEFINITION_ERROR(10150, "move process definition from {0} to {1} error : {2}",
+            "从{0}移动工作流到{1}错误 : {2}"),
     SWITCH_PROCESS_DEFINITION_VERSION_ERROR(10151, "Switch process definition version error", "切换工作流版本出错"),
-    SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR(10152
-            , "Switch process definition version error: not exists process definition, [process definition id {0}]", "切换工作流版本出错:工作流不存在,[工作流id {0}]"),
-    SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR(10153
-            , "Switch process definition version error: not exists process definition version, [process definition id {0}] [version number {1}]", "切换工作流版本出错:工作流版本信息不存在,[工作流id {0}] [版本号 {1}]"),
+    SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR(10152,
+            "Switch process definition version error: not exists process definition, [process definition id {0}]",
+            "切换工作流版本出错:工作流不存在,[工作流id {0}]"),
+    SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR(10153,
+            "Switch process definition version error: not exists process definition version, [process definition id {0}] [version number {1}]",
+            "切换工作流版本出错:工作流版本信息不存在,[工作流id {0}] [版本号 {1}]"),
     QUERY_PROCESS_DEFINITION_VERSIONS_ERROR(10154, "query process definition versions error", "查询工作流历史版本信息出错"),
     DELETE_PROCESS_DEFINITION_VERSION_ERROR(10156, "delete process definition version error", "删除工作流历史版本出错"),
 
@@ -192,11 +208,17 @@ public enum Status {
     BATCH_COPY_PROCESS_DEFINITION_ERROR(10159, "batch copy process definition error", "复制工作流错误"),
     BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition error", "移动工作流错误"),
     QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error", "查询血缘失败"),
-    QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"),
-    DELETE_PROCESS_DEFINITION_EXECUTING_FAIL(10163, "delete process definition by code fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
-    CHECK_OS_TENANT_CODE_ERROR(10164, "Tenant code invalid, should follow linux's users naming conventions", "非法的租户名,需要遵守 Linux 用户命名规范"),
+    QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error",
+            "查询授权的和用户创建的项目错误"),
+    DELETE_PROCESS_DEFINITION_EXECUTING_FAIL(10163,
+            "delete process definition by code fail, for there are {0} process instances in executing using it",
+            "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
+    CHECK_OS_TENANT_CODE_ERROR(10164, "Tenant code invalid, should follow linux's users naming conventions",
+            "非法的租户名,需要遵守 Linux 用户命名规范"),
     FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"),
-    TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),
+    TASK_INSTANCE_STATE_OPERATION_ERROR(10166,
+            "the status of task instance {0} is {1},Cannot perform force success operation",
+            "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),
     DATASOURCE_TYPE_NOT_EXIST(10167, "data source type not exist", "数据源类型不存在"),
     PROCESS_DEFINITION_NAME_EXIST(10168, "process definition name {0} already exists", "工作流定义名称[{0}]已存在"),
     DATASOURCE_DB_TYPE_ILLEGAL(10169, "datasource type illegal", "数据源类型参数不合法"),
@@ -211,20 +233,28 @@ public enum Status {
     QUERY_WORKER_ADDRESS_LIST_FAIL(10178, "query worker address list fail ", "查询worker地址列表失败"),
     TRANSFORM_PROJECT_OWNERSHIP(10179, "Please transform project ownership [{0}]", "请先转移项目所有权[{0}]"),
     QUERY_ALERT_GROUP_ERROR(10180, "query alert group error", "查询告警组错误"),
-    CURRENT_LOGIN_USER_TENANT_NOT_EXIST(10181, "the tenant of the currently login user is not specified", "未指定当前登录用户的租户"),
+    CURRENT_LOGIN_USER_TENANT_NOT_EXIST(10181, "the tenant of the currently login user is not specified",
+            "未指定当前登录用户的租户"),
     REVOKE_PROJECT_ERROR(10182, "revoke project error", "撤销项目授权错误"),
     QUERY_AUTHORIZED_USER(10183, "query authorized user error", "查询拥有项目权限的用户错误"),
     PROJECT_NOT_EXIST(10190, "This project was not found. Please refresh page.", "该项目不存在,请刷新页面"),
     TASK_INSTANCE_HOST_IS_NULL(10191, "task instance host is null", "任务实例host为空"),
     QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error", "查询运行的工作流实例错误"),
-    DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL(10193, "delete process definition fail, cause used by other tasks: {0}", "删除工作流定时失败,被其他任务引用:{0}"),
-    DELETE_TASK_USE_BY_OTHER_FAIL(10194, "delete task {0} fail, cause used by other tasks: {1}", "删除任务 {0} 失败,被其他任务引用:{1}"),
+    DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL(10193, "delete process definition fail, cause used by other tasks: {0}",
+            "删除工作流定时失败,被其他任务引用:{0}"),
+    DELETE_TASK_USE_BY_OTHER_FAIL(10194, "delete task {0} fail, cause used by other tasks: {1}",
+            "删除任务 {0} 失败,被其他任务引用:{1}"),
     TASK_WITH_DEPENDENT_ERROR(10195, "task used in other tasks", "删除被其他任务引用"),
     TASK_SAVEPOINT_ERROR(10196, "task savepoint error", "任务实例savepoint错误"),
     TASK_STOP_ERROR(10197, "task stop error", "任务实例停止错误"),
     LIST_TASK_TYPE_ERROR(10200, "list task type error", "查询任务类型列表错误"),
     DELETE_TASK_TYPE_ERROR(10200, "delete task type error", "删除任务类型错误"),
     ADD_TASK_TYPE_ERROR(10200, "add task type error", "添加任务类型错误"),
+    CREATE_PROCESS_DEFINITION_LOG_ERROR(10201, "Create process definition log error", "创建 process definition log 对象失败"),
+    PARSE_SCHEDULE_PARAM_ERROR(10202, "Parse schedule parameter error, {0}", "解析 schedule 参数错误, {0}"),
+    SCHEDULE_NOT_EXISTS(10023, "schedule {0} does not exist", "调度 id {0} 不存在"),
+    SCHEDULE_ALREADY_EXISTS(10024, "workflow {0} schedule {1} already exist, please update or delete it",
+            "工作流 {0} 的定时 {1} 已经存在,请更新或删除"),
 
     UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
     UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
@@ -235,30 +265,39 @@ public enum Status {
     RESOURCE_SUFFIX_FORBID_CHANGE(20008, "resource suffix not allowed to be modified", "资源文件后缀不支持修改"),
     UDF_RESOURCE_SUFFIX_NOT_JAR(20009, "UDF resource suffix name must be jar", "UDF资源文件后缀名只支持[jar]"),
     HDFS_COPY_FAIL(20010, "hdfs copy {0} -> {1} fail", "hdfs复制失败:[{0}] -> [{1}]"),
-    RESOURCE_FILE_EXIST(20011, "resource file {0} already exists in hdfs,please delete it or change name!", "资源文件[{0}]在hdfs中已存在,请删除或修改资源名"),
+    RESOURCE_FILE_EXIST(20011, "resource file {0} already exists in hdfs,please delete it or change name!",
+            "资源文件[{0}]在hdfs中已存在,请删除或修改资源名"),
     RESOURCE_FILE_NOT_EXIST(20012, "resource file {0} not exists !", "资源文件[{0}]不存在"),
     UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}", "udf函数绑定了资源文件[{0}]"),
     RESOURCE_IS_USED(20014, "resource file is used by process definition", "资源文件被上线的流程定义使用了"),
     PARENT_RESOURCE_NOT_EXIST(20015, "parent resource not exist", "父资源文件不存在"),
-    RESOURCE_NOT_EXIST_OR_NO_PERMISSION(20016, "resource not exist or no permission,please view the task node and remove error resource", "请检查任务节点并移除无权限或者已删除的资源"),
-    RESOURCE_IS_AUTHORIZED(20017, "resource is authorized to user {0},suffix not allowed to be modified", "资源文件已授权其他用户[{0}],后缀不允许修改"),
+    RESOURCE_NOT_EXIST_OR_NO_PERMISSION(20016,
+            "resource not exist or no permission,please view the task node and remove error resource",
+            "请检查任务节点并移除无权限或者已删除的资源"),
+    RESOURCE_IS_AUTHORIZED(20017, "resource is authorized to user {0},suffix not allowed to be modified",
+            "资源文件已授权其他用户[{0}],后缀不允许修改"),
     RESOURCE_HAS_FOLDER(20018, "There are files or folders in the current directory:{0}", "当前目录下有文件或文件夹[{0}]"),
 
     USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"),
     USER_NO_OPERATION_PROJECT_PERM(30002, "user {0} is not has project {1} permission", "当前用户[{0}]没有[{1}]项目的操作权限"),
 
-
     PROCESS_INSTANCE_NOT_EXIST(50001, "process instance {0} does not exist", "工作流实例[{0}]不存在"),
     PROCESS_INSTANCE_EXIST(50002, "process instance {0} already exists", "工作流实例[{0}]已存在"),
     PROCESS_DEFINE_NOT_EXIST(50003, "process definition {0} does not exist", "工作流定义[{0}]不存在"),
-    PROCESS_DEFINE_NOT_RELEASE(50004, "process definition {0} process version {1} not online", "工作流定义[{0}] 工作流版本[{1}]不是上线状态"),
+    PROCESS_DEFINE_NOT_RELEASE(50004, "process definition {0} process version {1} not online",
+            "工作流定义[{0}] 工作流版本[{1}]不是上线状态"),
     SUB_PROCESS_DEFINE_NOT_RELEASE(50004, "exist sub process definition not online", "存在子工作流定义不是上线状态"),
-    PROCESS_INSTANCE_ALREADY_CHANGED(50005, "the status of process instance {0} is already {1}", "工作流实例[{0}]的状态已经是[{1}]"),
-    PROCESS_INSTANCE_STATE_OPERATION_ERROR(50006, "the status of process instance {0} is {1},Cannot perform {2} operation", "工作流实例[{0}]的状态是[{1}],无法执行[{2}]操作"),
+    PROCESS_INSTANCE_ALREADY_CHANGED(50005, "the status of process instance {0} is already {1}",
+            "工作流实例[{0}]的状态已经是[{1}]"),
+    PROCESS_INSTANCE_STATE_OPERATION_ERROR(50006,
+            "the status of process instance {0} is {1},Cannot perform {2} operation",
+            "工作流实例[{0}]的状态是[{1}],无法执行[{2}]操作"),
     SUB_PROCESS_INSTANCE_NOT_EXIST(50007, "the task belong to process instance does not exist", "子工作流实例不存在"),
     PROCESS_DEFINE_NOT_ALLOWED_EDIT(50008, "process definition {0} does not allow edit", "工作流定义[{0}]不允许修改"),
-    PROCESS_INSTANCE_EXECUTING_COMMAND(50009, "process instance {0} is executing the command, please wait ...", "工作流实例[{0}]正在执行命令,请稍等..."),
-    PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE(50010, "process instance {0} is not sub process instance", "工作流实例[{0}]不是子工作流实例"),
+    PROCESS_INSTANCE_EXECUTING_COMMAND(50009, "process instance {0} is executing the command, please wait ...",
+            "工作流实例[{0}]正在执行命令,请稍等..."),
+    PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE(50010, "process instance {0} is not sub process instance",
+            "工作流实例[{0}]不是子工作流实例"),
     TASK_INSTANCE_STATE_COUNT_ERROR(50011, "task instance state count error", "查询各状态任务实例数错误"),
     COUNT_PROCESS_INSTANCE_STATE_ERROR(50012, "count process instance state error", "查询各状态流程实例数错误"),
     COUNT_PROCESS_DEFINITION_USER_ERROR(50013, "count process definition user error", "查询各用户流程定义数错误"),
@@ -267,26 +306,30 @@ public enum Status {
     PROCESS_INSTANCE_ERROR(50014, "process instance delete error: {0}", "工作流实例删除[{0}]错误"),
     EXECUTE_PROCESS_INSTANCE_ERROR(50015, "execute process instance error", "操作工作流实例错误"),
     CHECK_PROCESS_DEFINITION_ERROR(50016, "check process definition error", "工作流定义错误"),
-    QUERY_RECIPIENTS_AND_COPYERS_BY_PROCESS_DEFINITION_ERROR(50017, "query recipients and copyers by process definition error", "查询收件人和抄送人错误"),
+    QUERY_RECIPIENTS_AND_COPYERS_BY_PROCESS_DEFINITION_ERROR(50017,
+            "query recipients and copyers by process definition error", "查询收件人和抄送人错误"),
     DATA_IS_NOT_VALID(50017, "data {0} not valid", "数据[{0}]无效"),
     DATA_IS_NULL(50018, "data {0} is null", "数据[{0}]不能为空"),
     PROCESS_NODE_HAS_CYCLE(50019, "process node has cycle", "流程节点间存在循环依赖"),
     PROCESS_NODE_S_PARAMETER_INVALID(50020, "process node {0} parameter invalid", "流程节点[{0}]参数无效"),
     PROCESS_DEFINE_STATE_ONLINE(50021, "process definition [{0}] is already online", "工作流定义[{0}]已上线"),
     DELETE_PROCESS_DEFINE_BY_CODE_ERROR(50022, "delete process definition by code error", "删除工作流定义错误"),
-    SCHEDULE_CRON_STATE_ONLINE(50023, "the status of schedule {0} is already online", "调度配置[{0}]已上线"),
-    DELETE_SCHEDULE_CRON_BY_ID_ERROR(50024, "delete schedule by id error", "删除调度配置错误"),
+    SCHEDULE_STATE_ONLINE(50023, "the status of schedule {0} is already online", "调度配置[{0}]已上线"),
+    DELETE_SCHEDULE_BY_ID_ERROR(50024, "delete schedule by id error", "删除调度配置错误"),
     BATCH_DELETE_PROCESS_DEFINE_ERROR(50025, "batch delete process definition error", "批量删除工作流定义错误"),
-    BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR(50026, "batch delete process definition by codes {0} error", "批量删除工作流定义[{0}]错误"),
+    BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR(50026, "batch delete process definition by codes {0} error",
+            "批量删除工作流定义[{0}]错误"),
     DELETE_PROCESS_DEFINE_BY_CODES_ERROR(50026, "delete process definition by codes {0} error", "删除工作流定义[{0}]错误"),
-    TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"),
+    TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.",
+            "没有合适的租户,请选择可用的租户"),
     EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"),
     BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"),
     IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"),
     TASK_DEFINE_NOT_EXIST(50030, "task definition [{0}] does not exist", "任务定义[{0}]不存在"),
     CREATE_PROCESS_TASK_RELATION_ERROR(50032, "create process task relation error", "创建工作流任务关系错误"),
     PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation [{0}] does not exist", "工作流任务关系[{0}]不存在"),
-    PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"),
+    PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]",
+            "工作流任务关系已存在, processCode:[{0}]"),
     PROCESS_DAG_IS_EMPTY(50035, "process dag is empty", "工作流dag是空"),
     CHECK_PROCESS_TASK_RELATION_ERROR(50036, "check process task relation error", "工作流任务关系参数错误"),
     CREATE_TASK_DEFINITION_ERROR(50037, "create task definition error", "创建任务错误"),
@@ -329,7 +372,6 @@ public enum Status {
     ACCESS_TOKEN_NOT_EXIST(70015, "access token not exist", "访问token不存在"),
     QUERY_ACCESSTOKEN_BY_USER_ERROR(70016, "query access token by user error", "查询访问指定用户的token错误"),
 
-
     COMMAND_STATE_COUNT_ERROR(80001, "task instance state count error", "查询各状态任务实例数错误"),
     NEGTIVE_SIZE_NUMBER_ERROR(80002, "query size number error", "查询size错误"),
     START_TIME_BIGGER_THAN_END_TIME_ERROR(80003, "start time bigger than end time error", "开始时间在结束时间之后错误"),
@@ -340,9 +382,11 @@ public enum Status {
     // audit log
     QUERY_AUDIT_LOG_LIST_PAGING(10057, "query resources list paging", "分页查询资源列表错误"),
 
-    //plugin
+    // plugin
     PLUGIN_NOT_A_UI_COMPONENT(110001, "query plugin error, this plugin has no UI component", "查询插件错误,此插件无UI组件"),
-    QUERY_PLUGINS_RESULT_IS_NULL(110002, "query alarm plugins result is empty, please check the startup status of the alarm component and confirm that the relevant alarm plugin is successfully registered", "查询告警插件为空, 请检查告警组件启动状态并确认相关告警插件已注册成功"),
+    QUERY_PLUGINS_RESULT_IS_NULL(110002,
+            "query alarm plugins result is empty, please check the startup status of the alarm component and confirm that the relevant alarm plugin is successfully registered",
+            "查询告警插件为空, 请检查告警组件启动状态并确认相关告警插件已注册成功"),
     QUERY_PLUGINS_ERROR(110003, "query plugins error", "查询插件错误"),
     QUERY_PLUGIN_DETAIL_RESULT_IS_NULL(110004, "query plugin detail result is null", "查询插件详情结果为空"),
 
@@ -353,7 +397,8 @@ public enum Status {
     QUERY_ALL_ALERT_PLUGIN_INSTANCE_ERROR(110009, "query all alert plugin instance error", "查询所有告警实例失败"),
     PLUGIN_INSTANCE_ALREADY_EXIT(110010, "plugin instance already exit", "该告警插件实例已存在"),
     LIST_PAGING_ALERT_PLUGIN_INSTANCE_ERROR(110011, "query plugin instance page error", "分页查询告警实例失败"),
-    DELETE_ALERT_PLUGIN_INSTANCE_ERROR_HAS_ALERT_GROUP_ASSOCIATED(110012, "failed to delete the alert instance, there is an alarm group associated with this alert instance",
+    DELETE_ALERT_PLUGIN_INSTANCE_ERROR_HAS_ALERT_GROUP_ASSOCIATED(110012,
+            "failed to delete the alert instance, there is an alarm group associated with this alert instance",
             "删除告警实例失败,存在与此告警实例关联的警报组"),
     PROCESS_DEFINITION_VERSION_IS_USED(110013, "this process definition version is used", "此工作流定义版本被使用"),
 
@@ -363,9 +408,10 @@ public enum Status {
     ENVIRONMENT_CONFIG_IS_NULL(120004, "this environment config shouldn't be empty.", "环境配置信息不能为空"),
     UPDATE_ENVIRONMENT_ERROR(120005, "update environment [{0}] info error", "更新环境[{0}]信息失败"),
     DELETE_ENVIRONMENT_ERROR(120006, "delete environment error", "删除环境信息失败"),
-    DELETE_ENVIRONMENT_RELATED_TASK_EXISTS(120007, "this environment has been used in tasks,so you can't delete it.", "该环境已经被任务使用,所以不能删除该环境信息"),
-    QUERY_ENVIRONMENT_BY_NAME_ERROR(1200008, "not found environment [{0}] ", "查询环境名称[{0}]信息不存在"),
-    QUERY_ENVIRONMENT_BY_CODE_ERROR(1200009, "not found environment [{0}] ", "查询环境编码[{0}]不存在"),
+    DELETE_ENVIRONMENT_RELATED_TASK_EXISTS(120007, "this environment has been used in tasks,so you can't delete it.",
+            "该环境已经被任务使用,所以不能删除该环境信息"),
+    QUERY_ENVIRONMENT_BY_NAME_ERROR(1200008, "not found environment name [{0}] ", "查询环境名称[{0}]不存在"),
+    QUERY_ENVIRONMENT_BY_CODE_ERROR(1200009, "not found environment code [{0}] ", "查询环境编码[{0}]不存在"),
     QUERY_ENVIRONMENT_ERROR(1200010, "login user query environment error", "分页查询环境列表错误"),
     VERIFY_ENVIRONMENT_ERROR(1200011, "verify environment error", "验证环境信息错误"),
     GET_RULE_FORM_CREATE_JSON_ERROR(1200012, "get rule form create json error", "获取规则 FROM-CREATE-JSON 错误"),
@@ -383,16 +429,19 @@ public enum Status {
     CLUSTER_CONFIG_IS_NULL(120023, "this cluster config shouldn't be empty.", "集群配置信息不能为空"),
     UPDATE_CLUSTER_ERROR(120024, "update cluster [{0}] info error", "更新集群[{0}]信息失败"),
     DELETE_CLUSTER_ERROR(120025, "delete cluster error", "删除集群信息失败"),
-    DELETE_CLUSTER_RELATED_TASK_EXISTS(120026, "this cluster has been used in tasks,so you can't delete it.", "该集群已经被任务使用,所以不能删除该集群信息"),
+    DELETE_CLUSTER_RELATED_TASK_EXISTS(120026, "this cluster has been used in tasks,so you can't delete it.",
+            "该集群已经被任务使用,所以不能删除该集群信息"),
     QUERY_CLUSTER_BY_NAME_ERROR(1200027, "not found cluster [{0}] ", "查询集群名称[{0}]信息不存在"),
     QUERY_CLUSTER_BY_CODE_ERROR(1200028, "not found cluster [{0}] ", "查询集群编码[{0}]不存在"),
     QUERY_CLUSTER_ERROR(1200029, "login user query cluster error", "分页查询集群列表错误"),
     VERIFY_CLUSTER_ERROR(1200030, "verify cluster error", "验证集群信息错误"),
     CLUSTER_PROCESS_DEFINITIONS_IS_INVALID(1200031, "cluster worker groups is invalid format", "集群关联的工作组参数解析错误"),
-    UPDATE_CLUSTER_PROCESS_DEFINITION_RELATION_ERROR(1200032, "You can't modify the process definition, because the process definition [{0}] and this cluster [{1}] already be used in the task [{2}]",
-        "您不能修改集群选项,因为该工作流组 [{0}] 和 该集群 [{1}] 已经被用在任务 [{2}] 中"),
+    UPDATE_CLUSTER_PROCESS_DEFINITION_RELATION_ERROR(1200032,
+            "You can't modify the process definition, because the process definition [{0}] and this cluster [{1}] already be used in the task [{2}]",
+            "您不能修改集群选项,因为该工作流组 [{0}] 和 该集群 [{1}] 已经被用在任务 [{2}] 中"),
     CLUSTER_NOT_EXISTS(120033, "this cluster can not found in db.", "集群配置数据库里查询不到为空"),
-    DELETE_CLUSTER_RELATED_NAMESPACE_EXISTS(120034, "this cluster has been used in namespace,so you can't delete it.", "该集群已经被命名空间使用,所以不能删除该集群信息"),
+    DELETE_CLUSTER_RELATED_NAMESPACE_EXISTS(120034, "this cluster has been used in namespace,so you can't delete it.",
+            "该集群已经被命名空间使用,所以不能删除该集群信息"),
 
     TASK_GROUP_NAME_EXSIT(130001, "this task group name is repeated in a project", "该任务组名称在一个项目中已经使用"),
     TASK_GROUP_SIZE_ERROR(130002, "task group size error", "任务组大小应该为大于1的整数"),
@@ -409,7 +458,8 @@ public enum Status {
     QUERY_TASK_GROUP_QUEUE_LIST_ERROR(130013, "query task group queue list error", "查询任务组队列列表错误"),
     TASK_GROUP_CACHE_START_FAILED(130014, "cache start failed", "任务组相关的缓存启动失败"),
     ENVIRONMENT_WORKER_GROUPS_IS_INVALID(130015, "environment worker groups is invalid format", "环境关联的工作组参数解析错误"),
-    UPDATE_ENVIRONMENT_WORKER_GROUP_RELATION_ERROR(130016, "You can't modify the worker group, because the worker group [{0}] and this environment [{1}] already be used in the task [{2}]",
+    UPDATE_ENVIRONMENT_WORKER_GROUP_RELATION_ERROR(130016,
+            "You can't modify the worker group, because the worker group [{0}] and this environment [{1}] already be used in the task [{2}]",
             "您不能修改工作组选项,因为该工作组 [{0}] 和 该环境 [{1}] 已经被用在任务 [{2}] 中"),
     TASK_GROUP_QUEUE_ALREADY_START(130017, "task group queue already start", "节点已经获取任务组资源"),
     TASK_GROUP_STATUS_CLOSED(130018, "The task group has been closed.", "任务组已经被关闭"),
@@ -418,7 +468,8 @@ public enum Status {
     NOT_ALLOW_TO_DELETE_DEFAULT_ALARM_GROUP(130030, "Not allow to delete the default alarm group ", "不能删除默认告警组"),
     TIME_ZONE_ILLEGAL(130031, "time zone [{0}] is illegal", "时区参数 [{0}] 不合法"),
 
-    QUERY_K8S_NAMESPACE_LIST_PAGING_ERROR(1300001, "login user query k8s namespace list paging error", "分页查询k8s名称空间列表错误"),
+    QUERY_K8S_NAMESPACE_LIST_PAGING_ERROR(1300001, "login user query k8s namespace list paging error",
+            "分页查询k8s名称空间列表错误"),
     K8S_NAMESPACE_EXIST(1300002, "k8s namespace {0} already exists", "k8s命名空间[{0}]已存在"),
     CREATE_K8S_NAMESPACE_ERROR(1300003, "create k8s namespace error", "创建k8s命名空间错误"),
     UPDATE_K8S_NAMESPACE_ERROR(1300004, "update k8s namespace error", "更新k8s命名空间信息错误"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index d63c9a6cb7..df44dc1445 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -63,7 +63,8 @@ public interface ExecutorService {
                                             FailureStrategy failureStrategy, String startNodeList,
                                             TaskDependType taskDependType, WarningType warningType, int warningGroupId,
                                             RunMode runMode,
-                                            Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout,
+                                            Priority processInstancePriority, String workerGroup, Long environmentCode,
+                                            Integer timeout,
                                             Map<String, String> startParams, Integer expectedParallelismNumber,
                                             int dryRun, int testFlag,
                                             ComplementDependentMode complementDependentMode);
@@ -74,10 +75,10 @@ public interface ExecutorService {
      * @param projectCode project code
      * @param processDefinition process definition
      * @param processDefineCode process definition code
-     * @param verison process definition version
-     * @return check result code
+     * @param version process definition version
      */
-    Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode, Integer verison);
+    void checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode,
+                                     Integer version);
 
     /**
      * do action to process instance:pause, stop, repeat, recover from pause, recover from stop
@@ -132,9 +133,9 @@ public interface ExecutorService {
      * @return execute process instance code
      */
     Map<String, Object> execStreamTaskInstance(User loginUser, long projectCode,
-                                            long taskDefinitionCode, int taskDefinitionVersion,
-                                            int warningGroupId,
-                                            String workerGroup, Long environmentCode,
-                                            Map<String, String> startParams,
-                                            int dryRun);
+                                               long taskDefinitionCode, int taskDefinitionVersion,
+                                               int warningGroupId,
+                                               String workerGroup, Long environmentCode,
+                                               Map<String, String> startParams,
+                                               int dryRun);
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index beab02207a..464ee139be 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -17,6 +17,9 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest;
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowFilterRequest;
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
@@ -67,6 +70,15 @@ public interface ProcessDefinitionService {
                                                 String otherParamsJson,
                                                 ProcessExecutionTypeEnum executionType);
 
+    /**
+     * create process definition V2
+     *
+     * @param loginUser login user
+     * @param workflowCreateRequest the new workflow object will be created
+     * @return New ProcessDefinition object created just now
+     */
+    ProcessDefinition createSingleProcessDefinition(User loginUser, WorkflowCreateRequest workflowCreateRequest);
+
     /**
      * query process definition list
      *
@@ -107,6 +119,16 @@ public interface ProcessDefinitionService {
                                                                  Integer pageNo,
                                                                  Integer pageSize);
 
+    /**
+     * Filter resource process definitions
+     *
+     * @param loginUser login user
+     * @param workflowFilterRequest workflow filter requests
+     * @return List process definition
+     */
+    PageInfo<ProcessDefinition> filterProcessDefinition(User loginUser,
+                                                        WorkflowFilterRequest workflowFilterRequest);
+
     /**
      * query detail of process definition
      *
@@ -120,6 +142,16 @@ public interface ProcessDefinitionService {
                                                      long projectCode,
                                                      long code);
 
+    /**
+     * Get resource workflow
+     *
+     * @param loginUser login user
+     * @param code process definition code
+     * @return Process definition Object
+     */
+    ProcessDefinition getProcessDefinition(User loginUser,
+                                           long code);
+
     /**
      * query detail of process definition
      *
@@ -208,13 +240,10 @@ public interface ProcessDefinitionService {
      * delete process definition by code
      *
      * @param loginUser login user
-     * @param projectCode project code
      * @param code process definition code
-     * @return delete result code
      */
-    Map<String, Object> deleteProcessDefinitionByCode(User loginUser,
-                                                      long projectCode,
-                                                      long code);
+    void deleteProcessDefinitionByCode(User loginUser,
+                                       long code);
 
     /**
      * release process definition: online / offline
@@ -430,6 +459,18 @@ public interface ProcessDefinitionService {
                                                          String otherParamsJson,
                                                          ProcessExecutionTypeEnum executionType);
 
+    /**
+     * update process definition basic info, not including task definition, task relation and location.
+     *
+     * @param loginUser login user
+     * @param workflowCode workflow resource code you want to update
+     * @param workflowUpdateRequest workflow update requests
+     * @return ProcessDefinition instance
+     */
+    ProcessDefinition updateSingleProcessDefinition(User loginUser,
+                                                    long workflowCode,
+                                                    WorkflowUpdateRequest workflowUpdateRequest);
+
     /**
      * release process definition and schedule
      *
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
index 70d77690f0..400643f3e9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
@@ -17,6 +17,10 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import org.apache.dolphinscheduler.api.dto.schedule.ScheduleCreateRequest;
+import org.apache.dolphinscheduler.api.dto.schedule.ScheduleFilterRequest;
+import org.apache.dolphinscheduler.api.dto.schedule.ScheduleUpdateRequest;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
@@ -59,6 +63,16 @@ public interface SchedulerService {
                                        String workerGroup,
                                        Long environmentCode);
 
+    /**
+     * save schedule V2
+     *
+     * @param loginUser             login user
+     * @param scheduleCreateRequest the new schedule object will be created
+     * @return Schedule object
+     */
+    Schedule createSchedulesV2(User loginUser,
+                               ScheduleCreateRequest scheduleCreateRequest);
+
     /**
      * updateProcessInstance schedule
      *
@@ -85,6 +99,28 @@ public interface SchedulerService {
                                        String workerGroup,
                                        Long environmentCode);
 
+    /**
+     * update schedule object V2
+     *
+     * @param loginUser login user
+     * @param scheduleId scheduler id
+     * @param scheduleUpdateRequest the schedule object will be updated
+     * @return Schedule object
+     */
+    Schedule updateSchedulesV2(User loginUser,
+                               Integer scheduleId,
+                               ScheduleUpdateRequest scheduleUpdateRequest);
+
+    /**
+     * get schedule object
+     *
+     * @param loginUser login user
+     * @param scheduleId scheduler id
+     * @return Schedule object
+     */
+    Schedule getSchedule(User loginUser,
+                         Integer scheduleId);
+
     /**
      * set schedule online or offline
      *
@@ -115,6 +151,16 @@ public interface SchedulerService {
 
     List<Schedule> queryScheduleByProcessDefinitionCodes(List<Long> processDefinitionCodes);
 
+    /**
+     * query schedule V2
+     *
+     * @param loginUser login user
+     * @param scheduleFilterRequest schedule filter request
+     * @return schedule list page
+     */
+    PageInfo<Schedule> filterSchedules(User loginUser,
+                                       ScheduleFilterRequest scheduleFilterRequest);
+
     /**
      * query schedule list
      *
@@ -137,11 +183,9 @@ public interface SchedulerService {
      * delete schedule by id
      *
      * @param loginUser login user
-     * @param projectCode project code
-     * @param scheduleId scheule id
-     * @return delete result code
+     * @param scheduleId schedule id
      */
-    Map<String, Object> deleteScheduleById(User loginUser, long projectCode, Integer scheduleId);
+    void deleteSchedulesById(User loginUser, Integer scheduleId);
 
     /**
      * preview schedule
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 32de874b1d..a5bfe2780a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -195,11 +195,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
 
         // check process define release state
         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
-        result = checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode,
+        this.checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode,
                 processDefinition.getVersion());
-        if (result.get(Constants.STATUS) != Status.SUCCESS) {
-            return result;
-        }
 
         if (!checkTenantSuitable(processDefinition)) {
             logger.error(
@@ -291,32 +288,23 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      * @param projectCode       project code
      * @param processDefinition process definition
      * @param processDefineCode process definition code
-     * @param version           process instance verison
-     * @return check result code
+     * @param version           process instance version
      */
     @Override
-    public Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition,
-                                                           long processDefineCode, Integer version) {
-        Map<String, Object> result = new HashMap<>();
+    public void checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition,
+                                            long processDefineCode, Integer version) {
+        // check process definition exists
         if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
-            // check process definition exists
-            logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode,
-                    processDefineCode);
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
-        } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
-            // check process definition online
-            logger.warn("Process definition is not {}, processDefinitionCode:{}, version:{}.",
-                    ReleaseState.ONLINE.getDescp(), processDefineCode, version);
-            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version);
-        } else if (!checkSubProcessDefinitionValid(processDefinition)) {
-            // check sub process definition online
-            logger.warn("Subprocess definition of process definition is not {}, processDefinitionCode:{}.",
-                    ReleaseState.ONLINE.getDescp(), processDefineCode);
-            putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
-        } else {
-            result.put(Constants.STATUS, Status.SUCCESS);
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
+        }
+        // check process definition online
+        if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version);
+        }
+        // check sub process definition online
+        if (!checkSubProcessDefinitionValid(processDefinition)) {
+            throw new ServiceException(Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
         }
-        return result;
     }
 
     /**
@@ -392,13 +380,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                         processInstance.getProcessDefinitionVersion());
         processDefinition.setReleaseState(ReleaseState.ONLINE);
         if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
-            result =
-                    checkProcessDefinitionValid(projectCode, processDefinition,
-                            processInstance.getProcessDefinitionCode(),
-                            processInstance.getProcessDefinitionVersion());
-            if (result.get(Constants.STATUS) != Status.SUCCESS) {
-                return result;
-            }
+            this.checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(),
+                    processInstance.getProcessDefinitionVersion());
         }
 
         result = checkExecuteType(processInstance, executeType);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 68bb2ee7a8..dac045b28e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -43,6 +43,9 @@ import org.apache.dolphinscheduler.api.dto.DagDataSchedule;
 import org.apache.dolphinscheduler.api.dto.ScheduleParam;
 import org.apache.dolphinscheduler.api.dto.treeview.Instance;
 import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto;
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest;
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowFilterRequest;
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
@@ -292,6 +295,82 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         return createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs, otherParamsJson);
     }
 
+    private void createWorkflowValid(User user, ProcessDefinition processDefinition) {
+        Project project = projectMapper.queryByCode(processDefinition.getProjectCode());
+        if (project == null) {
+            throw new ServiceException(Status.PROJECT_NOT_FOUND, processDefinition.getProjectCode());
+        }
+        // check user access for project
+        projectService.checkProjectAndAuthThrowException(user, project, WORKFLOW_CREATE);
+
+        if (checkDescriptionLength(processDefinition.getDescription())) {
+            throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR);
+        }
+
+        // check whether the new process define name exist
+        ProcessDefinition definition =
+                processDefinitionMapper.verifyByDefineName(project.getCode(), processDefinition.getName());
+        if (definition != null) {
+            throw new ServiceException(Status.PROCESS_DEFINITION_NAME_EXIST, processDefinition.getName());
+        }
+
+        this.getTenantId(processDefinition);
+    }
+
+    private int getTenantId(ProcessDefinition processDefinition) {
+        int tenantId = -1;
+        if (!Constants.DEFAULT.equals(processDefinition.getTenantCode())) {
+            Tenant tenant = tenantMapper.queryByTenantCode(processDefinition.getTenantCode());
+            if (tenant == null) {
+                throw new ServiceException(Status.TENANT_NOT_EXIST);
+            }
+            tenantId = tenant.getId();
+        }
+        return tenantId;
+    }
+
+    private void syncObj2Log(User user, ProcessDefinition processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
+        processDefinitionLog.setOperator(user.getId());
+        int result = processDefinitionLogMapper.insert(processDefinitionLog);
+        if (result <= 0) {
+            throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_LOG_ERROR);
+        }
+    }
+
+    /**
+     * create single process definition
+     *
+     * @param loginUser login user
+     * @param workflowCreateRequest the new workflow object will be created
+     * @return New ProcessDefinition object created just now
+     */
+    @Override
+    @Transactional
+    public ProcessDefinition createSingleProcessDefinition(User loginUser,
+                                                           WorkflowCreateRequest workflowCreateRequest) {
+        ProcessDefinition processDefinition = workflowCreateRequest.convert2ProcessDefinition();
+        this.createWorkflowValid(loginUser, processDefinition);
+
+        long processDefinitionCode;
+        try {
+            processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
+        } catch (CodeGenerateException e) {
+            throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
+        }
+
+        processDefinition.setTenantId(this.getTenantId(processDefinition));
+        processDefinition.setCode(processDefinitionCode);
+        processDefinition.setUserId(loginUser.getId());
+
+        int create = processDefinitionMapper.insert(processDefinition);
+        if (create <= 0) {
+            throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
+        }
+        this.syncObj2Log(loginUser, processDefinition);
+        return processDefinition;
+    }
+
     protected Map<String, Object> createDagDefine(User loginUser,
                                                   List<ProcessTaskRelationLog> taskRelationList,
                                                   ProcessDefinition processDefinition,
@@ -515,6 +594,46 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         return pageInfo;
     }
 
+    /**
+     * Filter resource process definitions
+     *
+     * @param loginUser login user
+     * @param workflowFilterRequest workflow filter requests
+     * @return List process definition
+     */
+    @Override
+    public PageInfo<ProcessDefinition> filterProcessDefinition(User loginUser,
+                                                               WorkflowFilterRequest workflowFilterRequest) {
+        ProcessDefinition processDefinition = workflowFilterRequest.convert2ProcessDefinition();
+        if (workflowFilterRequest.getProjectName() != null) {
+            Project project = projectMapper.queryByName(workflowFilterRequest.getProjectName());
+            // check user access for project
+            projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);
+            processDefinition.setProjectCode(project.getCode());
+        }
+
+        Page<ProcessDefinition> page =
+                new Page<>(workflowFilterRequest.getPageNo(), workflowFilterRequest.getPageSize());
+        IPage<ProcessDefinition> processDefinitionIPage =
+                processDefinitionMapper.filterProcessDefinition(page, processDefinition);
+
+        List<ProcessDefinition> records = processDefinitionIPage.getRecords();
+        for (ProcessDefinition pd : records) {
+            ProcessDefinitionLog processDefinitionLog =
+                    processDefinitionLogMapper.queryByDefinitionCodeAndVersion(pd.getCode(), pd.getVersion());
+            User user = userMapper.selectById(processDefinitionLog.getOperator());
+            pd.setModifyBy(user.getUserName());
+        }
+
+        processDefinitionIPage.setRecords(records);
+        PageInfo<ProcessDefinition> pageInfo =
+                new PageInfo<>(workflowFilterRequest.getPageNo(), workflowFilterRequest.getPageSize());
+        pageInfo.setTotal((int) processDefinitionIPage.getTotal());
+        pageInfo.setTotalList(processDefinitionIPage.getRecords());
+
+        return pageInfo;
+    }
+
     /**
      * query detail of process definition
      *
@@ -549,6 +668,32 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         return result;
     }
 
+    /**
+     * query detail of process definition
+     *
+     * @param loginUser login user
+     * @param code process definition code
+     * @return process definition detail
+     */
+    @Override
+    public ProcessDefinition getProcessDefinition(User loginUser, long code) {
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
+        if (processDefinition == null) {
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
+        }
+
+        Project project = projectMapper.queryByCode(processDefinition.getProjectCode());
+        // check user access for project
+        projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);
+
+        Tenant tenant = tenantMapper.queryById(processDefinition.getTenantId());
+        if (tenant != null) {
+            processDefinition.setTenantCode(tenant.getTenantCode());
+        }
+
+        return processDefinition;
+    }
+
     @Override
     public Map<String, Object> queryProcessDefinitionByName(User loginUser, long projectCode, String name) {
         Project project = projectMapper.queryByCode(projectCode);
@@ -815,8 +960,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     private void processDefinitionUsedInOtherTaskValid(ProcessDefinition processDefinition) {
         // check process definition is already online
         if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
-            logger.warn("Process definition can not be deleted due to {}, processDefinitionCode:{}.",
-                    ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
             throw new ServiceException(Status.PROCESS_DEFINE_STATE_ONLINE, processDefinition.getName());
         }
 
@@ -824,9 +967,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         List<ProcessInstance> processInstances = processInstanceService
                 .queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
         if (CollectionUtils.isNotEmpty(processInstances)) {
-            logger.warn(
-                    "Process definition can not be deleted because there are {} executing process instances, processDefinitionCode:{}",
-                    processInstances.size(), processDefinition.getCode());
             throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, processInstances.size());
         }
 
@@ -838,9 +978,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                     .map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(),
                             task.getTaskName()))
                     .collect(Collectors.joining(Constants.COMMA));
-            logger.warn(
-                    "Process definition can not be deleted due to being referenced by other tasks:{}, processDefinitionCode:{}",
-                    taskDepDetail, processDefinition.getCode());
             throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, taskDepDetail);
         }
     }
@@ -849,33 +986,23 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * delete process definition by code
      *
      * @param loginUser login user
-     * @param projectCode project code
      * @param code process definition code
-     * @return delete result code
      */
     @Override
     @Transactional
-    public Map<String, Object> deleteProcessDefinitionByCode(User loginUser, long projectCode, long code) {
-        Project project = projectMapper.queryByCode(projectCode);
-        // check user access for project
-        Map<String, Object> result =
-                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_DELETE);
-        if (result.get(Constants.STATUS) != Status.SUCCESS) {
-            return result;
-        }
+    public void deleteProcessDefinitionByCode(User loginUser, long code) {
         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
-        if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
-            logger.error("Process definition does not exist, processCode:{}.", code);
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
-            return result;
+        if (processDefinition == null) {
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
         }
 
+        Project project = projectMapper.queryByCode(processDefinition.getProjectCode());
+        // check user access for project
+        projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION_DELETE);
+
         // Determine if the login user is the owner of the process definition
         if (loginUser.getId() != processDefinition.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) {
-            logger.warn("User does not have permission for process definition, userId:{}, processDefinitionCode:{}.",
-                    loginUser.getId(), code);
-            putMsg(result, Status.USER_NO_OPERATION_PERM);
-            return result;
+            throw new ServiceException(Status.USER_NO_OPERATION_PERM);
         }
 
         processDefinitionUsedInOtherTaskValid(processDefinition);
@@ -886,25 +1013,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) {
                 int delete = scheduleMapper.deleteById(scheduleObj.getId());
                 if (delete == 0) {
-                    logger.error(
-                            "Delete schedule of process definition error, processDefinitionCode:{}, scheduleId:{}.",
-                            code, scheduleObj.getId());
-                    putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
-                    throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
+                    throw new ServiceException(Status.DELETE_SCHEDULE_BY_ID_ERROR);
                 }
             }
             if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) {
-                logger.warn(
-                        "Process definition can not be deleted due to schedule {}, processDefinitionCode:{}, scheduleId:{}.",
-                        ReleaseState.ONLINE.getDescp(), processDefinition.getCode(), scheduleObj.getId());
-                putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, scheduleObj.getId());
-                return result;
+                throw new ServiceException(Status.SCHEDULE_STATE_ONLINE, scheduleObj.getId());
             }
         }
         int delete = processDefinitionMapper.deleteById(processDefinition.getId());
         if (delete == 0) {
             logger.error("Delete process definition error, processDefinitionCode:{}.", code);
-            putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
             throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
         }
         int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
@@ -913,9 +1031,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                     "The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.",
                     code);
         }
-        deleteOtherRelation(project, result, processDefinition);
-        putMsg(result, Status.SUCCESS);
-        return result;
+        deleteOtherRelation(project, new HashMap<>(), processDefinition);
     }
 
     /**
@@ -1119,7 +1235,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Override
     @Transactional
     public Map<String, Object> importSqlProcessDefinition(User loginUser, long projectCode, MultipartFile file) {
-        Map<String, Object> result = new HashMap<>();
+        Map<String, Object> result;
         Project project = projectMapper.queryByCode(projectCode);
         result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_IMPORT);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
@@ -2092,7 +2208,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * get new Task name or Process name when copy or import operate
      * @param originalName Task or Process original name
      * @param suffix "_copy_" or "_import_"
-     * @return
+     * @return new name
      */
     public String getNewName(String originalName, String suffix) {
         StringBuilder newName = new StringBuilder();
@@ -2526,6 +2642,68 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         return result;
     }
 
+    private void updateWorkflowValid(User user, ProcessDefinition oldProcessDefinition,
+                                     ProcessDefinition newProcessDefinition) {
+        // online can not permit edit
+        if (oldProcessDefinition.getReleaseState() == ReleaseState.ONLINE) {
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, oldProcessDefinition.getName());
+        }
+
+        Project project = projectMapper.queryByCode(oldProcessDefinition.getProjectCode());
+        // check user access for project
+        projectService.checkProjectAndAuthThrowException(user, project, WORKFLOW_UPDATE);
+
+        if (checkDescriptionLength(newProcessDefinition.getDescription())) {
+            throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR);
+        }
+
+        // check whether the new process define name exist
+        if (!oldProcessDefinition.getName().equals(newProcessDefinition.getName())) {
+            ProcessDefinition definition = processDefinitionMapper
+                    .verifyByDefineName(newProcessDefinition.getProjectCode(), newProcessDefinition.getName());
+            if (definition != null) {
+                throw new ServiceException(Status.PROCESS_DEFINITION_NAME_EXIST, newProcessDefinition.getName());
+            }
+        }
+    }
+
+    /**
+     * update single resource workflow
+     *
+     * @param loginUser     login user
+     * @param workflowCode     workflow resource code want to update
+     * @param workflowUpdateRequest   workflow update resource object
+     * @return Process definition
+     */
+    @Override
+    @Transactional
+    public ProcessDefinition updateSingleProcessDefinition(User loginUser,
+                                                           long workflowCode,
+                                                           WorkflowUpdateRequest workflowUpdateRequest) {
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(workflowCode);
+        // check process definition exists
+        if (processDefinition == null) {
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, workflowCode);
+        }
+
+        ProcessDefinition processDefinitionUpdate = workflowUpdateRequest.mergeIntoProcessDefinition(processDefinition);
+        this.updateWorkflowValid(loginUser, processDefinition, processDefinitionUpdate);
+
+        if (processDefinitionUpdate.getTenantCode() != null) {
+            Tenant tenant = tenantMapper.queryByTenantCode(processDefinitionUpdate.getTenantCode());
+            if (tenant == null) {
+                throw new ServiceException(Status.TENANT_NOT_EXIST);
+            }
+            processDefinitionUpdate.setTenantId(tenant.getId());
+        }
+        int update = processDefinitionMapper.updateById(processDefinitionUpdate);
+        if (update <= 0) {
+            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+        }
+        this.syncObj2Log(loginUser, processDefinition);
+        return processDefinition;
+    }
+
     protected Map<String, Object> updateDagSchedule(User loginUser,
                                                     long projectCode,
                                                     long processDefinitionCode,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index 633760121b..56b475a346 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -20,6 +20,9 @@ package org.apache.dolphinscheduler.api.service.impl;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.PROJECT;
 
 import org.apache.dolphinscheduler.api.dto.ScheduleParam;
+import org.apache.dolphinscheduler.api.dto.schedule.ScheduleCreateRequest;
+import org.apache.dolphinscheduler.api.dto.schedule.ScheduleFilterRequest;
+import org.apache.dolphinscheduler.api.dto.schedule.ScheduleUpdateRequest;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
@@ -38,11 +41,13 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Environment;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
@@ -55,6 +60,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
+import java.lang.reflect.InvocationTargetException;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
@@ -68,6 +74,7 @@ import java.util.stream.Collectors;
 
 import lombok.NonNull;
 
+import org.quartz.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -110,6 +117,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
     @Autowired
     private ProcessTaskRelationMapper processTaskRelationMapper;
 
+    @Autowired
+    private EnvironmentMapper environmentMapper;
+
     /**
      * save schedule
      *
@@ -150,11 +160,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
 
         // check work flow define release state
         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode);
-        result = executorService.checkProcessDefinitionValid(projectCode, processDefinition, processDefineCode,
+        executorService.checkProcessDefinitionValid(projectCode, processDefinition, processDefineCode,
                 processDefinition.getVersion());
-        if (result.get(Constants.STATUS) != Status.SUCCESS) {
-            return result;
-        }
 
         Schedule scheduleObj = new Schedule();
         Date now = new Date();
@@ -212,6 +219,80 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
         return result;
     }
 
+    protected void projectPermCheckByProcess(User loginUser, long processDefinitionCode) {
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
+        if (processDefinition == null) {
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
+        }
+        Project project = projectMapper.queryByCode(processDefinition.getProjectCode());
+        // check project auth
+        this.projectService.checkProjectAndAuthThrowException(loginUser, project, null);
+    }
+
+    private void scheduleParamCheck(String scheduleParamStr) {
+        ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleParamStr, ScheduleParam.class);
+        if (scheduleParam == null) {
+            throw new ServiceException(Status.PARSE_SCHEDULE_PARAM_ERROR, scheduleParamStr);
+        }
+        if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
+            throw new ServiceException(Status.SCHEDULE_START_TIME_END_TIME_SAME);
+        }
+        if (scheduleParam.getStartTime().getTime() > scheduleParam.getEndTime().getTime()) {
+            throw new ServiceException(Status.START_TIME_BIGGER_THAN_END_TIME_ERROR);
+        }
+        if (!CronExpression.isValidExpression(scheduleParam.getCrontab())) {
+            throw new ServiceException(Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
+        }
+    }
+
+    /**
+     * save schedule V2, will also change process definition's warningGroupId if schedule's warningGroupId be set
+     *
+     * @param loginUser               login user
+     * @param scheduleCreateRequest   schedule create object
+     * @return Schedule object just be created
+     */
+    @Override
+    @Transactional
+    public Schedule createSchedulesV2(User loginUser,
+                                      ScheduleCreateRequest scheduleCreateRequest) {
+        this.projectPermCheckByProcess(loginUser, scheduleCreateRequest.getProcessDefinitionCode());
+
+        ProcessDefinition processDefinition =
+                processDefinitionMapper.queryByCode(scheduleCreateRequest.getProcessDefinitionCode());
+
+        // check workflow define release state
+        executorService.checkProcessDefinitionValid(processDefinition.getProjectCode(), processDefinition,
+                processDefinition.getCode(), processDefinition.getVersion());
+
+        Schedule scheduleExists =
+                scheduleMapper.queryByProcessDefinitionCode(scheduleCreateRequest.getProcessDefinitionCode());
+        if (scheduleExists != null) {
+            throw new ServiceException(Status.SCHEDULE_ALREADY_EXISTS, scheduleCreateRequest.getProcessDefinitionCode(),
+                    scheduleExists.getId());
+        }
+
+        Schedule schedule = scheduleCreateRequest.convert2Schedule();
+        Environment environment = environmentMapper.queryByEnvironmentCode(schedule.getEnvironmentCode());
+        if (environment == null) {
+            throw new ServiceException(Status.QUERY_ENVIRONMENT_BY_CODE_ERROR, schedule.getEnvironmentCode());
+        }
+        schedule.setUserId(loginUser.getId());
+        // give more detail when return schedule object
+        schedule.setUserName(loginUser.getUserName());
+        schedule.setProcessDefinitionName(processDefinition.getName());
+
+        this.scheduleParamCheck(scheduleCreateRequest.getScheduleParam());
+        int create = scheduleMapper.insert(schedule);
+        if (create <= 0) {
+            throw new ServiceException(Status.CREATE_SCHEDULE_ERROR);
+        }
+        // updateProcessInstance receivers and cc by process definition id
+        processDefinition.setWarningGroupId(schedule.getWarningGroupId());
+        processDefinitionMapper.updateById(processDefinition);
+        return schedule;
+    }
+
     /**
      * updateProcessInstance schedule
      *
@@ -254,13 +335,14 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
 
         if (schedule == null) {
             logger.error("Schedule does not exist, scheduleId:{}.", id);
-            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
+            putMsg(result, Status.SCHEDULE_NOT_EXISTS, id);
             return result;
         }
 
         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode());
         if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
-            logger.error("Process definition does not exist, processDefinitionCode:{}.", schedule.getProcessDefinitionCode());
+            logger.error("Process definition does not exist, processDefinitionCode:{}.",
+                    schedule.getProcessDefinitionCode());
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(schedule.getProcessDefinitionCode()));
             return result;
         }
@@ -270,6 +352,69 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
         return result;
     }
 
+    /**
+     * update schedule object V2
+     *
+     * @param loginUser login user
+     * @param scheduleId scheduler id
+     * @param scheduleUpdateRequest the schedule object will be updated
+     * @return Schedule object
+     */
+    @Override
+    @Transactional
+    public Schedule updateSchedulesV2(User loginUser,
+                                      Integer scheduleId,
+                                      ScheduleUpdateRequest scheduleUpdateRequest) {
+        Schedule schedule = scheduleMapper.selectById(scheduleId);
+        if (schedule == null) {
+            throw new ServiceException(Status.SCHEDULE_NOT_EXISTS, scheduleId);
+        }
+
+        Schedule scheduleUpdate;
+        try {
+            scheduleUpdate = scheduleUpdateRequest.mergeIntoSchedule(schedule);
+            // check update params
+            this.scheduleParamCheck(scheduleUpdateRequest.updateScheduleParam(scheduleUpdate));
+        } catch (InvocationTargetException | IllegalAccessException | InstantiationException
+                | NoSuchMethodException e) {
+            throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleUpdateRequest.toString());
+        }
+        // check update params
+        this.projectPermCheckByProcess(loginUser, scheduleUpdate.getProcessDefinitionCode());
+
+        if (scheduleUpdate.getEnvironmentCode() != null) {
+            Environment environment = environmentMapper.queryByEnvironmentCode(scheduleUpdate.getEnvironmentCode());
+            if (environment == null) {
+                throw new ServiceException(Status.QUERY_ENVIRONMENT_BY_CODE_ERROR, scheduleUpdate.getEnvironmentCode());
+            }
+        }
+
+        int update = scheduleMapper.updateById(scheduleUpdate);
+        if (update <= 0) {
+            throw new ServiceException(Status.UPDATE_SCHEDULE_ERROR);
+        }
+        return scheduleUpdate;
+    }
+
+    /**
+     * get schedule object
+     *
+     * @param loginUser login user
+     * @param scheduleId scheduler id
+     * @return Schedule object
+     */
+    @Override
+    @Transactional
+    public Schedule getSchedule(User loginUser,
+                                Integer scheduleId) {
+        Schedule schedule = scheduleMapper.selectById(scheduleId);
+        if (schedule == null) {
+            throw new ServiceException(Status.SCHEDULE_NOT_EXISTS, scheduleId);
+        }
+        this.projectPermCheckByProcess(loginUser, schedule.getProcessDefinitionCode());
+        return schedule;
+    }
+
     /**
      * set schedule online or offline
      *
@@ -312,14 +457,16 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
         ProcessDefinition processDefinition =
                 processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
         if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
-            logger.error("Process definition does not exist, processDefinitionCode:{}.", scheduleObj.getProcessDefinitionCode());
+            logger.error("Process definition does not exist, processDefinitionCode:{}.",
+                    scheduleObj.getProcessDefinitionCode());
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
             return result;
         }
         List<ProcessTaskRelation> processTaskRelations =
                 processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
         if (processTaskRelations.isEmpty()) {
-            logger.error("Process task relations do not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
+            logger.error("Process task relations do not exist, projectCode:{}, processDefinitionCode:{}.", projectCode,
+                    processDefinition.getCode());
             putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
             return result;
         }
@@ -335,7 +482,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
             List<Long> subProcessDefineCodes = new ArrayList<>();
             processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
             if (!subProcessDefineCodes.isEmpty()) {
-                logger.info("Need to check sub process definition state before change schedule state, subProcessDefineCodes:{}.",
+                logger.info(
+                        "Need to check sub process definition state before change schedule state, subProcessDefineCodes:{}.",
                         org.apache.commons.lang.StringUtils.join(subProcessDefineCodes, ","));
                 List<ProcessDefinition> subProcessDefinitionList =
                         processDefinitionMapper.queryByCodes(subProcessDefineCodes);
@@ -345,7 +493,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
                          * if there is no online process, exit directly
                          */
                         if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
-                            logger.warn("Only sub process definition state is {} can change schedule state, subProcessDefinitionCode:{}.",
+                            logger.warn(
+                                    "Only sub process definition state is {} can change schedule state, subProcessDefinitionCode:{}.",
                                     ReleaseState.ONLINE.getDescp(), subProcessDefinition.getCode());
                             putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
                                     String.valueOf(subProcessDefinition.getId()));
@@ -454,6 +603,33 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
         return scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodes);
     }
 
+    /**
+     * query schedule
+     *
+     * @param loginUser login user
+     * @param scheduleFilterRequest schedule filter request
+     * @return schedule list page
+     */
+    @Override
+    @Transactional
+    public PageInfo<Schedule> filterSchedules(User loginUser,
+                                              ScheduleFilterRequest scheduleFilterRequest) {
+        if (scheduleFilterRequest.getProjectName() != null) {
+            Project project = projectMapper.queryByName(scheduleFilterRequest.getProjectName());
+            // check project auth
+            projectService.checkProjectAndAuthThrowException(loginUser, project, null);
+        }
+        Page<Schedule> page = new Page<>(scheduleFilterRequest.getPageNo(), scheduleFilterRequest.getPageSize());
+        IPage<Schedule> scheduleIPage = scheduleMapper.filterSchedules(page, scheduleFilterRequest.convert2Schedule());
+
+        PageInfo<Schedule> pageInfo =
+                new PageInfo<>(scheduleFilterRequest.getPageNo(), scheduleFilterRequest.getPageSize());
+        pageInfo.setTotal((int) scheduleIPage.getTotal());
+        pageInfo.setTotalList(scheduleIPage.getRecords());
+
+        return pageInfo;
+    }
+
     /**
      * query schedule list
      *
@@ -485,7 +661,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
     }
 
     public void setSchedule(int projectId, Schedule schedule) {
-        logger.info("Set schedule state {}, project id: {}, scheduleId: {}", schedule.getReleaseState().getDescp(), projectId, schedule.getId());
+        logger.info("Set schedule state {}, project id: {}, scheduleId: {}", schedule.getReleaseState().getDescp(),
+                projectId, schedule.getId());
         schedulerApi.insertOrUpdateScheduleTask(projectId, schedule);
     }
 
@@ -523,54 +700,28 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
      * delete schedule by id
      *
      * @param loginUser   login user
-     * @param projectCode project code
-     * @param scheduleId  scheule id
-     * @return delete result code
+     * @param scheduleId  schedule id
      */
     @Override
-    public Map<String, Object> deleteScheduleById(User loginUser, long projectCode, Integer scheduleId) {
-
-        Map<String, Object> result = new HashMap<>();
-        Project project = projectMapper.queryByCode(projectCode);
-
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode, null);
-        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
-        if (resultEnum != Status.SUCCESS) {
-            return checkResult;
-        }
-
+    public void deleteSchedulesById(User loginUser, Integer scheduleId) {
         Schedule schedule = scheduleMapper.selectById(scheduleId);
-
         if (schedule == null) {
-            logger.error("Schedule does not exist, scheduleId:{}.", scheduleId);
-            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, scheduleId);
-            return result;
+            throw new ServiceException(Status.SCHEDULE_NOT_EXISTS, scheduleId);
         }
-
-        // Determine if the login user is the owner of the schedule
-        if (loginUser.getId() != schedule.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) {
-            logger.warn("User does not have permission to delete schedule, loginUserName:{}, scheduleId:{}.", loginUser.getUserName(), scheduleId);
-            putMsg(result, Status.USER_NO_OPERATION_PERM);
-            return result;
-        }
-
         // check schedule is already online
         if (schedule.getReleaseState() == ReleaseState.ONLINE) {
-            logger.warn("Only {} state schedule can be deleted, scheduleId:{}.", ReleaseState.OFFLINE.getDescp(), scheduleId);
-            putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId());
-            return result;
+            throw new ServiceException(Status.SCHEDULE_STATE_ONLINE, scheduleId);
+        }
+        // Determine if the login user is the owner of the schedule
+        if (loginUser.getId() != schedule.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) {
+            throw new ServiceException(Status.USER_NO_OPERATION_PERM);
         }
 
+        this.projectPermCheckByProcess(loginUser, schedule.getProcessDefinitionCode());
         int delete = scheduleMapper.deleteById(scheduleId);
-
-        if (delete > 0) {
-            logger.info("Schedule delete complete, scheduleId:{}.", scheduleId);
-            putMsg(result, Status.SUCCESS);
-        } else {
-            logger.error("Schedule delete error, scheduleId:{}.", scheduleId);
-            putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
+        if (delete <= 0) {
+            throw new ServiceException(Status.DELETE_SCHEDULE_BY_ID_ERROR);
         }
-        return result;
     }
 
     /**
@@ -643,7 +794,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
         // check schedule exists
         Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
         if (schedule == null) {
-            logger.error("Schedule of process definition does not exist, processDefinitionCode:{}.", processDefinitionCode);
+            logger.error("Schedule of process definition does not exist, processDefinitionCode:{}.",
+                    processDefinitionCode);
             putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, processDefinitionCode);
             return result;
         }
@@ -665,8 +817,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
                                 FailureStrategy failureStrategy, Priority processInstancePriority, String workerGroup,
                                 long environmentCode) {
         if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE,
-            Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
-            logger.warn("Schedule can not be updated due to schedule is {}, scheduleId:{}.", ReleaseState.ONLINE.getDescp(), schedule.getId());
+                Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
+            logger.warn("Schedule can not be updated due to schedule is {}, scheduleId:{}.",
+                    ReleaseState.ONLINE.getDescp(), schedule.getId());
             return;
         }
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/Result.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/Result.java
index 3415db54bf..a08f056b90 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/Result.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/Result.java
@@ -27,6 +27,7 @@ import java.text.MessageFormat;
  * @param <T> T
  */
 public class Result<T> {
+
     /**
      * status
      */
@@ -50,13 +51,13 @@ public class Result<T> {
         this.msg = msg;
     }
 
-    private Result(Status status) {
+    public Result(Status status) {
         if (status != null) {
             this.code = status.getCode();
             this.msg = status.getMsg();
         }
     }
-    
+
     public Result(Integer code, String msg, T data) {
         this.code = code;
         this.msg = msg;
@@ -73,7 +74,7 @@ public class Result<T> {
     public static <T> Result<T> success(T data) {
         return new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg(), data);
     }
-    
+
     public static Result success() {
         return success(null);
     }
diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages.properties b/dolphinscheduler-api/src/main/resources/i18n/messages.properties
index 59669f9e2e..24b3b5fd86 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages.properties
@@ -217,7 +217,7 @@ SKIP_LINE_NUM=skip line num
 QUERY_TASK_INSTANCE_LOG_NOTES=query task instance log
 DOWNLOAD_TASK_INSTANCE_LOG_NOTES=download task instance log
 USERS_TAG=users related operation
-SCHEDULER_TAG=scheduler related operation
+SCHEDULE_TAG=schedule related operation
 CREATE_SCHEDULE_NOTES=create schedule
 CREATE_USER_NOTES=create user
 TENANT_ID=tenant id
@@ -269,7 +269,7 @@ DELETE_DATA_SOURCE_NOTES=delete data source
 VERIFY_DATA_SOURCE_NOTES=verify data source
 UNAUTHORIZED_DATA_SOURCE_NOTES=unauthorized data source
 AUTHORIZED_DATA_SOURCE_NOTES=authorized data source
-DELETE_SCHEDULER_BY_ID_NOTES=delete scheduler by id
+DELETE_SCHEDULE_NOTES=delete schedule by id
 QUERY_ALERT_GROUP_LIST_PAGING_NOTES=query alert group list paging
 EXPORT_PROCESS_DEFINITION_BY_ID_NOTES=export process definition by id
 BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES= batch export process definition by ids
diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
index 7f825464d9..2b5dfe2d03 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
@@ -265,7 +265,7 @@ SKIP_LINE_NUM=skip line num
 QUERY_TASK_INSTANCE_LOG_NOTES=query task instance log
 DOWNLOAD_TASK_INSTANCE_LOG_NOTES=download task instance log
 USERS_TAG=users related operation
-SCHEDULER_TAG=scheduler related operation
+SCHEDULE_TAG=schedule related operation
 CREATE_SCHEDULE_NOTES=create schedule
 CREATE_USER_NOTES=create user
 TENANT_ID=tenant id
@@ -326,7 +326,7 @@ DELETE_DATA_SOURCE_NOTES=delete data source
 VERIFY_DATA_SOURCE_NOTES=verify data source
 UNAUTHORIZED_DATA_SOURCE_NOTES=unauthorized data source
 AUTHORIZED_DATA_SOURCE_NOTES=authorized data source
-DELETE_SCHEDULER_BY_ID_NOTES=delete scheduler by id
+DELETE_SCHEDULE_NOTES=delete schedule by id
 QUERY_ALERT_GROUP_LIST_PAGING_NOTES=query alert group list paging
 EXPORT_PROCESS_DEFINITION_BY_ID_NOTES=export process definition by id
 BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES=batch export process definition by ids
diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
index 3ab39b13c9..ed0f739232 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
@@ -248,7 +248,7 @@ SKIP_LINE_NUM=忽略行数
 QUERY_TASK_INSTANCE_LOG_NOTES=查询任务实例日志
 DOWNLOAD_TASK_INSTANCE_LOG_NOTES=下载任务实例日志
 USERS_TAG=用户相关操作
-SCHEDULER_TAG=定时相关操作
+SCHEDULE_TAG=定时相关操作
 CREATE_SCHEDULE_NOTES=创建定时
 CREATE_USER_NOTES=创建用户
 CREATE_WORKER_GROUP_NOTES=创建Worker分组
@@ -323,7 +323,7 @@ DELETE_DATA_SOURCE_NOTES=删除数据源
 VERIFY_DATA_SOURCE_NOTES=验证数据源
 UNAUTHORIZED_DATA_SOURCE_NOTES=未授权的数据源
 AUTHORIZED_DATA_SOURCE_NOTES=授权的数据源
-DELETE_SCHEDULER_BY_ID_NOTES=根据定时id删除定时数据
+DELETE_SCHEDULE_NOTES=根据定时id删除定时数据
 QUERY_ALERT_GROUP_LIST_PAGING_NOTES=分页查询告警组列表
 EXPORT_PROCESS_DEFINITION_BY_ID_NOTES=通过工作流ID导出工作流定义
 BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES=批量导出工作流定义
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
index 4f8df288bd..3ae23d9fc3 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
@@ -40,6 +40,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
@@ -299,15 +300,9 @@ public class ProcessDefinitionControllerTest {
     public void testDeleteProcessDefinitionByCode() {
         long projectCode = 1L;
         long code = 1L;
-
-        Map<String, Object> result = new HashMap<>();
-        putMsg(result, Status.SUCCESS);
-
-        Mockito.when(processDefinitionService.deleteProcessDefinitionByCode(user, projectCode, code))
-                .thenReturn(result);
-        Result response = processDefinitionController.deleteProcessDefinitionByCode(user, projectCode, code);
-
-        Assert.assertTrue(response != null && response.isSuccess());
+        // not throw error mean pass
+        Assertions.assertDoesNotThrow(
+                () -> processDefinitionController.deleteProcessDefinitionByCode(user, projectCode, code));
     }
 
     @Test
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
index 7b90330b3d..63ec80d287 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
@@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.User;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -50,6 +49,7 @@ import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
 public class SchedulerControllerTest extends AbstractControllerTest {
+
     private static final Logger logger = LoggerFactory.getLogger(SchedulerControllerTest.class);
 
     @MockBean(name = "schedulerService")
@@ -58,21 +58,22 @@ public class SchedulerControllerTest extends AbstractControllerTest {
     @Test
     public void testCreateSchedule() throws Exception {
         MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
-        paramsMap.add("processDefinitionCode","40");
-        paramsMap.add("schedule","{'startTime':'2019-12-16 00:00:00','endTime':'2019-12-17 00:00:00','crontab':'0 0 6 * * ? *'}");
-        paramsMap.add("warningType",String.valueOf(WarningType.NONE));
-        paramsMap.add("warningGroupId","1");
-        paramsMap.add("failureStrategy",String.valueOf(FailureStrategy.CONTINUE));
-        paramsMap.add("receivers","");
-        paramsMap.add("receiversCc","");
-        paramsMap.add("workerGroupId","1");
-        paramsMap.add("processInstancePriority",String.valueOf(Priority.HIGH));
+        paramsMap.add("processDefinitionCode", "40");
+        paramsMap.add("schedule",
+                "{'startTime':'2019-12-16 00:00:00','endTime':'2019-12-17 00:00:00','crontab':'0 0 6 * * ? *'}");
+        paramsMap.add("warningType", String.valueOf(WarningType.NONE));
+        paramsMap.add("warningGroupId", "1");
+        paramsMap.add("failureStrategy", String.valueOf(FailureStrategy.CONTINUE));
+        paramsMap.add("receivers", "");
+        paramsMap.add("receiversCc", "");
+        paramsMap.add("workerGroupId", "1");
+        paramsMap.add("processInstancePriority", String.valueOf(Priority.HIGH));
 
         Mockito.when(schedulerService.insertSchedule(isA(User.class), isA(Long.class), isA(Long.class),
                 isA(String.class), isA(WarningType.class), isA(int.class), isA(FailureStrategy.class),
                 isA(Priority.class), isA(String.class), isA(Long.class))).thenReturn(success());
 
-        MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/",123)
+        MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/", 123)
                 .header(SESSION_ID, sessionId)
                 .params(paramsMap))
                 .andExpect(status().isCreated())
@@ -80,23 +81,23 @@ public class SchedulerControllerTest extends AbstractControllerTest {
                 .andReturn();
 
         Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
-        Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
+        Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
         logger.info(mvcResult.getResponse().getContentAsString());
     }
 
     @Test
-    @Ignore
     public void testUpdateSchedule() throws Exception {
         MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
-        paramsMap.add("id","37");
-        paramsMap.add("schedule","{'startTime':'2019-12-16 00:00:00','endTime':'2019-12-17 00:00:00','crontab':'0 0 7 * * ? *'}");
-        paramsMap.add("warningType",String.valueOf(WarningType.NONE));
-        paramsMap.add("warningGroupId","1");
-        paramsMap.add("failureStrategy",String.valueOf(FailureStrategy.CONTINUE));
-        paramsMap.add("receivers","");
-        paramsMap.add("receiversCc","");
-        paramsMap.add("workerGroupId","1");
-        paramsMap.add("processInstancePriority",String.valueOf(Priority.HIGH));
+        paramsMap.add("id", "37");
+        paramsMap.add("schedule",
+                "{'startTime':'2019-12-16 00:00:00','endTime':'2019-12-17 00:00:00','crontab':'0 0 7 * * ? *'}");
+        paramsMap.add("warningType", String.valueOf(WarningType.NONE));
+        paramsMap.add("warningGroupId", "1");
+        paramsMap.add("failureStrategy", String.valueOf(FailureStrategy.CONTINUE));
+        paramsMap.add("receivers", "");
+        paramsMap.add("receiversCc", "");
+        paramsMap.add("workerGroupId", "1");
+        paramsMap.add("processInstancePriority", String.valueOf(Priority.HIGH));
 
         Mockito.when(schedulerService.updateSchedule(isA(User.class), isA(Long.class), isA(Integer.class),
                 isA(String.class), isA(WarningType.class), isA(Integer.class), isA(FailureStrategy.class),
@@ -110,19 +111,19 @@ public class SchedulerControllerTest extends AbstractControllerTest {
                 .andReturn();
 
         Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
-        Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
+        Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
         logger.info(mvcResult.getResponse().getContentAsString());
     }
 
     @Test
     public void testOnline() throws Exception {
         MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
-        paramsMap.add("id","37");
+        paramsMap.add("id", "37");
 
         Mockito.when(schedulerService.setScheduleState(isA(User.class), isA(Long.class), isA(Integer.class),
                 isA(ReleaseState.class))).thenReturn(success());
 
-        MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/online",123, 37)
+        MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/online", 123, 37)
                 .header(SESSION_ID, sessionId)
                 .params(paramsMap))
                 .andExpect(status().isOk())
@@ -130,19 +131,19 @@ public class SchedulerControllerTest extends AbstractControllerTest {
                 .andReturn();
 
         Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
-        Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
+        Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
         logger.info(mvcResult.getResponse().getContentAsString());
     }
 
     @Test
     public void testOffline() throws Exception {
         MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
-        paramsMap.add("id","28");
+        paramsMap.add("id", "28");
 
         Mockito.when(schedulerService.setScheduleState(isA(User.class), isA(Long.class), isA(Integer.class),
                 isA(ReleaseState.class))).thenReturn(success());
 
-        MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/offline",123, 28)
+        MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/offline", 123, 28)
                 .header(SESSION_ID, sessionId)
                 .params(paramsMap))
                 .andExpect(status().isOk())
@@ -150,17 +151,17 @@ public class SchedulerControllerTest extends AbstractControllerTest {
                 .andReturn();
 
         Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
-        Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
+        Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
         logger.info(mvcResult.getResponse().getContentAsString());
     }
 
     @Test
     public void testQueryScheduleListPaging() throws Exception {
         MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
-        paramsMap.add("processDefinitionCode","40");
-        paramsMap.add("searchVal","test");
-        paramsMap.add("pageNo","1");
-        paramsMap.add("pageSize","30");
+        paramsMap.add("processDefinitionCode", "40");
+        paramsMap.add("searchVal", "test");
+        paramsMap.add("pageNo", "1");
+        paramsMap.add("pageSize", "30");
 
         PageInfo<Resource> pageInfo = new PageInfo<>(1, 10);
         Result mockResult = Result.success(pageInfo);
@@ -168,7 +169,7 @@ public class SchedulerControllerTest extends AbstractControllerTest {
         Mockito.when(schedulerService.querySchedule(isA(User.class), isA(Long.class), isA(Long.class),
                 isA(String.class), isA(Integer.class), isA(Integer.class))).thenReturn(mockResult);
 
-        MvcResult mvcResult = mockMvc.perform(get("/projects/{projectCode}/schedules/",123)
+        MvcResult mvcResult = mockMvc.perform(get("/projects/{projectCode}/schedules/", 123)
                 .header(SESSION_ID, sessionId)
                 .params(paramsMap))
                 .andExpect(status().isOk())
@@ -176,7 +177,7 @@ public class SchedulerControllerTest extends AbstractControllerTest {
                 .andReturn();
 
         Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
-        Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
+        Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
         logger.info(mvcResult.getResponse().getContentAsString());
     }
 
@@ -184,14 +185,14 @@ public class SchedulerControllerTest extends AbstractControllerTest {
     public void testQueryScheduleList() throws Exception {
         Mockito.when(schedulerService.queryScheduleList(isA(User.class), isA(Long.class))).thenReturn(success());
 
-        MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/list",123)
+        MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/list", 123)
                 .header(SESSION_ID, sessionId))
                 .andExpect(status().isOk())
                 .andExpect(content().contentType(MediaType.APPLICATION_JSON))
                 .andReturn();
 
         Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
-        Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
+        Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
         logger.info(mvcResult.getResponse().getContentAsString());
     }
 
@@ -200,27 +201,27 @@ public class SchedulerControllerTest extends AbstractControllerTest {
         Mockito.when(schedulerService.previewSchedule(isA(User.class), isA(String.class)))
                 .thenReturn(success());
 
-        MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/preview",123)
+        MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/preview", 123)
                 .header(SESSION_ID, sessionId)
-                .param("schedule","{'startTime':'2019-06-10 00:00:00','endTime':'2019-06-13 00:00:00','crontab':'0 0 3/6 * * ? *','timezoneId':'Asia/Shanghai'}"))
+                .param("schedule",
+                        "{'startTime':'2019-06-10 00:00:00','endTime':'2019-06-13 00:00:00','crontab':'0 0 3/6 * * ? *','timezoneId':'Asia/Shanghai'}"))
                 .andExpect(status().isCreated())
                 .andExpect(content().contentType(MediaType.APPLICATION_JSON))
                 .andReturn();
 
         Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
-        Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
+        Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
         logger.info(mvcResult.getResponse().getContentAsString());
     }
 
     @Test
     public void testDeleteScheduleById() throws Exception {
         MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
-        paramsMap.add("scheduleId","37");
+        paramsMap.add("scheduleId", "37");
 
-        Mockito.when(schedulerService.deleteScheduleById(isA(User.class), isA(Long.class), isA(Integer.class)))
-                .thenReturn(success());
+        Mockito.doNothing().when(schedulerService).deleteSchedulesById(isA(User.class), isA(Integer.class));
 
-        MvcResult mvcResult = mockMvc.perform(delete("/projects/{projectCode}/schedules/{id}",123, 37)
+        MvcResult mvcResult = mockMvc.perform(delete("/projects/{projectCode}/schedules/{id}", 123, 37)
                 .header(SESSION_ID, sessionId)
                 .params(paramsMap))
                 .andExpect(status().isOk())
@@ -228,7 +229,7 @@ public class SchedulerControllerTest extends AbstractControllerTest {
                 .andReturn();
 
         Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
-        Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
+        Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
         logger.info(mvcResult.getResponse().getContentAsString());
     }
 }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowV2ControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowV2ControllerTest.java
new file mode 100644
index 0000000000..88ae51c176
--- /dev/null
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowV2ControllerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.controller;
+
+import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
+
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest;
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowFilterRequest;
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * project v2 controller test
+ */
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class WorkflowV2ControllerTest {
+
+    protected User user;
+    @InjectMocks
+    private WorkflowV2Controller workflowV2Controller;
+    @Mock
+    private ProcessDefinitionService processDefinitionService;
+    @Mock
+    private TenantMapper tenantMapper;
+
+    private final static String name = "workflowName";
+    private final static String newName = "workflowNameNew";
+    private final static String releaseState = "ONLINE";
+    private final static int projectCode = 13579;
+    private final static String description = "the workflow description";
+    private final static int timeout = 30;
+    private final static String tenantCode = "dolphinscheduler";
+    private final static int warningGroupId = 0;
+    private final static String executionType = "PARALLEL";
+
+    @Before
+    public void before() {
+        User loginUser = new User();
+        loginUser.setId(1);
+        loginUser.setUserType(UserType.GENERAL_USER);
+        loginUser.setUserName("admin");
+        user = loginUser;
+    }
+
+    @Test
+    public void testCreateWorkflow() {
+        WorkflowCreateRequest workflowCreateRequest = new WorkflowCreateRequest();
+        workflowCreateRequest.setName(name);
+        workflowCreateRequest.setReleaseState(releaseState);
+        workflowCreateRequest.setProjectCode(projectCode);
+        workflowCreateRequest.setDescription(description);
+        workflowCreateRequest.setGlobalParams(EMPTY_STRING);
+        workflowCreateRequest.setTimeout(timeout);
+        workflowCreateRequest.setTenantCode(tenantCode);
+        workflowCreateRequest.setWarningGroupId(warningGroupId);
+        workflowCreateRequest.setExecutionType(executionType);
+
+        Mockito.when(processDefinitionService.createSingleProcessDefinition(user, workflowCreateRequest))
+                .thenReturn(this.getProcessDefinition(name));
+        Result<ProcessDefinition> resourceResponse = workflowV2Controller.createWorkflow(user, workflowCreateRequest);
+        Assert.assertEquals(this.getProcessDefinition(name), resourceResponse.getData());
+    }
+
+    @Test
+    public void testUpdateWorkflow() {
+        WorkflowUpdateRequest workflowUpdateRequest = new WorkflowUpdateRequest();
+        workflowUpdateRequest.setName(newName);
+
+        Mockito.when(processDefinitionService.updateSingleProcessDefinition(user, 1L, workflowUpdateRequest))
+                .thenReturn(this.getProcessDefinition(newName));
+        Result<ProcessDefinition> resourceResponse =
+                workflowV2Controller.updateWorkflow(user, 1L, workflowUpdateRequest);
+
+        Assert.assertEquals(this.getProcessDefinition(newName), resourceResponse.getData());
+    }
+
+    @Test
+    public void testGetWorkflow() {
+        Mockito.when(processDefinitionService.getProcessDefinition(user, 1L))
+                .thenReturn(this.getProcessDefinition(name));
+        Result<ProcessDefinition> resourceResponse = workflowV2Controller.getWorkflow(user, 1L);
+        Assertions.assertEquals(this.getProcessDefinition(name), resourceResponse.getData());
+    }
+
+    @Test
+    public void testFilterWorkflow() {
+        WorkflowFilterRequest workflowFilterRequest = new WorkflowFilterRequest();
+        workflowFilterRequest.setWorkflowName(name);
+
+        Mockito.when(processDefinitionService.filterProcessDefinition(user, workflowFilterRequest))
+                .thenReturn(this.getProcessDefinitionPage(name));
+        Result<PageInfo<ProcessDefinition>> pageResourceResponse =
+                workflowV2Controller.filterWorkflows(user, workflowFilterRequest);
+
+        PageInfo<ProcessDefinition> processDefinitionPage = pageResourceResponse.getData();
+        Assertions.assertIterableEquals(this.getProcessDefinitionPage(name).getTotalList(),
+                processDefinitionPage.getTotalList());
+    }
+
+    private ProcessDefinition getProcessDefinition(String pdName) {
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setId(1);
+        processDefinition.setName(pdName);
+        processDefinition.setDescription(description);
+        processDefinition.setReleaseState(ReleaseState.valueOf(releaseState));
+        processDefinition.setProjectCode(projectCode);
+        processDefinition.setTenantId(1);
+        processDefinition.setExecutionType(ProcessExecutionTypeEnum.valueOf(executionType));
+        processDefinition.setWarningGroupId(warningGroupId);
+        processDefinition.setGlobalParams(EMPTY_STRING);
+        return processDefinition;
+    }
+
+    private PageInfo<ProcessDefinition> getProcessDefinitionPage(String pdName) {
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setId(1);
+        processDefinition.setName(pdName);
+        processDefinition.setDescription(description);
+        processDefinition.setReleaseState(ReleaseState.valueOf(releaseState));
+        processDefinition.setProjectCode(projectCode);
+        processDefinition.setTenantId(1);
+        processDefinition.setExecutionType(ProcessExecutionTypeEnum.valueOf(executionType));
+        processDefinition.setWarningGroupId(warningGroupId);
+        processDefinition.setGlobalParams(EMPTY_STRING);
+
+        PageInfo<ProcessDefinition> pageInfoProcessDefinitions = new PageInfo<ProcessDefinition>();
+        List<ProcessDefinition> processDefinitions = new ArrayList<ProcessDefinition>();
+        processDefinitions.add(processDefinition);
+        pageInfoProcessDefinitions.setTotalList(processDefinitions);
+        return pageInfoProcessDefinitions;
+    }
+}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java
index 4588e28248..49b3c1c299 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java
@@ -24,6 +24,10 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.HashMap;
+import java.util.Map;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,9 +39,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * base service test
  */
@@ -63,9 +64,9 @@ public class BaseServiceTest {
 
         User user = new User();
         user.setUserType(UserType.ADMIN_USER);
-        //ADMIN_USER
+        // ADMIN_USER
         Assert.assertTrue(baseService.isAdmin(user));
-        //GENERAL_USER
+        // GENERAL_USER
         user.setUserType(UserType.GENERAL_USER);
         Assert.assertFalse(baseService.isAdmin(user));
 
@@ -76,9 +77,9 @@ public class BaseServiceTest {
 
         Map<String, Object> result = new HashMap<>();
         baseService.putMsg(result, Status.SUCCESS);
-        Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
-        //has params
-        baseService.putMsg(result, Status.PROJECT_NOT_FOUND,"test");
+        Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+        // has params
+        baseService.putMsg(result, Status.PROJECT_NOT_FOUND, "test");
 
     }
 
@@ -87,39 +88,39 @@ public class BaseServiceTest {
 
         Result result = new Result();
         baseService.putMsg(result, Status.SUCCESS);
-        Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
-        //has params
-        baseService.putMsg(result,Status.PROJECT_NOT_FOUND,"test");
+        Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
+        // has params
+        baseService.putMsg(result, Status.PROJECT_NOT_FOUND, "test");
     }
 
-//    @Test
-//    public void testCreateTenantDirIfNotExists() {
-//
-//        PowerMockito.mockStatic(HadoopUtils.class);
-//        PowerMockito.when(HadoopUtils.getInstance()).thenReturn(hadoopUtils);
-//
-//        try {
-//            baseService.createTenantDirIfNotExists("test");
-//        } catch (Exception e) {
-//            Assert.fail();
-//            logger.error("CreateTenantDirIfNotExists error ",e);
-//            e.printStackTrace();
-//        }
-//
-//    }
+    // @Test
+    // public void testCreateTenantDirIfNotExists() {
+    //
+    // PowerMockito.mockStatic(HadoopUtils.class);
+    // PowerMockito.when(HadoopUtils.getInstance()).thenReturn(hadoopUtils);
+    //
+    // try {
+    // baseService.createTenantDirIfNotExists("test");
+    // } catch (Exception e) {
+    // Assert.fail();
+    // logger.error("CreateTenantDirIfNotExists error ",e);
+    // e.printStackTrace();
+    // }
+    //
+    // }
 
     @Test
     public void testHasPerm() {
 
         User user = new User();
         user.setId(1);
-        //create user
-        Assert.assertTrue(baseService.canOperator(user,1));
+        // create user
+        Assert.assertTrue(baseService.canOperator(user, 1));
 
-        //admin
+        // admin
         user.setId(2);
         user.setUserType(UserType.ADMIN_USER);
-        Assert.assertTrue(baseService.canOperator(user,1));
+        Assert.assertTrue(baseService.canOperator(user, 1));
 
     }
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/PageQueryDto.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTestTool.java
similarity index 56%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/PageQueryDto.java
copy to dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTestTool.java
index 8afd131e79..10c642a011 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/PageQueryDto.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTestTool.java
@@ -15,22 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.api.dto;
+package org.apache.dolphinscheduler.api.service;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.common.Constants;
 
-/**
- * page query dto
- */
-@ApiModel("QUERY-PAGE-INFO")
-@Data
-public class PageQueryDto {
+import java.text.MessageFormat;
+import java.util.Map;
 
-    @ApiModelProperty(example = "10", required = true)
-    private Integer pageSize;
+public class BaseServiceTestTool {
 
-    @ApiModelProperty(example = "1", required = true)
-    private Integer pageNo;
-}
\ No newline at end of file
+    protected void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
+        result.put(Constants.STATUS, status);
+        if (statusParams != null && statusParams.length > 0) {
+            result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams));
+        } else {
+            result.put(Constants.MSG, status.getMsg());
+        }
+    }
+}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index e4401cd71d..0253182c1c 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -23,10 +23,16 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
-import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT;
+import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
+import static org.mockito.ArgumentMatchers.isA;
 
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest;
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowFilterRequest;
+import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
@@ -53,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
@@ -67,12 +74,11 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.io.ByteArrayOutputStream;
 import java.nio.charset.StandardCharsets;
-import java.text.MessageFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -83,7 +89,7 @@ import java.util.zip.ZipOutputStream;
 import javax.servlet.http.HttpServletResponse;
 
 import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
@@ -97,7 +103,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
 @RunWith(MockitoJUnitRunner.class)
-public class ProcessDefinitionServiceTest {
+public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
 
     private static final String taskRelationJson =
             "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
@@ -117,7 +123,10 @@ public class ProcessDefinitionServiceTest {
     private ProcessDefinitionServiceImpl processDefinitionService;
 
     @Mock
-    private ProcessDefinitionMapper processDefineMapper;
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Mock
+    private ProcessDefinitionLogMapper processDefinitionLogMapper;
 
     @Mock
     private ProcessDefinitionDao processDefinitionDao;
@@ -152,61 +161,76 @@ public class ProcessDefinitionServiceTest {
     @Mock
     private WorkFlowLineageService workFlowLineageService;
 
+    protected User user;
+    protected Exception exception;
+    protected final static long projectCode = 1L;
+    protected final static long projectCodeOther = 2L;
+    protected final static long processDefinitionCode = 11L;
+    protected final static String name = "testProcessDefinitionName";
+    protected final static String description = "this is a description";
+    protected final static String releaseState = "ONLINE";
+    protected final static int warningGroupId = 1;
+    protected final static int timeout = 60;
+    protected final static String executionType = "PARALLEL";
+    protected final static String tenantCode = "tenant";
+
+    @Before
+    public void before() {
+        User loginUser = new User();
+        loginUser.setId(1);
+        loginUser.setTenantId(2);
+        loginUser.setUserType(UserType.GENERAL_USER);
+        loginUser.setUserName("admin");
+        user = loginUser;
+    }
+
     @Test
     public void testQueryProcessDefinitionList() {
-        long projectCode = 1L;
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
 
         Project project = getProject(projectCode);
-        User loginUser = new User();
-        loginUser.setId(-1);
-        loginUser.setUserType(UserType.GENERAL_USER);
 
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
 
         // project not found
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_DEFINITION))
                 .thenReturn(result);
-        Map<String, Object> map = processDefinitionService.queryProcessDefinitionList(loginUser, projectCode);
+        Map<String, Object> map = processDefinitionService.queryProcessDefinitionList(user, projectCode);
         Assert.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
 
         // project check auth success
         putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_DEFINITION))
                 .thenReturn(result);
         List<ProcessDefinition> resourceList = new ArrayList<>();
         resourceList.add(getProcessDefinition());
-        Mockito.when(processDefineMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList);
+        Mockito.when(processDefinitionMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList);
         Map<String, Object> checkSuccessRes =
-                processDefinitionService.queryProcessDefinitionList(loginUser, projectCode);
+                processDefinitionService.queryProcessDefinitionList(user, projectCode);
         Assert.assertEquals(Status.SUCCESS, checkSuccessRes.get(Constants.STATUS));
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void testQueryProcessDefinitionListPaging() {
-        long projectCode = 1L;
-        Project project = getProject(projectCode);
+        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
 
-        User loginUser = new User();
-        loginUser.setId(-1);
-        loginUser.setUserType(UserType.GENERAL_USER);
+        Project project = getProject(projectCode);
 
         // project not found
         try {
             Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(null);
             Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
-                    .checkProjectAndAuthThrowException(loginUser, null, WORKFLOW_DEFINITION);
-            processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, "", "", 1, 5, 0);
+                    .checkProjectAndAuthThrowException(user, null, WORKFLOW_DEFINITION);
+            processDefinitionService.queryProcessDefinitionListPaging(user, projectCode, "", "", 1, 5, 0);
         } catch (ServiceException serviceException) {
             Assert.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode());
         }
 
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.SUCCESS, projectCode);
-        loginUser.setId(1);
-        Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, project,
+        user.setId(1);
+        Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, project,
                 WORKFLOW_DEFINITION);
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
         PageListingResult<ProcessDefinition> pageListingResult = PageListingResult.<ProcessDefinition>builder()
@@ -223,21 +247,17 @@ public class ProcessDefinitionServiceTest {
                 Mockito.eq(project.getCode()))).thenReturn(pageListingResult);
 
         PageInfo<ProcessDefinition> pageInfo = processDefinitionService.queryProcessDefinitionListPaging(
-                loginUser, project.getCode(), "", "", 1, 0, 10);
+                user, project.getCode(), "", "", 1, 0, 10);
 
         Assert.assertNotNull(pageInfo);
     }
 
     @Test
     public void testQueryProcessDefinitionByCode() {
-        long projectCode = 1L;
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
 
         Project project = getProject(projectCode);
 
-        User loginUser = new User();
-        loginUser.setId(-1);
-        loginUser.setUserType(UserType.GENERAL_USER);
         Tenant tenant = new Tenant();
         tenant.setId(1);
         tenant.setTenantCode("root");
@@ -245,284 +265,263 @@ public class ProcessDefinitionServiceTest {
         putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
 
         // project check auth fail
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_DEFINITION))
                 .thenReturn(result);
-        Map<String, Object> map = processDefinitionService.queryProcessDefinitionByCode(loginUser, 1L, 1L);
+        Map<String, Object> map = processDefinitionService.queryProcessDefinitionByCode(user, 1L, 1L);
         Assert.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
 
         // project check auth success, instance not exist
         putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_DEFINITION))
                 .thenReturn(result);
         DagData dagData = new DagData(getProcessDefinition(), null, null);
         Mockito.when(processService.genDagData(Mockito.any())).thenReturn(dagData);
 
         Map<String, Object> instanceNotexitRes =
-                processDefinitionService.queryProcessDefinitionByCode(loginUser, projectCode, 1L);
+                processDefinitionService.queryProcessDefinitionByCode(user, projectCode, 1L);
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
 
         // instance exit
-        Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
+        Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
         putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_DEFINITION))
                 .thenReturn(result);
         Mockito.when(tenantMapper.queryById(1)).thenReturn(tenant);
         Map<String, Object> successRes =
-                processDefinitionService.queryProcessDefinitionByCode(loginUser, projectCode, 46L);
+                processDefinitionService.queryProcessDefinitionByCode(user, projectCode, 46L);
         Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
     }
 
     @Test
     public void testQueryProcessDefinitionByName() {
-        long projectCode = 1L;
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
 
         Project project = getProject(projectCode);
 
-        User loginUser = new User();
-        loginUser.setId(-1);
-        loginUser.setUserType(UserType.GENERAL_USER);
-
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
 
         // project check auth fail
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_DEFINITION))
                 .thenReturn(result);
         Map<String, Object> map =
-                processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, "test_def");
+                processDefinitionService.queryProcessDefinitionByName(user, projectCode, "test_def");
         Assert.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
 
         // project check auth success, instance not exist
         putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_DEFINITION))
                 .thenReturn(result);
-        Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
+        Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
 
         Map<String, Object> instanceNotExitRes =
-                processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, "test_def");
+                processDefinitionService.queryProcessDefinitionByName(user, projectCode, "test_def");
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotExitRes.get(Constants.STATUS));
 
         // instance exit
-        Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test"))
+        Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), "test"))
                 .thenReturn(getProcessDefinition());
         putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_DEFINITION))
                 .thenReturn(result);
         Map<String, Object> successRes =
-                processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, "test");
+                processDefinitionService.queryProcessDefinitionByName(user, projectCode, "test");
         Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
     }
 
     @Test
     public void testBatchCopyProcessDefinition() {
-        long projectCode = 1L;
         Project project = getProject(projectCode);
-        User loginUser = new User();
-        loginUser.setId(1);
-        loginUser.setUserType(UserType.GENERAL_USER);
+
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_BATCH_COPY))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_BATCH_COPY))
                 .thenReturn(result);
 
         // copy project definition ids empty test
         Map<String, Object> map =
-                processDefinitionService.batchCopyProcessDefinition(loginUser, projectCode, StringUtils.EMPTY, 2L);
+                processDefinitionService.batchCopyProcessDefinition(user, projectCode, StringUtils.EMPTY, 2L);
         Assert.assertEquals(Status.PROCESS_DEFINITION_CODES_IS_EMPTY, map.get(Constants.STATUS));
 
         // project check auth fail
         putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_BATCH_COPY))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_BATCH_COPY))
                 .thenReturn(result);
         Map<String, Object> map1 = processDefinitionService.batchCopyProcessDefinition(
-                loginUser, projectCode, String.valueOf(project.getId()), 2L);
+                user, projectCode, String.valueOf(project.getId()), 2L);
         Assert.assertEquals(Status.PROJECT_NOT_FOUND, map1.get(Constants.STATUS));
 
         // project check auth success, target project name not equal project name, check auth target project fail
-        projectCode = 2L;
-        Project project1 = getProject(projectCode);
-        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project1);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_BATCH_COPY))
+        Project project1 = getProject(projectCodeOther);
+        Mockito.when(projectMapper.queryByCode(projectCodeOther)).thenReturn(project1);
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCodeOther, WORKFLOW_BATCH_COPY))
                 .thenReturn(result);
 
-        putMsg(result, Status.SUCCESS, projectCode);
+        putMsg(result, Status.SUCCESS, projectCodeOther);
         ProcessDefinition definition = getProcessDefinition();
         List<ProcessDefinition> processDefinitionList = new ArrayList<>();
         processDefinitionList.add(definition);
-        Set<Long> definitionCodes =
-                Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
-        Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
-        Mockito.when(processService.saveProcessDefine(loginUser, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
+        Set<Long> definitionCodes = new HashSet<>();
+        // Change this catch NumberFormatException
+        for (String code : String.valueOf(processDefinitionCode).split(Constants.COMMA)) {
+            try {
+                long parse = Long.parseLong(code);
+                definitionCodes.add(parse);
+            } catch (NumberFormatException e) {
+                Assertions.fail();
+            }
+        }
+        Mockito.when(processDefinitionMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
+        Mockito.when(processService.saveProcessDefine(user, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
         Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
-                loginUser, projectCode, "46", 1L);
+                user, projectCodeOther, String.valueOf(processDefinitionCode), projectCode);
         Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS));
     }
 
     @Test
     public void testBatchMoveProcessDefinition() {
-        long projectCode = 1L;
         Project project1 = getProject(projectCode);
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project1);
 
-        long projectCode2 = 2L;
-        Project project2 = getProject(projectCode2);
-        Mockito.when(projectMapper.queryByCode(projectCode2)).thenReturn(project2);
-
-        User loginUser = new User();
-        loginUser.setId(-1);
-        loginUser.setUserType(UserType.GENERAL_USER);
+        Project project2 = getProject(projectCodeOther);
+        Mockito.when(projectMapper.queryByCode(projectCodeOther)).thenReturn(project2);
 
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.SUCCESS, projectCode);
 
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, projectCode, TASK_DEFINITION_MOVE))
+        Mockito.when(projectService.checkProjectAndAuth(user, project1, projectCode, TASK_DEFINITION_MOVE))
                 .thenReturn(result);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project2, projectCode2, TASK_DEFINITION_MOVE))
+        Mockito.when(projectService.checkProjectAndAuth(user, project2, projectCodeOther, TASK_DEFINITION_MOVE))
                 .thenReturn(result);
 
         ProcessDefinition definition = getProcessDefinition();
         definition.setVersion(1);
         List<ProcessDefinition> processDefinitionList = new ArrayList<>();
         processDefinitionList.add(definition);
-        Set<Long> definitionCodes =
-                Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
-        Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
-        Mockito.when(processService.saveProcessDefine(loginUser, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
-        Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L))
-                .thenReturn(getProcessTaskRelation(projectCode));
+        Set<Long> definitionCodes = new HashSet<>();
+        // Change this catch NumberFormatException
+        for (String code : String.valueOf(processDefinitionCode).split(Constants.COMMA)) {
+            try {
+                long parse = Long.parseLong(code);
+                definitionCodes.add(parse);
+            } catch (NumberFormatException e) {
+                Assertions.fail();
+            }
+        }
+        Mockito.when(processDefinitionMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
+        Mockito.when(processService.saveProcessDefine(user, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
+        Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode))
+                .thenReturn(getProcessTaskRelation());
         putMsg(result, Status.SUCCESS);
 
         Map<String, Object> successRes = processDefinitionService.batchMoveProcessDefinition(
-                loginUser, projectCode, "46", projectCode2);
+                user, projectCode, String.valueOf(processDefinitionCode), projectCodeOther);
         Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
     }
 
     @Test
     public void deleteProcessDefinitionByCodeTest() {
-        long projectCode = 1L;
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
 
         Project project = getProject(projectCode);
-        User loginUser = new User();
-        loginUser.setId(-1);
-        loginUser.setUserType(UserType.GENERAL_USER);
+
+        // process definition not exists
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.deleteProcessDefinitionByCode(user, 2L));
+        Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
 
         // project check auth fail
-        Map<String, Object> result = new HashMap<>();
-        putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_DELETE))
-                .thenReturn(result);
-        Map<String, Object> map = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 6L);
-        Assert.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
+        Mockito.when(processDefinitionMapper.queryByCode(6L)).thenReturn(this.getProcessDefinition());
+        Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_FOUND)).when(projectService)
+                .checkProjectAndAuthThrowException(user, project, WORKFLOW_DEFINITION_DELETE);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.deleteProcessDefinitionByCode(user, 6L));
+        Assertions.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), ((ServiceException) exception).getCode());
 
         // project check auth success, instance not exist
-        putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_DELETE))
-                .thenReturn(result);
-        Mockito.when(processDefineMapper.queryByCode(1L)).thenReturn(null);
-        Map<String, Object> instanceNotExitRes =
-                processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 1L);
-        Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotExitRes.get(Constants.STATUS));
+        Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, project,
+                WORKFLOW_DEFINITION_DELETE);
+        Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(null);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.deleteProcessDefinitionByCode(user, 1L));
+        Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
 
         ProcessDefinition processDefinition = getProcessDefinition();
-        putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_DELETE))
-                .thenReturn(result);
         // user no auth
-        loginUser.setUserType(UserType.GENERAL_USER);
-        Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
-        Map<String, Object> userNoAuthRes =
-                processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
-        Assert.assertEquals(Status.USER_NO_OPERATION_PERM, userNoAuthRes.get(Constants.STATUS));
+        Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(), ((ServiceException) exception).getCode());
 
         // process definition online
-        loginUser.setUserType(UserType.ADMIN_USER);
-        putMsg(result, Status.SUCCESS, projectCode);
+        user.setUserType(UserType.ADMIN_USER);
         processDefinition.setReleaseState(ReleaseState.ONLINE);
-        Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
-        Throwable exception = Assertions.assertThrows(ServiceException.class,
-                () -> processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L));
-        String formatter =
-                MessageFormat.format(Status.PROCESS_DEFINE_STATE_ONLINE.getMsg(), processDefinition.getName());
-        Assertions.assertEquals(formatter, exception.getMessage());
+        Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
+        Assertions.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE.getCode(), ((ServiceException) exception).getCode());
 
         // scheduler list elements > 1
         processDefinition.setReleaseState(ReleaseState.OFFLINE);
-        Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
-        putMsg(result, Status.SUCCESS, projectCode);
+        Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
         Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
         Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1);
-        Mockito.when(processDefineMapper.deleteById(processDefinition.getId())).thenReturn(1);
+        Mockito.when(processDefinitionMapper.deleteById(processDefinition.getId())).thenReturn(1);
         Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()))
                 .thenReturn(1);
         Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
                 .thenReturn(Collections.emptySet());
-        Map<String, Object> schedulerGreaterThanOneRes =
-                processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
-        Assert.assertEquals(Status.SUCCESS, schedulerGreaterThanOneRes.get(Constants.STATUS));
+        processDefinitionService.deleteProcessDefinitionByCode(user, 46L);
 
         // scheduler online
         Schedule schedule = getSchedule();
         schedule.setReleaseState(ReleaseState.ONLINE);
-        putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedule);
-        Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
-                .thenReturn(Collections.emptySet());
-        Map<String, Object> schedulerOnlineRes =
-                processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
-        Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS));
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
+        Assertions.assertEquals(Status.SCHEDULE_STATE_ONLINE.getCode(), ((ServiceException) exception).getCode());
 
         // process used by other task, sub process
-        loginUser.setUserType(UserType.ADMIN_USER);
-        putMsg(result, Status.SUCCESS, projectCode);
+        user.setUserType(UserType.ADMIN_USER);
         TaskMainInfo taskMainInfo = getTaskMainInfo().get(0);
         Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
                 .thenReturn(ImmutableSet.copyOf(getTaskMainInfo()));
         exception = Assertions.assertThrows(ServiceException.class,
-                () -> processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L));
-        formatter = MessageFormat.format(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL.getMsg(),
-                String.format("%s:%s", taskMainInfo.getProcessDefinitionName(), taskMainInfo.getTaskName()));
-        Assertions.assertEquals(formatter, exception.getMessage());
+                () -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
+        Assertions.assertEquals(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL.getCode(),
+                ((ServiceException) exception).getCode());
 
         // delete success
         schedule.setReleaseState(ReleaseState.OFFLINE);
-        Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
+        Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(1);
         Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
         Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()))
                 .thenReturn(1);
         Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
         Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
                 .thenReturn(Collections.emptySet());
-        putMsg(result, Status.SUCCESS, projectCode);
-        Map<String, Object> deleteSuccess =
-                processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
-        Assert.assertEquals(Status.SUCCESS, deleteSuccess.get(Constants.STATUS));
+        Assertions.assertDoesNotThrow(() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
     }
 
     @Test
-    @Ignore
     public void testReleaseProcessDefinition() {
-        long projectCode = 1L;
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
 
         Project project = getProject(projectCode);
-        User loginUser = new User();
-        loginUser.setId(1);
-        loginUser.setUserType(UserType.GENERAL_USER);
 
         // project check auth fail
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, null)).thenReturn(result);
-        Map<String, Object> map = processDefinitionService.releaseProcessDefinition(loginUser, projectCode,
-                6, ReleaseState.OFFLINE);
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_ONLINE_OFFLINE))
+                .thenReturn(result);
+        Map<String, Object> map = processDefinitionService.releaseProcessDefinition(user, projectCode,
+                processDefinitionCode, ReleaseState.OFFLINE);
         Assert.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
 
-        // project check auth success, processs definition online
+        // project check auth success, processes definition online
         putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
+        Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
         List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
         ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
         processTaskRelation.setProjectCode(projectCode);
@@ -530,52 +529,46 @@ public class ProcessDefinitionServiceTest {
         processTaskRelation.setPostTaskCode(123L);
         processTaskRelationList.add(processTaskRelation);
         Mockito.when(processService.findRelationByCode(46L, 1)).thenReturn(processTaskRelationList);
-        Map<String, Object> onlineRes = processDefinitionService.releaseProcessDefinition(
-                loginUser, projectCode, 46, ReleaseState.ONLINE);
+        Map<String, Object> onlineRes =
+                processDefinitionService.releaseProcessDefinition(user, projectCode, 46, ReleaseState.ONLINE);
         Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS));
 
-        // project check auth success, processs definition online
-        Map<String, Object> onlineWithResourceRes = processDefinitionService.releaseProcessDefinition(
-                loginUser, projectCode, 46, ReleaseState.ONLINE);
+        // project check auth success, processes definition online
+        Map<String, Object> onlineWithResourceRes =
+                processDefinitionService.releaseProcessDefinition(user, projectCode, 46, ReleaseState.ONLINE);
         Assert.assertEquals(Status.SUCCESS, onlineWithResourceRes.get(Constants.STATUS));
 
         // release error code
-        Map<String, Object> failRes = processDefinitionService.releaseProcessDefinition(
-                loginUser, projectCode, 46, ReleaseState.getEnum(2));
+        Map<String, Object> failRes =
+                processDefinitionService.releaseProcessDefinition(user, projectCode, 46, ReleaseState.getEnum(2));
         Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS));
-
     }
 
     @Test
     public void testVerifyProcessDefinitionName() {
-        long projectCode = 1L;
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
-
         Project project = getProject(projectCode);
-        User loginUser = new User();
-        loginUser.setId(-1);
-        loginUser.setUserType(UserType.GENERAL_USER);
 
         // project check auth fail
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_CREATE))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_CREATE))
                 .thenReturn(result);
-        Map<String, Object> map = processDefinitionService.verifyProcessDefinitionName(loginUser,
+        Map<String, Object> map = processDefinitionService.verifyProcessDefinitionName(user,
                 projectCode, "test_pdf", 0);
         Assert.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
 
         // project check auth success, process not exist
         putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null);
-        Map<String, Object> processNotExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
-                projectCode, "test_pdf", 0);
+        Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null);
+        Map<String, Object> processNotExistRes =
+                processDefinitionService.verifyProcessDefinitionName(user, projectCode, "test_pdf", 0);
         Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS));
 
         // process exist
-        Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf"))
+        Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), "test_pdf"))
                 .thenReturn(getProcessDefinition());
-        Map<String, Object> processExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
+        Map<String, Object> processExistRes = processDefinitionService.verifyProcessDefinitionName(user,
                 projectCode, "test_pdf", 0);
         Assert.assertEquals(Status.PROCESS_DEFINITION_NAME_EXIST, processExistRes.get(Constants.STATUS));
     }
@@ -594,55 +587,45 @@ public class ProcessDefinitionServiceTest {
 
     @Test
     public void testGetTaskNodeListByDefinitionCode() {
-        long projectCode = 1L;
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
-
         Project project = getProject(projectCode);
-        User loginUser = new User();
-        loginUser.setId(-1);
-        loginUser.setUserType(UserType.GENERAL_USER);
 
         // project check auth fail
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, null)).thenReturn(result);
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result);
         // process definition not exist
-        Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(null);
+        Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(null);
         Map<String, Object> processDefinitionNullRes =
-                processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, 46L);
+                processDefinitionService.getTaskNodeListByDefinitionCode(user, projectCode, 46L);
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
 
         // success
         ProcessDefinition processDefinition = getProcessDefinition();
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(processService.genDagData(Mockito.any())).thenReturn(new DagData(processDefinition, null, null));
-        Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
         Map<String, Object> dataNotValidRes =
-                processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, 46L);
+                processDefinitionService.getTaskNodeListByDefinitionCode(user, projectCode, 46L);
         Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
     }
 
     @Test
     public void testGetTaskNodeListByDefinitionCodes() {
-        long projectCode = 1L;
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
-
         Project project = getProject(projectCode);
-        User loginUser = new User();
-        loginUser.setId(-1);
-        loginUser.setUserType(UserType.GENERAL_USER);
 
         // project check auth fail
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, null)).thenReturn(result);
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result);
         // process definition not exist
         String defineCodes = "46";
         Set<Long> defineCodeSet = Lists.newArrayList(defineCodes.split(Constants.COMMA)).stream().map(Long::parseLong)
                 .collect(Collectors.toSet());
-        Mockito.when(processDefineMapper.queryByCodes(defineCodeSet)).thenReturn(null);
+        Mockito.when(processDefinitionMapper.queryByCodes(defineCodeSet)).thenReturn(null);
         Map<String, Object> processNotExistRes =
-                processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, defineCodes);
+                processDefinitionService.getNodeListMapByDefinitionCodes(user, projectCode, defineCodes);
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processNotExistRes.get(Constants.STATUS));
 
         putMsg(result, Status.SUCCESS, projectCode);
@@ -650,111 +633,94 @@ public class ProcessDefinitionServiceTest {
         List<ProcessDefinition> processDefinitionList = new ArrayList<>();
         processDefinitionList.add(processDefinition);
 
-        Mockito.when(processDefineMapper.queryByCodes(defineCodeSet)).thenReturn(processDefinitionList);
+        Mockito.when(processDefinitionMapper.queryByCodes(defineCodeSet)).thenReturn(processDefinitionList);
         Mockito.when(processService.genDagData(Mockito.any())).thenReturn(new DagData(processDefinition, null, null));
         Project project1 = getProject(projectCode);
         List<Project> projects = new ArrayList<>();
         projects.add(project1);
-        Mockito.when(projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId())).thenReturn(projects);
+        Mockito.when(projectMapper.queryProjectCreatedAndAuthorizedByUserId(user.getId())).thenReturn(projects);
 
         Map<String, Object> successRes =
-                processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, defineCodes);
+                processDefinitionService.getNodeListMapByDefinitionCodes(user, projectCode, defineCodes);
         Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
     }
 
     @Test
     public void testQueryAllProcessDefinitionByProjectCode() {
-        User loginUser = new User();
-        loginUser.setId(1);
-        loginUser.setUserType(UserType.GENERAL_USER);
         Map<String, Object> result = new HashMap<>();
-        long projectCode = 2L;
         Project project = getProject(projectCode);
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
         putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_DEFINITION))
                 .thenReturn(result);
         ProcessDefinition processDefinition = getProcessDefinition();
         List<ProcessDefinition> processDefinitionList = new ArrayList<>();
         processDefinitionList.add(processDefinition);
-        Mockito.when(processDefineMapper.queryAllDefinitionList(projectCode)).thenReturn(processDefinitionList);
+        Mockito.when(processDefinitionMapper.queryAllDefinitionList(projectCode)).thenReturn(processDefinitionList);
         Map<String, Object> successRes =
-                processDefinitionService.queryAllProcessDefinitionByProjectCode(loginUser, projectCode);
+                processDefinitionService.queryAllProcessDefinitionByProjectCode(user, projectCode);
         Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
     }
 
     @Test
     public void testViewTree() {
-        User loginUser = new User();
-        loginUser.setId(1);
-        loginUser.setTenantId(1);
-        loginUser.setUserType(UserType.ADMIN_USER);
-        long projectCode = 1;
         Project project1 = getProject(projectCode);
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, projectCode, WORKFLOW_TREE_VIEW))
+        Mockito.when(projectService.checkProjectAndAuth(user, project1, projectCode, WORKFLOW_TREE_VIEW))
                 .thenReturn(result);
         // process definition not exist
         ProcessDefinition processDefinition = getProcessDefinition();
         Map<String, Object> processDefinitionNullRes =
-                processDefinitionService.viewTree(loginUser, processDefinition.getProjectCode(), 46, 10);
+                processDefinitionService.viewTree(user, processDefinition.getProjectCode(), 46, 10);
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
 
         // task instance not existproject
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result);
-        Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
+        Mockito.when(projectService.checkProjectAndAuth(user, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result);
+        Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
         Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
         Map<String, Object> taskNullRes =
-                processDefinitionService.viewTree(loginUser, processDefinition.getProjectCode(), 46, 10);
+                processDefinitionService.viewTree(user, processDefinition.getProjectCode(), 46, 10);
         Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
 
         // task instance exist
         Map<String, Object> taskNotNuLLRes =
-                processDefinitionService.viewTree(loginUser, processDefinition.getProjectCode(), 46, 10);
+                processDefinitionService.viewTree(user, processDefinition.getProjectCode(), 46, 10);
         Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
     }
 
     @Test
     public void testSubProcessViewTree() {
-        User loginUser = new User();
-        loginUser.setId(1);
-        loginUser.setUserType(UserType.ADMIN_USER);
         ProcessDefinition processDefinition = getProcessDefinition();
-        Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
 
         Project project1 = getProject(1);
         Map<String, Object> result = new HashMap<>();
         result.put(Constants.STATUS, Status.SUCCESS);
         Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result);
+        Mockito.when(projectService.checkProjectAndAuth(user, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result);
 
         Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
         Map<String, Object> taskNotNuLLRes =
-                processDefinitionService.viewTree(loginUser, processDefinition.getProjectCode(), 46, 10);
+                processDefinitionService.viewTree(user, processDefinition.getProjectCode(), 46, 10);
         Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
     }
 
     @Test
     public void testUpdateProcessDefinition() {
-        User loginUser = new User();
-        loginUser.setId(1);
-        loginUser.setUserType(UserType.ADMIN_USER);
-
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.SUCCESS);
 
-        long projectCode = 1L;
         Project project = getProject(projectCode);
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_UPDATE))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_UPDATE))
                 .thenReturn(result);
 
         try {
-            processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1,
+            processDefinitionService.updateProcessDefinition(user, projectCode, "test", 1,
                     "", "", "", 0, "root", null, "", null, ProcessExecutionTypeEnum.PARALLEL);
             Assert.fail();
         } catch (ServiceException ex) {
@@ -765,41 +731,26 @@ public class ProcessDefinitionServiceTest {
     @Test
     public void testBatchExportProcessDefinitionByCodes() {
         processDefinitionService.batchExportProcessDefinitionByCodes(null, 1L, null, null);
-
-        User loginUser = new User();
-        loginUser.setId(1);
-        loginUser.setUserType(UserType.ADMIN_USER);
-
-        long projectCode = 1L;
         Project project = getProject(projectCode);
 
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.PROJECT_NOT_FOUND);
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
-        processDefinitionService.batchExportProcessDefinitionByCodes(
-                loginUser, projectCode, "1", null);
+        processDefinitionService.batchExportProcessDefinitionByCodes(user, projectCode, "1", null);
 
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setId(1);
-        Map<String, Object> checkResult = new HashMap<>();
-        checkResult.put(Constants.STATUS, Status.SUCCESS);
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
-        HttpServletResponse response = mock(HttpServletResponse.class);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
 
         DagData dagData = new DagData(getProcessDefinition(), null, null);
         Mockito.when(processService.genDagData(Mockito.any())).thenReturn(dagData);
-        processDefinitionService.batchExportProcessDefinitionByCodes(loginUser, projectCode, "1", response);
+        processDefinitionService.batchExportProcessDefinitionByCodes(user, projectCode, "1", response);
         Assert.assertNotNull(processDefinitionService.exportProcessDagData(processDefinition));
     }
 
     @Test
     public void testImportSqlProcessDefinition() throws Exception {
-        int userId = 10;
-        User loginUser = Mockito.mock(User.class);
-        Mockito.when(loginUser.getId()).thenReturn(userId);
-        Mockito.when(loginUser.getTenantId()).thenReturn(2);
-        Mockito.when(loginUser.getUserType()).thenReturn(UserType.GENERAL_USER);
-
         ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
         ZipOutputStream outputStream = new ZipOutputStream(byteArrayOutputStream);
         outputStream.putNextEntry(new ZipEntry("import_sql/"));
@@ -821,24 +772,23 @@ public class ProcessDefinitionServiceTest {
         Mockito.when(dataSource.getId()).thenReturn(1);
         Mockito.when(dataSource.getType()).thenReturn(DbType.MYSQL);
 
-        Mockito.when(dataSourceMapper.queryDataSourceByNameAndUserId(userId, "mysql_1")).thenReturn(dataSource);
+        Mockito.when(dataSourceMapper.queryDataSourceByNameAndUserId(user.getId(), "mysql_1")).thenReturn(dataSource);
 
-        long projectCode = 1001;
-        Project project1 = getProject(projectCode);
+        Project project = getProject(projectCode);
         Map<String, Object> result = new HashMap<>();
         result.put(Constants.STATUS, Status.SUCCESS);
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, projectCode, WORKFLOW_IMPORT))
+        Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_IMPORT))
                 .thenReturn(result);
-        Mockito.when(processService.saveTaskDefine(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.notNull(),
+        Mockito.when(processService.saveTaskDefine(Mockito.same(user), Mockito.eq(projectCode), Mockito.notNull(),
                 Mockito.anyBoolean())).thenReturn(2);
-        Mockito.when(processService.saveProcessDefine(Mockito.same(loginUser), Mockito.notNull(), Mockito.notNull(),
+        Mockito.when(processService.saveProcessDefine(Mockito.same(user), Mockito.notNull(), Mockito.notNull(),
                 Mockito.anyBoolean())).thenReturn(1);
         Mockito.when(
-                processService.saveTaskRelation(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.anyLong(),
+                processService.saveTaskRelation(Mockito.same(user), Mockito.eq(projectCode), Mockito.anyLong(),
                         Mockito.eq(1), Mockito.notNull(), Mockito.notNull(), Mockito.anyBoolean()))
                 .thenReturn(0);
-        result = processDefinitionService.importSqlProcessDefinition(loginUser, projectCode, mockMultipartFile);
+        result = processDefinitionService.importSqlProcessDefinition(user, projectCode, mockMultipartFile);
 
         Assert.assertEquals(result.get(Constants.STATUS), Status.SUCCESS);
     }
@@ -856,6 +806,198 @@ public class ProcessDefinitionServiceTest {
         Assert.assertEquals(2, newName3.split(Constants.IMPORT_SUFFIX).length);
     }
 
+    @Test
+    public void testCreateProcessDefinitionV2() {
+        Project project = this.getProject(projectCode);
+
+        WorkflowCreateRequest workflowCreateRequest = new WorkflowCreateRequest();
+        workflowCreateRequest.setName(name);
+        workflowCreateRequest.setProjectCode(projectCode);
+
+        // project not exists
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.createSingleProcessDefinition(user, workflowCreateRequest));
+        Assertions.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), ((ServiceException) exception).getCode());
+
+        // project permission error
+        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+        Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService)
+                .checkProjectAndAuthThrowException(user, project, WORKFLOW_CREATE);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.createSingleProcessDefinition(user, workflowCreateRequest));
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // description too long
+        workflowCreateRequest.setDescription(taskDefinitionJson);
+        Mockito.doThrow(new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR)).when(projectService)
+                .checkProjectAndAuthThrowException(user, project, WORKFLOW_CREATE);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.createSingleProcessDefinition(user, workflowCreateRequest));
+        Assertions.assertEquals(Status.DESCRIPTION_TOO_LONG_ERROR.getCode(), ((ServiceException) exception).getCode());
+        workflowCreateRequest.setDescription(EMPTY_STRING);
+
+        // duplicate process definition name
+        Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, project, WORKFLOW_CREATE);
+        Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), name))
+                .thenReturn(this.getProcessDefinition());
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.createSingleProcessDefinition(user, workflowCreateRequest));
+        Assertions.assertEquals(Status.PROCESS_DEFINITION_NAME_EXIST.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // tenant not exists
+        Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), name)).thenReturn(null);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.createSingleProcessDefinition(user, workflowCreateRequest));
+        Assertions.assertEquals(Status.TENANT_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
+
+        // test success
+        workflowCreateRequest.setTenantCode(DEFAULT);
+        workflowCreateRequest.setDescription(description);
+        workflowCreateRequest.setTimeout(timeout);
+        workflowCreateRequest.setReleaseState(releaseState);
+        workflowCreateRequest.setWarningGroupId(warningGroupId);
+        workflowCreateRequest.setExecutionType(executionType);
+        Mockito.when(processDefinitionLogMapper.insert(Mockito.any())).thenReturn(1);
+        Mockito.when(processDefinitionMapper.insert(Mockito.any())).thenReturn(1);
+        ProcessDefinition processDefinition =
+                processDefinitionService.createSingleProcessDefinition(user, workflowCreateRequest);
+
+        Assertions.assertTrue(processDefinition.getCode() > 0L);
+        Assertions.assertEquals(workflowCreateRequest.getName(), processDefinition.getName());
+        Assertions.assertEquals(workflowCreateRequest.getDescription(), processDefinition.getDescription());
+        Assertions.assertTrue(StringUtils.endsWithIgnoreCase(workflowCreateRequest.getReleaseState(),
+                processDefinition.getReleaseState().getDescp()));
+        Assertions.assertEquals(workflowCreateRequest.getTimeout(), processDefinition.getTimeout());
+        Assertions.assertTrue(StringUtils.endsWithIgnoreCase(workflowCreateRequest.getExecutionType(),
+                processDefinition.getExecutionType().getDescp()));
+    }
+
+    @Test
+    public void testFilterProcessDefinition() {
+        Project project = this.getProject(projectCode);
+        WorkflowFilterRequest workflowFilterRequest = new WorkflowFilterRequest();
+        workflowFilterRequest.setProjectName(project.getName());
+
+        // project permission error
+        Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project);
+        Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectCode))
+                .when(projectService).checkProjectAndAuthThrowException(user, project, WORKFLOW_DEFINITION);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.filterProcessDefinition(user, workflowFilterRequest));
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
+                ((ServiceException) exception).getCode());
+    }
+
+    @Test
+    public void testGetProcessDefinition() {
+        // process definition not exists
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.getProcessDefinition(user, processDefinitionCode));
+        Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
+
+        // project permission error
+        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode))
+                .thenReturn(this.getProcessDefinition());
+        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(this.getProject(projectCode));
+        Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectCode))
+                .when(projectService)
+                .checkProjectAndAuthThrowException(user, this.getProject(projectCode), WORKFLOW_DEFINITION);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.getProcessDefinition(user, processDefinitionCode));
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // success
+        Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, this.getProject(projectCode),
+                WORKFLOW_DEFINITION);
+        ProcessDefinition processDefinition =
+                processDefinitionService.getProcessDefinition(user, processDefinitionCode);
+        Assertions.assertEquals(this.getProcessDefinition(), processDefinition);
+    }
+
+    @Test
+    public void testUpdateProcessDefinitionV2() {
+        ProcessDefinition processDefinition;
+
+        WorkflowUpdateRequest workflowUpdateRequest = new WorkflowUpdateRequest();
+        workflowUpdateRequest.setName(name);
+
+        // error process definition not exists
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.getProcessDefinition(user, processDefinitionCode));
+        Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
+
+        // error old process definition in release state
+        processDefinition = this.getProcessDefinition();
+        processDefinition.setReleaseState(ReleaseState.ONLINE);
+        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
+        exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService
+                .updateSingleProcessDefinition(user, processDefinitionCode, workflowUpdateRequest));
+        Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // error project permission
+        processDefinition = this.getProcessDefinition();
+        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
+        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(this.getProject(projectCode));
+        Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectCode))
+                .when(projectService)
+                .checkProjectAndAuthThrowException(user, this.getProject(projectCode), WORKFLOW_DEFINITION);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> processDefinitionService.getProcessDefinition(user, processDefinitionCode));
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // error description too long
+        workflowUpdateRequest.setDescription(taskDefinitionJson);
+        Mockito.doThrow(new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR)).when(projectService)
+                .checkProjectAndAuthThrowException(user, this.getProject(projectCode), WORKFLOW_UPDATE);
+        exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService
+                .updateSingleProcessDefinition(user, processDefinitionCode, workflowUpdateRequest));
+        Assertions.assertEquals(Status.DESCRIPTION_TOO_LONG_ERROR.getCode(), ((ServiceException) exception).getCode());
+        workflowUpdateRequest.setDescription(EMPTY_STRING);
+
+        // error new definition name already exists
+        Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, this.getProject(projectCode),
+                WORKFLOW_UPDATE);
+        Mockito.when(processDefinitionMapper.verifyByDefineName(projectCode, workflowUpdateRequest.getName()))
+                .thenReturn(this.getProcessDefinition());
+        exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService
+                .updateSingleProcessDefinition(user, processDefinitionCode, workflowUpdateRequest));
+        Assertions.assertEquals(Status.PROCESS_DEFINITION_NAME_EXIST.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // error tenant code not exists
+        processDefinition = this.getProcessDefinition();
+        workflowUpdateRequest.setTenantCode(tenantCode);
+        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.verifyByDefineName(projectCode, workflowUpdateRequest.getName()))
+                .thenReturn(null);
+        Mockito.when(tenantMapper.queryByTenantCode(workflowUpdateRequest.getTenantCode())).thenReturn(null);
+        exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService
+                .updateSingleProcessDefinition(user, processDefinitionCode, workflowUpdateRequest));
+        Assertions.assertEquals(Status.TENANT_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
+        workflowUpdateRequest.setTenantCode(null);
+
+        // error update process definition mapper
+        workflowUpdateRequest.setName(name);
+        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionLogMapper.insert(Mockito.any())).thenReturn(1);
+        exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService
+                .updateSingleProcessDefinition(user, processDefinitionCode, workflowUpdateRequest));
+        Assertions.assertEquals(Status.UPDATE_PROCESS_DEFINITION_ERROR.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // success
+        Mockito.when(processDefinitionMapper.updateById(isA(ProcessDefinition.class))).thenReturn(1);
+        ProcessDefinition processDefinitionUpdate =
+                processDefinitionService.updateSingleProcessDefinition(user, processDefinitionCode,
+                        workflowUpdateRequest);
+        Assertions.assertEquals(processDefinition, processDefinitionUpdate);
+    }
+
     /**
      * get mock processDefinition
      *
@@ -868,7 +1010,8 @@ public class ProcessDefinitionServiceTest {
         processDefinition.setName("test_pdf");
         processDefinition.setTenantId(1);
         processDefinition.setDescription("");
-        processDefinition.setCode(46L);
+        processDefinition.setCode(processDefinitionCode);
+        processDefinition.setProjectCode(projectCode);
         processDefinition.setVersion(1);
         return processDefinition;
     }
@@ -888,7 +1031,7 @@ public class ProcessDefinitionServiceTest {
         return project;
     }
 
-    private List<ProcessTaskRelation> getProcessTaskRelation(long projectCode) {
+    private List<ProcessTaskRelation> getProcessTaskRelation() {
         List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
         ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
         processTaskRelation.setProjectCode(projectCode);
@@ -921,15 +1064,6 @@ public class ProcessDefinitionServiceTest {
         return schedule;
     }
 
-    private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
-        result.put(Constants.STATUS, status);
-        if (statusParams != null && statusParams.length > 0) {
-            result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams));
-        } else {
-            result.put(Constants.MSG, status.getMsg());
-        }
-    }
-
     /**
      * get mock task main info
      *
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
index 9d292f23fe..1e8b1a698c 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
@@ -17,44 +17,50 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import static org.apache.dolphinscheduler.common.utils.DateUtils.stringToDate;
+import static org.mockito.ArgumentMatchers.isA;
+
+import org.apache.dolphinscheduler.api.dto.schedule.ScheduleCreateRequest;
+import org.apache.dolphinscheduler.api.dto.schedule.ScheduleFilterRequest;
+import org.apache.dolphinscheduler.api.dto.schedule.ScheduleUpdateRequest;
 import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.dao.entity.Environment;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
-import org.apache.dolphinscheduler.scheduler.quartz.QuartzScheduler;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 /**
  * scheduler service test
  */
 @RunWith(PowerMockRunner.class)
-public class SchedulerServiceTest {
+public class SchedulerServiceTest extends BaseServiceTestTool {
 
     @InjectMocks
     private SchedulerServiceImpl schedulerService;
@@ -78,24 +84,43 @@ public class SchedulerServiceTest {
     private ProcessDefinitionMapper processDefinitionMapper;
 
     @Mock
-    private ProjectServiceImpl projectService;
+    private ProjectService projectService;
 
     @Mock
     private SchedulerApi schedulerApi;
 
+    @Mock
+    private ExecutorService executorService;
+
+    @Mock
+    private EnvironmentMapper environmentMapper;
+
+    protected static User user;
+    protected Exception exception;
+    private static final String userName = "userName";
+    private static final String projectName = "projectName";
+    private static final long projectCode = 1L;
+    private static final int userId = 1;
+    private static final String processDefinitionName = "processDefinitionName";
+    private static final long processDefinitionCode = 2L;
+    private static final int processDefinitionVersion = 3;
+    private static final int scheduleId = 3;
+    private static final long environmentCode = 4L;
+    private static final String startTime = "2020-01-01 12:13:14";
+    private static final String endTime = "2020-02-01 12:13:14";
+    private static final String crontab = "0 0 * * * ? *";
+
     @Before
     public void setUp() {
-
+        user = new User();
+        user.setUserName(userName);
+        user.setId(userId);
     }
 
     @Test
     public void testSetScheduleState() {
-        String projectName = "test";
-        long projectCode = 1L;
-        User loginUser = new User();
-        loginUser.setId(1);
-        Map<String, Object> result = new HashMap<String, Object>();
-        Project project = getProject(projectName, projectCode);
+        Map<String, Object> result;
+        Project project = getProject();
 
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setProjectCode(projectCode);
@@ -115,48 +140,332 @@ public class SchedulerServiceTest {
 
         Mockito.when(processDefinitionMapper.queryByCode(1)).thenReturn(processDefinition);
 
-        //hash no auth
-        result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
+        // hash no auth
+        result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
 
-        Mockito.when(projectService.hasProjectAndPerm(loginUser, project, result,null)).thenReturn(true);
-        //schedule not exists
-        result = schedulerService.setScheduleState(loginUser, project.getCode(), 2, ReleaseState.ONLINE);
+        Mockito.when(projectService.hasProjectAndPerm(user, project, result, null)).thenReturn(true);
+        // schedule not exists
+        result = schedulerService.setScheduleState(user, project.getCode(), 2, ReleaseState.ONLINE);
         Assert.assertEquals(Status.SCHEDULE_CRON_NOT_EXISTS, result.get(Constants.STATUS));
 
-        //SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE
-        result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.OFFLINE);
+        // SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE
+        result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.OFFLINE);
         Assert.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, result.get(Constants.STATUS));
 
-        //PROCESS_DEFINE_NOT_EXIST
+        // PROCESS_DEFINE_NOT_EXIST
         schedule.setProcessDefinitionCode(2);
-        result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
+        result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, result.get(Constants.STATUS));
         schedule.setProcessDefinitionCode(1);
 
-        result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
+        result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
         Assert.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
 
         processDefinition.setReleaseState(ReleaseState.ONLINE);
         Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition);
 
-        result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
+        result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
         Assert.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
 
-        //set master
+        // set master
         Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(masterServers);
 
-        //SUCCESS
-        result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
+        // SUCCESS
+        result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
         Assert.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
     }
 
-    private Project getProject(String name, long code) {
-        Project project = new Project();
-        project.setName(name);
-        project.setCode(code);
-        project.setUserId(1);
+    @Test
+    public void testCreateSchedulesV2() {
+        Project project = this.getProject();
+        ProcessDefinition processDefinition = this.getProcessDefinition();
+        Schedule schedule = this.getSchedule();
+
+        ScheduleCreateRequest scheduleCreateRequest = new ScheduleCreateRequest();
+        scheduleCreateRequest.setProcessDefinitionCode(processDefinitionCode);
+        scheduleCreateRequest.setEnvironmentCode(environmentCode);
+
+        // error process definition not exists
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.createSchedulesV2(user, scheduleCreateRequest));
+        Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
+
+        // error project permissions
+        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
+        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+        Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService)
+                .checkProjectAndAuthThrowException(user, project, null);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.createSchedulesV2(user, scheduleCreateRequest));
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // we do not check method `executorService.checkProcessDefinitionValid` because it should be check in
+        // executorServiceTest
+        // error process definition already exists schedule
+        Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, project, null);
+        Mockito.when(scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode)).thenReturn(schedule);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.createSchedulesV2(user, scheduleCreateRequest));
+        Assertions.assertEquals(Status.SCHEDULE_ALREADY_EXISTS.getCode(), ((ServiceException) exception).getCode());
+
+        // error environment do not exists
+        Mockito.when(scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode)).thenReturn(null);
+        Mockito.when(environmentMapper.queryByEnvironmentCode(environmentCode)).thenReturn(null);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.createSchedulesV2(user, scheduleCreateRequest));
+        Assertions.assertEquals(Status.QUERY_ENVIRONMENT_BY_CODE_ERROR.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // error schedule parameter same start time and end time
+        Mockito.when(environmentMapper.queryByEnvironmentCode(environmentCode)).thenReturn(this.getEnvironment());
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.createSchedulesV2(user, scheduleCreateRequest));
+        Assertions.assertEquals(Status.SCHEDULE_START_TIME_END_TIME_SAME.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // error schedule parameter same start time after than end time
+        scheduleCreateRequest.setEndTime(endTime);
+        String badStartTime = "2022-01-01 12:13:14";
+        scheduleCreateRequest.setStartTime(badStartTime);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.createSchedulesV2(user, scheduleCreateRequest));
+        Assertions.assertEquals(Status.START_TIME_BIGGER_THAN_END_TIME_ERROR.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // error schedule crontab
+        String badCrontab = "0 0 123 * * ? *";
+        scheduleCreateRequest.setStartTime(startTime);
+        scheduleCreateRequest.setCrontab(badCrontab);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.createSchedulesV2(user, scheduleCreateRequest));
+        Assertions.assertEquals(Status.SCHEDULE_CRON_CHECK_FAILED.getCode(), ((ServiceException) exception).getCode());
+
+        // error create error
+        scheduleCreateRequest.setCrontab(crontab);
+        Mockito.when(scheduleMapper.insert(isA(Schedule.class))).thenReturn(0);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.createSchedulesV2(user, scheduleCreateRequest));
+        Assertions.assertEquals(Status.CREATE_SCHEDULE_ERROR.getCode(), ((ServiceException) exception).getCode());
+
+        // success
+        scheduleCreateRequest.setCrontab(crontab);
+        Mockito.when(scheduleMapper.insert(isA(Schedule.class))).thenReturn(1);
+        Schedule scheduleCreated = schedulerService.createSchedulesV2(user, scheduleCreateRequest);
+        Assertions.assertEquals(scheduleCreateRequest.getProcessDefinitionCode(),
+                scheduleCreated.getProcessDefinitionCode());
+        Assertions.assertEquals(scheduleCreateRequest.getEnvironmentCode(), scheduleCreated.getEnvironmentCode());
+        Assertions.assertEquals(stringToDate(scheduleCreateRequest.getStartTime()), scheduleCreated.getStartTime());
+        Assertions.assertEquals(stringToDate(scheduleCreateRequest.getEndTime()), scheduleCreated.getEndTime());
+        Assertions.assertEquals(scheduleCreateRequest.getCrontab(), scheduleCreated.getCrontab());
+    }
+
+    @Test
+    public void testDeleteSchedules() {
+        Schedule schedule = this.getSchedule();
+
+        // error schedule not exists
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.deleteSchedulesById(user, scheduleId));
+        Assertions.assertEquals(Status.SCHEDULE_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode());
+
+        // error schedule already online
+        schedule.setReleaseState(ReleaseState.ONLINE);
+        Mockito.when(scheduleMapper.selectById(scheduleId)).thenReturn(schedule);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.deleteSchedulesById(user, scheduleId));
+        Assertions.assertEquals(Status.SCHEDULE_STATE_ONLINE.getCode(), ((ServiceException) exception).getCode());
+        schedule.setReleaseState(ReleaseState.OFFLINE);
+
+        // error user not own schedule
+        int notOwnUserId = 2;
+        schedule.setUserId(notOwnUserId);
+        Mockito.when(scheduleMapper.selectById(scheduleId)).thenReturn(schedule);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.deleteSchedulesById(user, scheduleId));
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(), exception.getMessage());
+        schedule.setUserId(userId);
+
+        // error process definition not exists
+        Mockito.when(scheduleMapper.selectById(scheduleId)).thenReturn(schedule);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.deleteSchedulesById(user, scheduleId));
+        Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
+
+        // error project permissions
+        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode))
+                .thenReturn(this.getProcessDefinition());
+        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(this.getProject());
+        Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService)
+                .checkProjectAndAuthThrowException(user, this.getProject(), null);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.deleteSchedulesById(user, scheduleId));
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // error delete mapper
+        Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, this.getProject(), null);
+        Mockito.when(scheduleMapper.deleteById(scheduleId)).thenReturn(0);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.deleteSchedulesById(user, scheduleId));
+        Assertions.assertEquals(Status.DELETE_SCHEDULE_BY_ID_ERROR.getCode(), ((ServiceException) exception).getCode());
+
+        // success
+        Mockito.when(scheduleMapper.deleteById(scheduleId)).thenReturn(1);
+        Assertions.assertDoesNotThrow(() -> schedulerService.deleteSchedulesById(user, scheduleId));
+    }
+
+    @Test
+    public void testFilterSchedules() {
+        Project project = this.getProject();
+        ScheduleFilterRequest scheduleFilterRequest = new ScheduleFilterRequest();
+        scheduleFilterRequest.setProjectName(project.getName());
+
+        // project permission error
+        Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project);
+        Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService)
+                .checkProjectAndAuthThrowException(user, project, null);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.filterSchedules(user, scheduleFilterRequest));
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
+                ((ServiceException) exception).getCode());
+    }
 
+    @Test
+    public void testGetSchedules() {
+        // error schedule not exists
+        exception =
+                Assertions.assertThrows(ServiceException.class, () -> schedulerService.getSchedule(user, scheduleId));
+        Assertions.assertEquals(Status.SCHEDULE_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode());
+
+        // error process definition not exists
+        Mockito.when(scheduleMapper.selectById(scheduleId)).thenReturn(this.getSchedule());
+        exception =
+                Assertions.assertThrows(ServiceException.class, () -> schedulerService.getSchedule(user, scheduleId));
+        Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
+
+        // error project permissions
+        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode))
+                .thenReturn(this.getProcessDefinition());
+        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(this.getProject());
+        Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService)
+                .checkProjectAndAuthThrowException(user, this.getProject(), null);
+        exception =
+                Assertions.assertThrows(ServiceException.class, () -> schedulerService.getSchedule(user, scheduleId));
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // success
+        Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, this.getProject(), null);
+        Schedule schedule = schedulerService.getSchedule(user, scheduleId);
+        Assertions.assertEquals(this.getSchedule().getId(), schedule.getId());
+    }
+
+    @Test
+    public void testUpdateSchedulesV2() {
+        ScheduleUpdateRequest scheduleUpdateRequest = new ScheduleUpdateRequest();
+
+        // error schedule not exists
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.updateSchedulesV2(user, scheduleId, scheduleUpdateRequest));
+        Assertions.assertEquals(Status.SCHEDULE_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode());
+
+        // error schedule parameter same start time and end time
+        scheduleUpdateRequest.setEndTime(endTime);
+        scheduleUpdateRequest.setStartTime(endTime);
+        Mockito.when(scheduleMapper.selectById(scheduleId)).thenReturn(this.getSchedule());
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.updateSchedulesV2(user, scheduleId, scheduleUpdateRequest));
+        Assertions.assertEquals(Status.SCHEDULE_START_TIME_END_TIME_SAME.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // error schedule parameter same start time after than end time
+        String badStartTime = "2022-01-01 12:13:14";
+        scheduleUpdateRequest.setStartTime(badStartTime);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.updateSchedulesV2(user, scheduleId, scheduleUpdateRequest));
+        Assertions.assertEquals(Status.START_TIME_BIGGER_THAN_END_TIME_ERROR.getCode(),
+                ((ServiceException) exception).getCode());
+        scheduleUpdateRequest.setStartTime(startTime);
+
+        // error schedule crontab
+        String badCrontab = "0 0 123 * * ? *";
+        scheduleUpdateRequest.setCrontab(badCrontab);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.updateSchedulesV2(user, scheduleId, scheduleUpdateRequest));
+        Assertions.assertEquals(Status.SCHEDULE_CRON_CHECK_FAILED.getCode(), ((ServiceException) exception).getCode());
+        scheduleUpdateRequest.setCrontab(crontab);
+
+        // error process definition not exists
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.updateSchedulesV2(user, scheduleId, scheduleUpdateRequest));
+        Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
+
+        // error project permissions
+        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode))
+                .thenReturn(this.getProcessDefinition());
+        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(this.getProject());
+        Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService)
+                .checkProjectAndAuthThrowException(user, this.getProject(), null);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.updateSchedulesV2(user, scheduleId, scheduleUpdateRequest));
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // error environment do not exists
+        Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, this.getProject(), null);
+        Mockito.when(environmentMapper.queryByEnvironmentCode(environmentCode)).thenReturn(null);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.updateSchedulesV2(user, scheduleId, scheduleUpdateRequest));
+        Assertions.assertEquals(Status.QUERY_ENVIRONMENT_BY_CODE_ERROR.getCode(),
+                ((ServiceException) exception).getCode());
+
+        // error environment do not exists
+        Mockito.when(environmentMapper.queryByEnvironmentCode(environmentCode)).thenReturn(this.getEnvironment());
+        Mockito.when(scheduleMapper.updateById(isA(Schedule.class))).thenReturn(0);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> schedulerService.updateSchedulesV2(user, scheduleId, scheduleUpdateRequest));
+        Assertions.assertEquals(Status.UPDATE_SCHEDULE_ERROR.getCode(), ((ServiceException) exception).getCode());
+
+        // success
+        Mockito.when(scheduleMapper.updateById(isA(Schedule.class))).thenReturn(1);
+        Schedule schedule = schedulerService.updateSchedulesV2(user, scheduleId, scheduleUpdateRequest);
+        Assertions.assertEquals(scheduleUpdateRequest.getCrontab(), schedule.getCrontab());
+        Assertions.assertEquals(stringToDate(scheduleUpdateRequest.getStartTime()), schedule.getStartTime());
+        Assertions.assertEquals(stringToDate(scheduleUpdateRequest.getEndTime()), schedule.getEndTime());
+    }
+
+    private Project getProject() {
+        Project project = new Project();
+        project.setName(projectName);
+        project.setCode(projectCode);
+        project.setUserId(userId);
         return project;
     }
 
+    private ProcessDefinition getProcessDefinition() {
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setName(processDefinitionName);
+        processDefinition.setCode(processDefinitionCode);
+        processDefinition.setProjectCode(projectCode);
+        processDefinition.setVersion(processDefinitionVersion);
+        processDefinition.setUserId(userId);
+        return processDefinition;
+    }
+
+    private Schedule getSchedule() {
+        Schedule schedule = new Schedule();
+        schedule.setId(scheduleId);
+        schedule.setProcessDefinitionCode(processDefinitionCode);
+        schedule.setEnvironmentCode(environmentCode);
+        schedule.setUserId(userId);
+        return schedule;
+    }
+
+    private Environment getEnvironment() {
+        Environment environment = new Environment();
+        environment.setCode(environmentCode);
+        return environment;
+    }
+
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
index 3e42207396..e61b527217 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
@@ -115,6 +115,16 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
                                                    @Param("userId") int userId,
                                                    @Param("projectCode") long projectCode);
 
+    /**
+     * Filter process definitions
+     *
+     * @param page page
+     * @param processDefinition process definition object
+     * @return process definition IPage
+     */
+    IPage<ProcessDefinition> filterProcessDefinition(IPage<ProcessDefinition> page,
+                                                     @Param("pd") ProcessDefinition processDefinition);
+
     /**
      * query all process definition list
      *
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
index 2af88fcde0..65e2022ef3 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
@@ -63,6 +63,16 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
                                                    @Param("processDefinitionCode") long processDefinitionCode,
                                                    @Param("searchVal") String searchVal);
 
+    /**
+     * Filter schedule
+     *
+     * @param page page
+     * @param schedule schedule
+     * @return schedule IPage
+     */
+    IPage<Schedule> filterSchedules(IPage<Schedule> page,
+                                    @Param("schedule") Schedule schedule);
+
     /**
      * query schedule list by project name
      *
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
index 3404568f2c..a69150b3f7 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
@@ -83,6 +83,26 @@
         </if>
         order by update_time desc
     </select>
+    <select id="filterProcessDefinition"
+            parameterType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition"
+            resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition"
+    >
+        SELECT
+        <include refid="baseSql"/>
+        FROM t_ds_process_definition
+        <where>
+            <if test=" pd.projectCode != 0">
+                and project_code = #{pd.projectCode}
+            </if>
+            <if test=" pd.name != null and pd.name != ''">
+                and name like concat('%', #{pd.name}, '%')
+            </if>
+            <if test=" pd.releaseState != null and pd.releaseState != ''">
+                and release_state = #{pd.releaseState}
+            </if>
+        </where>
+        order by update_time desc, id asc
+    </select>
 
     <select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
         select
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
index 7558926eeb..8c17213a07 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
@@ -90,4 +90,29 @@
         from t_ds_schedules
         where process_definition_code = #{processDefinitionCode} and release_state = 1
     </select>
+
+    <select id="filterSchedules"
+            parameterType="org.apache.dolphinscheduler.dao.entity.Schedule"
+            resultType="org.apache.dolphinscheduler.dao.entity.Schedule"
+    >
+        SELECT pd.name as process_definition_name, p.name as project_name,
+        <include refid="baseSqlV2">
+            <property name="alias" value="s"/>
+        </include>
+        FROM t_ds_schedules s
+        join t_ds_process_definition pd on s.process_definition_code = pd.code
+        join t_ds_project as p on pd.project_code = p.code
+        <where>
+            <if test=" schedule.projectName != null and schedule.projectName != ''">
+                and p.name like concat('%', #{schedule.projectName}, '%')
+            </if>
+            <if test=" schedule.processDefinitionName != null and schedule.processDefinitionName != ''">
+                and pd.name like concat('%', #{schedule.processDefinitionName}, '%')
+            </if>
+            <if test=" schedule.releaseState != null and schedule.releaseState != ''">
+                and s.release_state = #{schedule.releaseState}
+            </if>
+        </where>
+        order by update_time desc, id asc
+    </select>
 </mapper>