You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2021/03/18 16:06:47 UTC

[storm] branch master updated: [STORM-3755] While scheduling multiple ackers with executor use best effort basis. (#3386)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 4de0622  [STORM-3755] While scheduling multiple ackers with executor use best effort basis. (#3386)
4de0622 is described below

commit 4de0622c6e25e891419f8ccd002e243470dfe969
Author: Bipin Prasad <bi...@yahoo.com>
AuthorDate: Thu Mar 18 11:06:34 2021 -0500

    [STORM-3755] While scheduling multiple ackers with executor use best effort basis. (#3386)
---
 .../scheduling/BaseResourceAwareStrategy.java      | 61 +++++++++-------------
 1 file changed, 24 insertions(+), 37 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 4bc4d0b..bdd89ff 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -485,39 +485,22 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
                     }
                     progressIdxForExec[execIndex]++;
 
-                    int numBoundAckerAssigned
-                        = assignBoundAckersForNewWorkerSlot(exec, node, workerSlot);
-                    if (numBoundAckerAssigned == -1) {
-                        // This only happens when trying to assign bound ackers to the worker slot and failed.
-                        // Free the entire worker slot and put those bound ackers back to unassigned list
-                        LOG.debug("Failed to assign bound acker for exec={}, comp={}, topo: {} to worker: {}.  Backtracking.",
-                            exec, comp, topoName, workerSlot);
-                        searcherState.freeWorkerSlotWithBoundAckers(node, workerSlot);
-                        continue;
-                    }
-
                     if (!isExecAssignmentToWorkerValid(exec, workerSlot)) {
-                        // This only happens when this exec can not fit in the workerSlot
-                        // and this is not the first exec to this workerSlot.
-                        // So just go to next workerSlot and don't free the worker.
-                        if (numBoundAckerAssigned > 0) {
-                            LOG.debug("Failed to assign exec={}, comp={}, topo={} with bound ackers to worker: {}.  Backtracking.",
-                                exec, comp, topoName, workerSlot);
-                            searcherState.freeWorkerSlotWithBoundAckers(node, workerSlot);
-                        } else {
-                            LOG.debug("Failed to assign exec={}, comp={}, topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
-                                exec, comp, topoName, workerSlot,
-                                node.getId(), node.getAvailableCpuResources(), node.getAvailableMemoryResources());
-                        }
+                        // exec can't fit in this workerSlot, try next workerSlot
+                        LOG.debug("Failed to assign exec={}, comp={}, topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
+                            exec, comp, topoName, workerSlot,
+                            node.getId(), node.getAvailableCpuResources(), node.getAvailableMemoryResources());
                         continue;
                     }
 
                     searcherState.incStatesSearched();
                     searcherState.assignCurrentExecutor(execToComp, node, workerSlot);
+                    int numBoundAckerAssigned = assignBoundAckersForNewWorkerSlot(exec, node, workerSlot);
                     if (numBoundAckerAssigned > 0) {
-                        // This exec with its bounded ackers have all been successfully assigned
+                        // This exec and at least one of its bound ackers have been successfully assigned
                         searcherState.getExecsWithBoundAckers().add(exec);
                     }
+
                     if (searcherState.areAllExecsScheduled()) {
                         //Everything is scheduled correctly, so no need to search any more.
                         LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, backtrackCnt={}, topo={}",
@@ -559,36 +542,40 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
     /**
      * <p>
      * Determine how many bound ackers to put into the given workerSlot.
-     * Then try to assign the ackers one by one into this workerSlot.
+     * Then try to assign the ackers one by one into this workerSlot, up to the calculated
+     * maximum required. Return the number of ackers assigned.
      *
-     * Return -1 only if the bound ackers assignment process failed.
      * Return 0 if one of the following conditions holds true:
      *  1. No bound ackers are used.
      *  2. This is not first exec assigned to this worker.
-     * Return positive int if all bound ackers assignments succeed.
+     *  3. No ackers could be assigned because of space or exception.
+     *
      * </p>
      * @param exec              being scheduled.
      * @param node              RasNode on which to schedule.
      * @param workerSlot        WorkerSlot on which to schedule.
-     * @return                  If we successfully assigned bound worker for this exec
+     * @return                  Number of ackers assigned.
      */
     private int assignBoundAckersForNewWorkerSlot(ExecutorDetails exec, RasNode node, WorkerSlot workerSlot) {
         int numOfAckersToBind = searcherState.getNumOfAckersToBind(exec, workerSlot);
         if (numOfAckersToBind > 0) {
             for (int i = 0; i < numOfAckersToBind; i++) {
                 if (!isExecAssignmentToWorkerValid(searcherState.peekUnassignedAckers(), workerSlot)) {
-                    return -1;
-                } else {
-                    try {
-                        searcherState.assignSingleBoundAcker(node, workerSlot);
-                    } catch (Exception e) {
-                        LOG.error("Exception happens when assigning {}th acker executor to workerSlot: {} for topology: {}",
-                                    i, workerSlot, topoName, e);
-                        return -1;
-                    }
+                    LOG.debug("Assigned {} of {} ackers on workerSlot={} with the executor={} for topology={}",
+                        i, numOfAckersToBind, workerSlot, exec, topoName);
+                    return i;
+                }
+                try {
+                    searcherState.assignSingleBoundAcker(node, workerSlot);
+                } catch (Exception e) {
+                    LOG.error("Exception happens when assigning {} of {} ackers on workerSlot={} for topology={}",
+                                i + 1, numOfAckersToBind, workerSlot, topoName, e);
+                    return i;
                 }
             }
         }
+        LOG.debug("Assigned {} ackers on workerSlot={} with the executor={} for topology={}",
+            numOfAckersToBind, workerSlot, exec, topoName);
         return numOfAckersToBind;
     }
 }