Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/05/07 09:55:04 UTC

[GitHub] [dolphinscheduler] caishunfeng commented on a diff in pull request #9919: [Fix-8828] [Master] Assign tasks to worker optimization

caishunfeng commented on code in PR #9919:
URL: https://github.com/apache/dolphinscheduler/pull/9919#discussion_r867329374


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java:
##########
@@ -71,30 +77,24 @@ public ExecutorDispatcher() {
      * @throws ExecuteException if error throws ExecuteException
      */
     public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
-        /**
-         * get executor manager
-         */
+        // get executor manager
         ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
         if (executorManager == null) {
             throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
         }
 
-        /**
-         * host select
-         */
-
+        // host select
         Host host = hostManager.select(context);
         if (StringUtils.isEmpty(host.getAddress())) {
-            throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, "
-                            + "current task needs worker group %s to execute",
-                    context.getCommand(),context.getWorkerGroup()));
+            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);

Review Comment:
   Why sleep here? The dispatcher runs on a single thread, so sleeping here blocks every other task waiting to be dispatched. We should avoid it.
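
To illustrate the concern, here is a minimal sketch of one way to avoid blocking: a task that finds no worker is put back on the queue for a later pass, and the loop moves on to the next task. `NonBlockingDispatchSketch`, `dispatchOnce`, and the `hasWorker` predicate are hypothetical names made up for this example; they are not part of DolphinScheduler, and the real fix in the PR may look different.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

/** Hypothetical stand-in for a single-threaded dispatch loop; not DolphinScheduler code. */
final class NonBlockingDispatchSketch {

    /**
     * Runs one pass over the tasks currently in the queue. A task with no
     * available worker is re-queued for a later pass instead of putting the
     * dispatcher thread to sleep.
     *
     * @return the tasks dispatched in this pass, in dispatch order
     */
    static List<String> dispatchOnce(Queue<String> pending, Predicate<String> hasWorker) {
        List<String> dispatched = new ArrayList<>();
        int size = pending.size(); // only visit tasks present at the start of the pass
        for (int i = 0; i < size; i++) {
            String task = pending.poll();
            if (hasWorker.test(task)) {
                dispatched.add(task);
            } else {
                pending.offer(task); // retry on a later pass; other tasks are not held up
            }
        }
        return dispatched;
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        // Assume, for the demo, that only task "b" has no suitable worker.
        List<String> done = dispatchOnce(pending, task -> !task.equals("b"));
        System.out.println(done + " dispatched, " + pending + " re-queued");
    }
}
```

With this shape, a task that cannot be placed costs one queue operation rather than a fixed pause of the whole dispatcher.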



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java:
##########
@@ -45,7 +51,21 @@ public HostWeight doSelect(Collection<HostWeight> sources) {
         }
         lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
         return lowerNode;
+    }
 
+    private List<HostWeight> canAssignTaskHost(Collection<HostWeight> sources) {
+        List<HostWeight> zeroWaitingTask = sources.stream().filter(h -> h.getWaitingTaskCount() == 0).collect(Collectors.toList());
+        if (!zeroWaitingTask.isEmpty()) {
+            return zeroWaitingTask;
+        }
+        HostWeight hostWeight = sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get();
+        List<HostWeight> waitingTask = Lists.newArrayList(hostWeight);
+        List<HostWeight> equalWaitingTask = sources.stream().filter(h -> !h.getHost().equals(hostWeight.getHost()) && h.getWaitingTaskCount() == hostWeight.getWaitingTaskCount())

Review Comment:
   Why not just return the HostWeight list ordered by waitingTaskCount, ascending?
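
A minimal sketch of that suggestion, assuming the caller then takes hosts from the front of the list. `HostLoad` is a hypothetical stand-in for `HostWeight` that carries only the two fields the example needs, and `orderByWaitingTaskCount` is an illustrative name, not a method from the PR. Note that this returns every host, whereas the method in the diff returns only the hosts tied for the lowest count.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for HostWeight; only the fields needed for the example. */
final class HostLoad {
    final String host;
    final int waitingTaskCount;

    HostLoad(String host, int waitingTaskCount) {
        this.host = host;
        this.waitingTaskCount = waitingTaskCount;
    }

    int getWaitingTaskCount() {
        return waitingTaskCount;
    }
}

final class WaitingCountOrderSketch {

    /** Returns all hosts ordered by waiting task count, fewest first; ties keep input order. */
    static List<HostLoad> orderByWaitingTaskCount(Collection<HostLoad> sources) {
        return sources.stream()
                .sorted(Comparator.comparingInt(HostLoad::getWaitingTaskCount))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<HostLoad> sources = Arrays.asList(
                new HostLoad("worker-1", 3),
                new HostLoad("worker-2", 0),
                new HostLoad("worker-3", 1));
        for (HostLoad h : orderByWaitingTaskCount(sources)) {
            System.out.println(h.host + " waiting=" + h.waitingTaskCount);
        }
    }
}
```

A single sort replaces the separate zero-count filter, the `min` lookup, and the equal-count filter.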



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java:
##########
@@ -221,4 +227,23 @@ private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
             channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
         }
     }
+
+    /**
+     * handle result event
+     */
+    private void handleWorkerRejectEvent(Channel channel, TaskInstance taskInstance, WorkflowExecuteThread executeThread) {
+        try {
+            if (executeThread != null) {
+                executeThread.resubmit(taskInstance.getTaskCode());
+            }
+            if (channel != null) {
+                TaskRecallAckCommand taskRecallAckCommand = new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskInstance.getId());
+                channel.writeAndFlush(taskRecallAckCommand.convert2Command());
+            }
+        } catch (Exception e) {
+            logger.error("worker reject error", e);

Review Comment:
   ```suggestion
               logger.error("handle worker reject event error", e);
   ```



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java:
##########
@@ -47,7 +51,7 @@ public class WorkerManagerThread implements Runnable {
     /**
      * task queue
      */
-    private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();

Review Comment:
   The DelayQueue is what makes the delay strategy take effect, so it is still useful. Please don't remove it.
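
For reference, a small self-contained sketch of the behaviour being relied on: `java.util.concurrent.DelayQueue` only hands out an element once its delay has expired. `DelayedTask` is a hypothetical element type written for this example; it is not the `TaskExecuteThread` class from the worker module.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical queue element; becomes available delayMillis after creation. */
final class DelayedTask implements Delayed {
    final String name;
    private final long readyAtNanos;

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the element may be taken.
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class DelayQueueSketch {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("delayed", 200));
        queue.put(new DelayedTask("immediate", 0));

        // The element with no delay comes out first, regardless of insertion order.
        System.out.println(queue.take().name);
        // The other element is still delayed, so a non-blocking poll finds nothing.
        System.out.println(queue.poll() == null);
        // take() blocks until the remaining delay has elapsed.
        System.out.println(queue.take().name);
    }
}
```

A plain FIFO queue would hand both elements out immediately, which is why swapping the queue type would silently drop the delay.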


