You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/11/28 14:01:58 UTC

[dolphinscheduler] 04/05: [chore][python] Change name from process definition to workflow (#12918)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.1.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 6ed16056808bca20ef562363de93193d4885df88
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Wed Nov 16 21:23:39 2022 +0800

    [chore][python] Change name from process definition to workflow (#12918)
    
    only change its name in python gateway server code, including
    
    * Function name: all related to process definition
    * Parameter name and comment related
    
    ref: apache/dolphinscheduler-sdk-python#22
    
    (cherry picked from commit f20c9b3102503a1306d5fa3504ddce56a76d58ab)
---
 .../dolphinscheduler/api/python/PythonGateway.java | 170 +++++++++++----------
 1 file changed, 90 insertions(+), 80 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index b79eaf307e..7fb225268c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -82,7 +82,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import py4j.GatewayServer;
 
 @Component
 public class PythonGateway {
@@ -183,8 +182,10 @@ public class PythonGateway {
             return result;
         }
 
-        ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
-        // In the case project exists, but current process definition still not created, we should also return the init version of it
+        ProcessDefinition processDefinition =
+                processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
+        // In the case project exists, but current workflow still not created, we should also return the init
+        // version of it
         if (processDefinition == null) {
             result.put("code", CodeGenerateUtils.getInstance().genCode());
             result.put("version", 0L);
@@ -203,20 +204,20 @@ public class PythonGateway {
     }
 
     /**
-     * create or update process definition.
-     * If process definition do not exists in Project=`projectCode` would create a new one
-     * If process definition already exists in Project=`projectCode` would update it
+     * create or update workflow.
+     * If workflow do not exists in Project=`projectCode` would create a new one
+     * If workflow already exists in Project=`projectCode` would update it
      *
-     * @param userName user name who create or update process definition
-     * @param projectName project name which process definition belongs to
-     * @param name process definition name
+     * @param userName user name who create or update workflow
+     * @param projectName project name which workflow belongs to
+     * @param name workflow name
      * @param description description
      * @param globalParams global params
-     * @param schedule schedule for process definition, will not set schedule if null,
+     * @param schedule schedule for workflow, will not set schedule if null,
      * and if would always fresh exists schedule if not null
      * @param warningType warning type
      * @param warningGroupId warning group id
-     * @param timeout timeout for process definition working, if running time longer than timeout,
+     * @param timeout timeout for workflow working, if running time longer than timeout,
      * task will mark as fail
      * @param workerGroup run task in which worker group
      * @param tenantCode tenantCode
@@ -225,33 +226,33 @@ public class PythonGateway {
      * @param otherParamsJson otherParamsJson handle other params
      * @return create result code
      */
-    public Long createOrUpdateProcessDefinition(String userName,
-                                                String projectName,
-                                                String name,
-                                                String description,
-                                                String globalParams,
-                                                String schedule,
-                                                String warningType,
-                                                int warningGroupId,
-                                                int timeout,
-                                                String workerGroup,
-                                                String tenantCode,
-                                                int releaseState,
-                                                String taskRelationJson,
-                                                String taskDefinitionJson,
-                                                String otherParamsJson,
-                                                String executionType) {
+    public Long createOrUpdateWorkflow(String userName,
+                                       String projectName,
+                                       String name,
+                                       String description,
+                                       String globalParams,
+                                       String schedule,
+                                       String warningType,
+                                       int warningGroupId,
+                                       int timeout,
+                                       String workerGroup,
+                                       String tenantCode,
+                                       int releaseState,
+                                       String taskRelationJson,
+                                       String taskDefinitionJson,
+                                       String otherParamsJson,
+                                       String executionType) {
         User user = usersService.queryUser(userName);
         Project project = projectMapper.queryByName(projectName);
         long projectCode = project.getCode();
 
-        ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name);
+        ProcessDefinition processDefinition = getWorkflow(user, projectCode, name);
         ProcessExecutionTypeEnum executionTypeEnum = ProcessExecutionTypeEnum.valueOf(executionType);
         long processDefinitionCode;
-        // create or update process definition
+        // create or update workflow
         if (processDefinition != null) {
             processDefinitionCode = processDefinition.getCode();
-            // make sure process definition offline which could edit
+            // make sure workflow offline which could edit
             processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
                     ReleaseState.OFFLINE);
             processDefinitionService.updateProcessDefinition(user, projectCode, name,
@@ -267,7 +268,7 @@ public class PythonGateway {
             processDefinitionCode = processDefinition.getCode();
         }
 
-        // Fresh process definition schedule
+        // Fresh workflow schedule
         if (schedule != null) {
             createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId);
         }
@@ -276,21 +277,23 @@ public class PythonGateway {
     }
 
     /**
-     * get process definition
+     * get workflow
      *
      * @param user user who create or update schedule
-     * @param projectCode project which process definition belongs to
-     * @param processDefinitionName process definition name
+     * @param projectCode project which workflow belongs to
+     * @param workflowName workflow name
      */
-    private ProcessDefinition getProcessDefinition(User user, long projectCode, String processDefinitionName) {
-        Map<String, Object> verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, processDefinitionName, 0);
+    private ProcessDefinition getWorkflow(User user, long projectCode, String workflowName) {
+        Map<String, Object> verifyProcessDefinitionExists =
+                processDefinitionService.verifyProcessDefinitionName(user, projectCode, workflowName, 0);
         Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS);
 
         ProcessDefinition processDefinition = null;
         if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) {
-            processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
+            processDefinition = processDefinitionMapper.queryByDefineName(projectCode, workflowName);
         } else if (verifyStatus != Status.SUCCESS) {
-            String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
+            String msg =
+                    "Verify workflow exists status is invalid, neither SUCCESS or WORKFLOW_NAME_EXIST.";
             logger.error(msg);
             throw new RuntimeException(msg);
         }
@@ -299,13 +302,13 @@ public class PythonGateway {
     }
 
     /**
-     * create or update process definition schedule.
+     * create or update workflow schedule.
      * It would always use latest schedule define in workflow-as-code, and set schedule online when
      * it's not null
      *
      * @param user user who create or update schedule
-     * @param projectCode project which process definition belongs to
-     * @param processDefinitionCode process definition code
+     * @param projectCode project which workflow belongs to
+     * @param workflowCode workflow code
      * @param schedule schedule expression
      * @param workerGroup work group
      * @param warningType warning type
@@ -313,43 +316,47 @@ public class PythonGateway {
      */
     private void createOrUpdateSchedule(User user,
                                         long projectCode,
-                                        long processDefinitionCode,
+                                        long workflowCode,
                                         String schedule,
                                         String workerGroup,
                                         String warningType,
                                         int warningGroupId) {
-        Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
+        Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(workflowCode);
         // create or update schedule
         int scheduleId;
         if (scheduleObj == null) {
-            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
-            Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, WarningType.valueOf(warningType),
+            processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode,
+                    ReleaseState.ONLINE);
+            Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, workflowCode,
+                    schedule, WarningType.valueOf(warningType),
                     warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
             scheduleId = (int) result.get("scheduleId");
         } else {
             scheduleId = scheduleObj.getId();
-            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
+            processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode,
+                    ReleaseState.OFFLINE);
             schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
                     warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
         }
         schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
     }
 
-    public void execProcessInstance(String userName,
-                                    String projectName,
-                                    String processDefinitionName,
-                                    String cronTime,
-                                    String workerGroup,
-                                    String warningType,
-                                    int warningGroupId,
-                                    Integer timeout
-    ) {
+    public void execWorkflowInstance(String userName,
+                                     String projectName,
+                                     String workflowName,
+                                     String cronTime,
+                                     String workerGroup,
+                                     String warningType,
+                                     Integer warningGroupId,
+                                     Integer timeout) {
         User user = usersService.queryUser(userName);
         Project project = projectMapper.queryByName(projectName);
-        ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
+        ProcessDefinition processDefinition =
+                processDefinitionMapper.queryByDefineName(project.getCode(), workflowName);
 
-        // make sure process definition online
-        processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE);
+        // make sure workflow online
+        processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(),
+                ReleaseState.ONLINE);
 
         executorService.execProcessInstance(user,
                 project.getCode(),
@@ -375,8 +382,8 @@ public class PythonGateway {
 
     // side object
     /*
-      Grant project's permission to user. Use when project's created user not current but
-      Python API use it to change process definition.
+     * Grant project's permission to user. Use when project's created user not current but Python API use it to change
+     * workflow.
      */
     private Integer grantProjectToUser(Project project, User user) {
         Date now = new Date();
@@ -492,29 +499,31 @@ public class PythonGateway {
     }
 
     /**
-     * Get processDefinition by given processDefinitionName name. It return map contain processDefinition id, name, code.
-     * Useful in Python API create subProcess task which need processDefinition information.
+     * Get workflow object by given workflow name. It returns map contain workflow id, name, code.
+     * Useful in Python API create subProcess task which need workflow information.
      *
      * @param userName user who create or update schedule
-     * @param projectName project name which process definition belongs to
-     * @param processDefinitionName process definition name
+     * @param projectName project name which workflow belongs to
+     * @param workflowName workflow name
      */
-    public Map<String, Object> getProcessDefinitionInfo(String userName, String projectName, String processDefinitionName) {
+    public Map<String, Object> getWorkflowInfo(String userName, String projectName,
+                                               String workflowName) {
         Map<String, Object> result = new HashMap<>();
 
         User user = usersService.queryUser(userName);
         Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
         long projectCode = project.getCode();
-        ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, processDefinitionName);
-        // get process definition info
+        ProcessDefinition processDefinition = getWorkflow(user, projectCode, workflowName);
+        // get workflow info
         if (processDefinition != null) {
-            // make sure process definition online
-            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(), ReleaseState.ONLINE);
+            // make sure workflow online
+            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(),
+                    ReleaseState.ONLINE);
             result.put("id", processDefinition.getId());
             result.put("name", processDefinition.getName());
             result.put("code", processDefinition.getCode());
         } else {
-            String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
+            String msg = String.format("Can not find valid workflow by name %s", workflowName);
             logger.error(msg);
             throw new IllegalArgumentException(msg);
         }
@@ -523,14 +532,14 @@ public class PythonGateway {
     }
 
     /**
-     * Get project, process definition, task code.
-     * Useful in Python API create dependent task which need processDefinition information.
+     * Get project, workflow, task code.
+     * Useful in Python API create dependent task which need workflow information.
      *
-     * @param projectName project name which process definition belongs to
-     * @param processDefinitionName process definition name
+     * @param projectName project name which workflow belongs to
+     * @param workflowName workflow name
      * @param taskName task name
      */
-    public Map<String, Object> getDependentInfo(String projectName, String processDefinitionName, String taskName) {
+    public Map<String, Object> getDependentInfo(String projectName, String workflowName, String taskName) {
         Map<String, Object> result = new HashMap<>();
 
         Project project = projectMapper.queryByName(projectName);
@@ -542,9 +551,10 @@ public class PythonGateway {
         long projectCode = project.getCode();
         result.put("projectCode", projectCode);
 
-        ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
+        ProcessDefinition processDefinition =
+                processDefinitionMapper.queryByDefineName(projectCode, workflowName);
         if (processDefinition == null) {
-            String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
+            String msg = String.format("Can not find valid workflow by name %s", workflowName);
             logger.error(msg);
             throw new IllegalArgumentException(msg);
         }
@@ -558,8 +568,8 @@ public class PythonGateway {
     }
 
     /**
-     * Get resource by given program type and full name. It return map contain resource id, name.
-     * Useful in Python API create flink or spark task which need processDefinition information.
+     * Get resource by given program type and full name. It returns map contain resource id, name.
+     * Useful in Python API create flink or spark task which need workflow information.
      *
      * @param programType program type one of SCALA, JAVA and PYTHON
      * @param fullName full name of the resource
@@ -602,7 +612,7 @@ public class PythonGateway {
 
     /**
      * Get resource by given resource type and full name. It return map contain resource id, name.
-     * Useful in Python API create task which need processDefinition information.
+     * Useful in Python API create task which need workflow information.
      *
      * @param userName user who query resource
      * @param fullName full name of the resource