You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/11/16 13:23:47 UTC
[dolphinscheduler] branch dev updated: [chore][python] Change name from process definition to workflow (#12918)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f20c9b3102 [chore][python] Change name from process definition to workflow (#12918)
f20c9b3102 is described below
commit f20c9b3102503a1306d5fa3504ddce56a76d58ab
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Wed Nov 16 21:23:39 2022 +0800
[chore][python] Change name from process definition to workflow (#12918)
Only change the name in the Python gateway server code, including:
* Function name: all related to process definition
* Parameter names and related comments
ref: apache/dolphinscheduler-sdk-python#22
---
.../dolphinscheduler/api/python/PythonGateway.java | 150 ++++++++++-----------
1 file changed, 75 insertions(+), 75 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index c1cd15c807..66e13eace7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -189,7 +189,7 @@ public class PythonGateway {
ProcessDefinition processDefinition =
processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
- // In the case project exists, but current process definition still not created, we should also return the init
+ // In the case project exists, but current workflow still not created, we should also return the init
// version of it
if (processDefinition == null) {
result.put("code", CodeGenerateUtils.getInstance().genCode());
@@ -210,20 +210,20 @@ public class PythonGateway {
}
/**
- * create or update process definition.
- * If process definition do not exists in Project=`projectCode` would create a new one
- * If process definition already exists in Project=`projectCode` would update it
+ * create or update workflow.
+ * If workflow do not exists in Project=`projectCode` would create a new one
+ * If workflow already exists in Project=`projectCode` would update it
*
- * @param userName user name who create or update process definition
- * @param projectName project name which process definition belongs to
- * @param name process definition name
+ * @param userName user name who create or update workflow
+ * @param projectName project name which workflow belongs to
+ * @param name workflow name
* @param description description
* @param globalParams global params
- * @param schedule schedule for process definition, will not set schedule if null,
+ * @param schedule schedule for workflow, will not set schedule if null,
* and if would always fresh exists schedule if not null
* @param warningType warning type
* @param warningGroupId warning group id
- * @param timeout timeout for process definition working, if running time longer than timeout,
+ * @param timeout timeout for workflow working, if running time longer than timeout,
* task will mark as fail
* @param workerGroup run task in which worker group
* @param tenantCode tenantCode
@@ -232,33 +232,33 @@ public class PythonGateway {
* @param otherParamsJson otherParamsJson handle other params
* @return create result code
*/
- public Long createOrUpdateProcessDefinition(String userName,
- String projectName,
- String name,
- String description,
- String globalParams,
- String schedule,
- String warningType,
- int warningGroupId,
- int timeout,
- String workerGroup,
- String tenantCode,
- int releaseState,
- String taskRelationJson,
- String taskDefinitionJson,
- String otherParamsJson,
- String executionType) {
+ public Long createOrUpdateWorkflow(String userName,
+ String projectName,
+ String name,
+ String description,
+ String globalParams,
+ String schedule,
+ String warningType,
+ int warningGroupId,
+ int timeout,
+ String workerGroup,
+ String tenantCode,
+ int releaseState,
+ String taskRelationJson,
+ String taskDefinitionJson,
+ String otherParamsJson,
+ String executionType) {
User user = usersService.queryUser(userName);
Project project = projectMapper.queryByName(projectName);
long projectCode = project.getCode();
- ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name);
+ ProcessDefinition processDefinition = getWorkflow(user, projectCode, name);
ProcessExecutionTypeEnum executionTypeEnum = ProcessExecutionTypeEnum.valueOf(executionType);
long processDefinitionCode;
- // create or update process definition
+ // create or update workflow
if (processDefinition != null) {
processDefinitionCode = processDefinition.getCode();
- // make sure process definition offline which could edit
+ // make sure workflow offline which could edit
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
ReleaseState.OFFLINE);
processDefinitionService.updateProcessDefinition(user, projectCode, name,
@@ -274,7 +274,7 @@ public class PythonGateway {
processDefinitionCode = processDefinition.getCode();
}
- // Fresh process definition schedule
+ // Fresh workflow schedule
if (schedule != null) {
createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType,
warningGroupId);
@@ -285,23 +285,23 @@ public class PythonGateway {
}
/**
- * get process definition
+ * get workflow
*
* @param user user who create or update schedule
- * @param projectCode project which process definition belongs to
- * @param processDefinitionName process definition name
+ * @param projectCode project which workflow belongs to
+ * @param workflowName workflow name
*/
- private ProcessDefinition getProcessDefinition(User user, long projectCode, String processDefinitionName) {
+ private ProcessDefinition getWorkflow(User user, long projectCode, String workflowName) {
Map<String, Object> verifyProcessDefinitionExists =
- processDefinitionService.verifyProcessDefinitionName(user, projectCode, processDefinitionName, 0);
+ processDefinitionService.verifyProcessDefinitionName(user, projectCode, workflowName, 0);
Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS);
ProcessDefinition processDefinition = null;
if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) {
- processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
+ processDefinition = processDefinitionMapper.queryByDefineName(projectCode, workflowName);
} else if (verifyStatus != Status.SUCCESS) {
String msg =
- "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
+ "Verify workflow exists status is invalid, neither SUCCESS or WORKFLOW_NAME_EXIST.";
logger.error(msg);
throw new RuntimeException(msg);
}
@@ -310,13 +310,13 @@ public class PythonGateway {
}
/**
- * create or update process definition schedule.
+ * create or update workflow schedule.
* It would always use latest schedule define in workflow-as-code, and set schedule online when
* it's not null
*
* @param user user who create or update schedule
- * @param projectCode project which process definition belongs to
- * @param processDefinitionCode process definition code
+ * @param projectCode project which workflow belongs to
+ * @param workflowCode workflow code
* @param schedule schedule expression
* @param workerGroup work group
* @param warningType warning type
@@ -324,24 +324,24 @@ public class PythonGateway {
*/
private void createOrUpdateSchedule(User user,
long projectCode,
- long processDefinitionCode,
+ long workflowCode,
String schedule,
String workerGroup,
String warningType,
int warningGroupId) {
- Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
+ Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(workflowCode);
// create or update schedule
int scheduleId;
if (scheduleObj == null) {
- processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
+ processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode,
ReleaseState.ONLINE);
- Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode,
+ Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, workflowCode,
schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
scheduleId = (int) result.get("scheduleId");
} else {
scheduleId = scheduleObj.getId();
- processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
+ processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode,
ReleaseState.OFFLINE);
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
@@ -349,20 +349,20 @@ public class PythonGateway {
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
}
- public void execProcessInstance(String userName,
- String projectName,
- String processDefinitionName,
- String cronTime,
- String workerGroup,
- String warningType,
- Integer warningGroupId,
- Integer timeout) {
+ public void execWorkflowInstance(String userName,
+ String projectName,
+ String workflowName,
+ String cronTime,
+ String workerGroup,
+ String warningType,
+ Integer warningGroupId,
+ Integer timeout) {
User user = usersService.queryUser(userName);
Project project = projectMapper.queryByName(projectName);
ProcessDefinition processDefinition =
- processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
+ processDefinitionMapper.queryByDefineName(project.getCode(), workflowName);
- // make sure process definition online
+ // make sure workflow online
processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(),
ReleaseState.ONLINE);
@@ -391,7 +391,7 @@ public class PythonGateway {
// side object
/*
* Grant project's permission to user. Use when project's created user not current but Python API use it to change
- * process definition.
+ * workflow.
*/
private Integer grantProjectToUser(Project project, User user) {
Date now = new Date();
@@ -512,31 +512,31 @@ public class PythonGateway {
}
/**
- * Get processDefinition by given processDefinitionName name. It return map contain processDefinition id, name, code.
- * Useful in Python API create subProcess task which need processDefinition information.
+ * Get workflow object by given workflow name. It returns map contain workflow id, name, code.
+ * Useful in Python API create subProcess task which need workflow information.
*
* @param userName user who create or update schedule
- * @param projectName project name which process definition belongs to
- * @param processDefinitionName process definition name
+ * @param projectName project name which workflow belongs to
+ * @param workflowName workflow name
*/
- public Map<String, Object> getProcessDefinitionInfo(String userName, String projectName,
- String processDefinitionName) {
+ public Map<String, Object> getWorkflowInfo(String userName, String projectName,
+ String workflowName) {
Map<String, Object> result = new HashMap<>();
User user = usersService.queryUser(userName);
Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
long projectCode = project.getCode();
- ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, processDefinitionName);
- // get process definition info
+ ProcessDefinition processDefinition = getWorkflow(user, projectCode, workflowName);
+ // get workflow info
if (processDefinition != null) {
- // make sure process definition online
+ // make sure workflow online
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(),
ReleaseState.ONLINE);
result.put("id", processDefinition.getId());
result.put("name", processDefinition.getName());
result.put("code", processDefinition.getCode());
} else {
- String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
+ String msg = String.format("Can not find valid workflow by name %s", workflowName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
@@ -545,14 +545,14 @@ public class PythonGateway {
}
/**
- * Get project, process definition, task code.
- * Useful in Python API create dependent task which need processDefinition information.
+ * Get project, workflow, task code.
+ * Useful in Python API create dependent task which need workflow information.
*
- * @param projectName project name which process definition belongs to
- * @param processDefinitionName process definition name
+ * @param projectName project name which workflow belongs to
+ * @param workflowName workflow name
* @param taskName task name
*/
- public Map<String, Object> getDependentInfo(String projectName, String processDefinitionName, String taskName) {
+ public Map<String, Object> getDependentInfo(String projectName, String workflowName, String taskName) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
@@ -565,9 +565,9 @@ public class PythonGateway {
result.put("projectCode", projectCode);
ProcessDefinition processDefinition =
- processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
+ processDefinitionMapper.queryByDefineName(projectCode, workflowName);
if (processDefinition == null) {
- String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
+ String msg = String.format("Can not find valid workflow by name %s", workflowName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
@@ -582,8 +582,8 @@ public class PythonGateway {
}
/**
- * Get resource by given program type and full name. It return map contain resource id, name.
- * Useful in Python API create flink or spark task which need processDefinition information.
+ * Get resource by given program type and full name. It returns map contain resource id, name.
+ * Useful in Python API create flink or spark task which need workflow information.
*
* @param programType program type one of SCALA, JAVA and PYTHON
* @param fullName full name of the resource
@@ -628,7 +628,7 @@ public class PythonGateway {
/**
* Get resource by given resource type and full name. It return map contain resource id, name.
- * Useful in Python API create task which need processDefinition information.
+ * Useful in Python API create task which need workflow information.
*
* @param userName user who query resource
* @param fullName full name of the resource