Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/02/23 06:35:28 UTC

[GitHub] [dolphinscheduler] SbloodyS commented on a change in pull request #8496: [Feature][MasterServer] Dependent tasks can re-run automatically in the case of complement

SbloodyS commented on a change in pull request #8496:
URL: https://github.com/apache/dolphinscheduler/pull/8496#discussion_r812592610



##########
File path: dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
##########
@@ -88,4 +88,23 @@
          where project_code = #{projectCode}
                 and process_definition_code = #{processDefinitionCode}
     </select>
+
+    <select id="queryDependentProcessDefinitionByProcessDefinitionCode" resultType="DependentProcessDefinition">
+        SELECT
+        c.code AS process_definition_code
+        ,c.name AS process_definition_name
+        ,a.code AS task_definition_code
+        ,a.task_params
+        ,d.worker_group
+        FROM
+        t_ds_task_definition a
+        JOIN t_ds_process_task_relation b ON a.code	= b.pre_task_code and a.version = b.pre_task_version
+        JOIN t_ds_process_definition c ON c.code = b.process_definition_code AND c.version = b.process_definition_version AND c.project_code = b.project_code

Review comment:
       This project_code condition is only there to find the process definition that the task definition belongs to. It won't affect cross-project dependencies.
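
       To illustrate the point, here is a minimal in-memory sketch of that JOIN condition. It is not code from the PR: the class names, field names and sample codes are hypothetical stand-ins for rows of t_ds_process_task_relation (alias b) and t_ds_process_definition (alias c). It assumes a relation row carries the project code of its own workflow, so the three equality checks only tie b to its owning process definition and never look at the project of the upstream workflow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProjectCodeJoinSketch {

    /** Hypothetical stand-in for a row of t_ds_process_task_relation (alias b). */
    static final class Relation {
        final long projectCode;
        final long processDefinitionCode;
        final int processDefinitionVersion;

        Relation(long projectCode, long processDefinitionCode, int processDefinitionVersion) {
            this.projectCode = projectCode;
            this.processDefinitionCode = processDefinitionCode;
            this.processDefinitionVersion = processDefinitionVersion;
        }
    }

    /** Hypothetical stand-in for a row of t_ds_process_definition (alias c). */
    static final class Definition {
        final long code;
        final int version;
        final long projectCode;
        final String name;

        Definition(long code, int version, long projectCode, String name) {
            this.code = code;
            this.version = version;
            this.projectCode = projectCode;
            this.name = name;
        }
    }

    /** Applies the same three equality checks as the JOIN on c in the diff. */
    static List<Definition> joinOwner(Relation b, List<Definition> definitions) {
        List<Definition> matches = new ArrayList<>();
        for (Definition c : definitions) {
            if (c.code == b.processDefinitionCode
                    && c.version == b.processDefinitionVersion
                    && c.projectCode == b.projectCode) {
                matches.add(c);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // The upstream workflow lives in project 1, the dependent workflow in project 2.
        Definition upstream = new Definition(100L, 1, 1L, "upstream");
        Definition dependent = new Definition(200L, 1, 2L, "dependent");

        // Relation row of the dependent workflow; it carries that workflow's own project code.
        Relation b = new Relation(2L, 200L, 1);

        List<Definition> owners = joinOwner(b, Arrays.asList(upstream, dependent));
        System.out.println(owners.get(0).name); // prints "dependent"
    }
}
```

       In this sketch the upstream workflow's project code (1) is never compared with anything, which is why a dependency that crosses projects is still matched.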

##########
File path: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
##########
@@ -673,14 +691,80 @@ private int createComplementCommandList(Date start, Date end, RunMode runMode, C
                         cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex)));
                         command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                         processService.createCommand(command);
+
+                        if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
+                            logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
+                                    + "dependent complement data", command.getProcessDefinitionCode());
+                        } else {
+                            dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
+                        }
                     }
                 }
                 break;
             }
             default:
                 break;
         }
-        logger.info("create complement command count: {}", createCount);
+        logger.info("create complement command count: {}, create dependent complement command count: {}", createCount
+                , dependentProcessDefinitionCreateCount);
         return createCount;
     }
+
+    /**
+     * create complement dependent command
+     */
+    @SuppressWarnings("checkstyle:OperatorWrap")
+    private int createComplementDependentCommand(List<Schedule> schedules, Command command) {
+        int dependentProcessDefinitionCreateCount = 0;
+        Command dependentCommand;
+
+        try {
+            dependentCommand = (Command) BeanUtils.cloneBean(command);

Review comment:
       I'll remove it.



