You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/06/29 15:16:27 UTC

[GitHub] [dolphinscheduler] caishunfeng commented on a diff in pull request #10667: [Fix-10666] Workflow submit failed will still in memory and never retry

caishunfeng commented on code in PR #10667:
URL: https://github.com/apache/dolphinscheduler/pull/10667#discussion_r909999242


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java:
##########
@@ -164,38 +172,53 @@ private void scheduleWorkflow() throws InterruptedException {
 
         List<ProcessInstance> processInstances = command2ProcessInstance(commands);
         if (CollectionUtils.isEmpty(processInstances)) {
+            // indicate that the command transform to processInstance error, sleep for 1s
+            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             return;
         }
         MasterServerMetrics.incMasterConsumeCommand(commands.size());
 
         for (ProcessInstance processInstance : processInstances) {
-            try {
-                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-                logger.info("Master schedule service starting workflow instance");
-                final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
-                    processInstance
-                    , processService
-                    , nettyExecutorManager
-                    , processAlertManager
-                    , masterConfig
-                    , stateWheelExecuteThread
-                    , curingGlobalParamsService);
-
-                this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
-                if (processInstance.getTimeout() > 0) {
-                    stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
-                }
-                ProcessInstanceMetrics.incProcessInstanceSubmit();
-                workflowExecuteThreadPool.submit(workflowExecuteRunnable);
-                logger.info("Master schedule service started workflow instance");
-
-            } catch (Exception ex) {
-                processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-                stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
-                logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
-            } finally {
-                LoggerUtils.removeWorkflowInstanceIdMDC();
+            submitProcessInstance(processInstance);
+        }
+    }
+
+    private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            logger.info("Master schedule service starting workflow instance");
+            final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
+                processInstance
+                , processService
+                , nettyExecutorManager
+                , processAlertManager
+                , masterConfig
+                , stateWheelExecuteThread
+                , curingGlobalParamsService);
+
+            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
+            if (processInstance.getTimeout() > 0) {
+                stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
             }
+            ProcessInstanceMetrics.incProcessInstanceSubmit();
+            CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = CompletableFuture.supplyAsync(
+                workflowExecuteRunnable::call, workflowExecuteThreadPool);
+            workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
+                if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
+                    // submit failed
+                    processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+                    stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+                    submitFailedProcessInstances.add(processInstance);
+                }
+            });
+            logger.info("Master schedule service started workflow instance");

Review Comment:
   ```suggestion
               logger.info("Master schedule service started workflow instance, id:{}, name:{} ", processInstance.getId(), processInstance.getName());
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java:
##########
@@ -164,38 +172,53 @@ private void scheduleWorkflow() throws InterruptedException {
 
         List<ProcessInstance> processInstances = command2ProcessInstance(commands);
         if (CollectionUtils.isEmpty(processInstances)) {
+            // indicate that the command transform to processInstance error, sleep for 1s
+            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             return;
         }
         MasterServerMetrics.incMasterConsumeCommand(commands.size());
 
         for (ProcessInstance processInstance : processInstances) {
-            try {
-                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-                logger.info("Master schedule service starting workflow instance");
-                final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
-                    processInstance
-                    , processService
-                    , nettyExecutorManager
-                    , processAlertManager
-                    , masterConfig
-                    , stateWheelExecuteThread
-                    , curingGlobalParamsService);
-
-                this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
-                if (processInstance.getTimeout() > 0) {
-                    stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
-                }
-                ProcessInstanceMetrics.incProcessInstanceSubmit();
-                workflowExecuteThreadPool.submit(workflowExecuteRunnable);
-                logger.info("Master schedule service started workflow instance");
-
-            } catch (Exception ex) {
-                processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-                stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
-                logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
-            } finally {
-                LoggerUtils.removeWorkflowInstanceIdMDC();
+            submitProcessInstance(processInstance);
+        }
+    }
+
+    private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            logger.info("Master schedule service starting workflow instance");

Review Comment:
   Please add the processInstance id or name to this log message.



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java:
##########
@@ -164,38 +172,53 @@ private void scheduleWorkflow() throws InterruptedException {
 
         List<ProcessInstance> processInstances = command2ProcessInstance(commands);
         if (CollectionUtils.isEmpty(processInstances)) {
+            // indicate that the command transform to processInstance error, sleep for 1s
+            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             return;
         }
         MasterServerMetrics.incMasterConsumeCommand(commands.size());
 
         for (ProcessInstance processInstance : processInstances) {
-            try {
-                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-                logger.info("Master schedule service starting workflow instance");
-                final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
-                    processInstance
-                    , processService
-                    , nettyExecutorManager
-                    , processAlertManager
-                    , masterConfig
-                    , stateWheelExecuteThread
-                    , curingGlobalParamsService);
-
-                this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
-                if (processInstance.getTimeout() > 0) {
-                    stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
-                }
-                ProcessInstanceMetrics.incProcessInstanceSubmit();
-                workflowExecuteThreadPool.submit(workflowExecuteRunnable);
-                logger.info("Master schedule service started workflow instance");
-
-            } catch (Exception ex) {
-                processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-                stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
-                logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
-            } finally {
-                LoggerUtils.removeWorkflowInstanceIdMDC();
+            submitProcessInstance(processInstance);
+        }
+    }
+
+    private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            logger.info("Master schedule service starting workflow instance");
+            final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
+                processInstance
+                , processService
+                , nettyExecutorManager
+                , processAlertManager
+                , masterConfig
+                , stateWheelExecuteThread
+                , curingGlobalParamsService);
+
+            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
+            if (processInstance.getTimeout() > 0) {
+                stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
             }
+            ProcessInstanceMetrics.incProcessInstanceSubmit();
+            CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = CompletableFuture.supplyAsync(
+                workflowExecuteRunnable::call, workflowExecuteThreadPool);
+            workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
+                if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
+                    // submit failed
+                    processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+                    stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+                    submitFailedProcessInstances.add(processInstance);
+                }
+            });
+            logger.info("Master schedule service started workflow instance");
+
+        } catch (Exception ex) {
+            processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+            stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+            logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);

Review Comment:
   Please add the processInstance id or name to this failure log message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org