You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/11/28 14:01:56 UTC
[dolphinscheduler] 02/05: [feat] Support set execute type to pydolphinscheduler (#12871)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch 3.1.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 696d8ae7f1357d3c10d781ade4c67d5a53cb2e27
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Sat Nov 12 10:31:08 2022 +0800
[feat] Support set execute type to pydolphinscheduler (#12871)
Until now, workflows could only be submitted in parallel mode. This patch gives users the ability to specify the execution type of a workflow.
(cherry picked from commit 87a88e36627017607c73cfc66a92be08d1da22ee)
---
.../dolphinscheduler/api/python/PythonGateway.java | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index 00a817a569..d9f0c78674 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -234,28 +234,34 @@ public class PythonGateway {
String taskRelationJson,
String taskDefinitionJson,
String otherParamsJson,
- ProcessExecutionTypeEnum executionType) {
+ String executionType) {
User user = usersService.queryUser(userName);
Project project = projectMapper.queryByName(projectName);
long projectCode = project.getCode();
ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name);
+ ProcessExecutionTypeEnum executionTypeEnum = ProcessExecutionTypeEnum.valueOf(executionType);
long processDefinitionCode;
// create or update process definition
if (processDefinition != null) {
processDefinitionCode = processDefinition.getCode();
// make sure process definition offline which could edit
- processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
- Map<String, Object> result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams,
- null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson, executionType);
+ processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
+ ReleaseState.OFFLINE);
+ processDefinitionService.updateProcessDefinition(user, projectCode, name,
+ processDefinitionCode, description, globalParams,
+ null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson,
+ executionTypeEnum);
} else {
- Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams,
- null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson, executionType);
+ Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name,
+ description, globalParams,
+ null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson,
+ executionTypeEnum);
processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST);
processDefinitionCode = processDefinition.getCode();
}
- // Fresh process definition schedule
+ // Fresh process definition schedule
if (schedule != null) {
createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId);
}