You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/12/05 10:03:40 UTC

[GitHub] [dolphinscheduler] zhongjiajie commented on a diff in pull request #13094: [Fix] Fix the version inconsistency when updating workflows, tasks and relationships in openapi.

zhongjiajie commented on code in PR #13094:
URL: https://github.com/apache/dolphinscheduler/pull/13094#discussion_r1039385375


##########
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java:
##########
@@ -1068,11 +1068,23 @@ public void testUpdateProcessDefinitionV2() {
                 ((ServiceException) exception).getCode());
 
         // success
+        Mockito.when(processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode()))

Review Comment:
   We should add more tests for the version, including:
   1. updating the workflow entity only
   2. updating a task only
   3. updating a task dependence only
   
   All three situations should check whether the workflow and the workflow task relations are correct, and that all tasks are still in the workflow.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }

Review Comment:
   good catch



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -460,10 +474,19 @@ public List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
         // create relation not exists
         List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
         for (long createCode : taskCodeCreates) {
-            TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode);
+            long upstreamCode = 0L;
+            int version = 0;
+            if (createCode != 0L) {

Review Comment:
   ```suggestion
               // 0 for DAG root, should not, it may already exists and skip to create anymore
               if (createCode != 0L) {
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }
+        processDefinitionUpdate.setVersion(insertVersion);
+        return processDefinitionUpdate;
+    }
+    public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
+        Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
+        processDefinitionLog.setId(processDefinition.getId());
+        int result = processDefinitionMapper.updateById(processDefinitionLog);

Review Comment:
   Yeah, it seems you directly copy-pasted the code from the process service to here, but it would be better to refactor it so that it makes more sense: when we want to operate on `processDefinitionMapper`, we should use a `ProcessDefinition` object instead of a `ProcessDefinitionLog`.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java:
##########
@@ -97,7 +97,7 @@ public class TaskCreateRequest {
     private Integer memoryMax;
 
     @Schema(example = "upstream-task-codes1,upstream-task-codes2", description = "use , to split multiple upstream task codes")
-    private String upstreamTasksCodes;
+    private String upstreamTasksCodes = "0";

Review Comment:
   `"0"` means the root node of the DAG; maybe we should add a constant for this.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }
+        processDefinitionUpdate.setVersion(insertVersion);
+        return processDefinitionUpdate;
+    }
+    public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
+        Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
+        processDefinitionLog.setId(processDefinition.getId());
+        int result = processDefinitionMapper.updateById(processDefinitionLog);

Review Comment:
   And BTW, as the ProcessDefinitionLog is a snapshot of the ProcessDefinition, I think we had better update the ProcessDefinition before we insert the ProcessDefinitionLog, so that it makes more sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org