You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2021/03/18 16:06:47 UTC
[storm] branch master updated: [STORM-3755] While scheduling
multiple ackers with executor use best effort basis. (#3386)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 4de0622 [STORM-3755] While scheduling multiple ackers with executor use best effort basis. (#3386)
4de0622 is described below
commit 4de0622c6e25e891419f8ccd002e243470dfe969
Author: Bipin Prasad <bi...@yahoo.com>
AuthorDate: Thu Mar 18 11:06:34 2021 -0500
[STORM-3755] While scheduling multiple ackers with executor use best effort basis. (#3386)
---
.../scheduling/BaseResourceAwareStrategy.java | 61 +++++++++-------------
1 file changed, 24 insertions(+), 37 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 4bc4d0b..bdd89ff 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -485,39 +485,22 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
}
progressIdxForExec[execIndex]++;
- int numBoundAckerAssigned
- = assignBoundAckersForNewWorkerSlot(exec, node, workerSlot);
- if (numBoundAckerAssigned == -1) {
- // This only happens when trying to assign bound ackers to the worker slot and failed.
- // Free the entire worker slot and put those bound ackers back to unassigned list
- LOG.debug("Failed to assign bound acker for exec={}, comp={}, topo: {} to worker: {}. Backtracking.",
- exec, comp, topoName, workerSlot);
- searcherState.freeWorkerSlotWithBoundAckers(node, workerSlot);
- continue;
- }
-
if (!isExecAssignmentToWorkerValid(exec, workerSlot)) {
- // This only happens when this exec can not fit in the workerSlot
- // and this is not the first exec to this workerSlot.
- // So just go to next workerSlot and don't free the worker.
- if (numBoundAckerAssigned > 0) {
- LOG.debug("Failed to assign exec={}, comp={}, topo={} with bound ackers to worker: {}. Backtracking.",
- exec, comp, topoName, workerSlot);
- searcherState.freeWorkerSlotWithBoundAckers(node, workerSlot);
- } else {
- LOG.debug("Failed to assign exec={}, comp={}, topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
- exec, comp, topoName, workerSlot,
- node.getId(), node.getAvailableCpuResources(), node.getAvailableMemoryResources());
- }
+ // exec can't fit in this workerSlot, try next workerSlot
+ LOG.debug("Failed to assign exec={}, comp={}, topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
+ exec, comp, topoName, workerSlot,
+ node.getId(), node.getAvailableCpuResources(), node.getAvailableMemoryResources());
continue;
}
searcherState.incStatesSearched();
searcherState.assignCurrentExecutor(execToComp, node, workerSlot);
+ int numBoundAckerAssigned = assignBoundAckersForNewWorkerSlot(exec, node, workerSlot);
if (numBoundAckerAssigned > 0) {
- // This exec with its bounded ackers have all been successfully assigned
+ // This exec and at least one of its bound ackers have been successfully assigned
searcherState.getExecsWithBoundAckers().add(exec);
}
+
if (searcherState.areAllExecsScheduled()) {
//Everything is scheduled correctly, so no need to search any more.
LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, backtrackCnt={}, topo={}",
@@ -559,36 +542,40 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
/**
* <p>
* Determine how many bound ackers to put into the given workerSlot.
- * Then try to assign the ackers one by one into this workerSlot.
+ * Then try to assign the ackers one by one into this workerSlot, up to the calculated
+ * maximum required. Return the number of ackers assigned.
*
- * Return -1 only if the bound ackers assignment process failed.
* Return 0 if one of the conditions hold true:
* 1. No bound ackers are used.
* 2. This is not first exec assigned to this worker.
- * Return positive int if all bound ackers assignments succeed.
+ * 3. No ackers could be assigned because of insufficient space or an exception.
+ *
* </p>
* @param exec being scheduled.
* @param node RasNode on which to schedule.
* @param workerSlot WorkerSlot on which to schedule.
- * @return If we successfully assigned bound worker for this exec
+ * @return Number of ackers assigned.
*/
private int assignBoundAckersForNewWorkerSlot(ExecutorDetails exec, RasNode node, WorkerSlot workerSlot) {
int numOfAckersToBind = searcherState.getNumOfAckersToBind(exec, workerSlot);
if (numOfAckersToBind > 0) {
for (int i = 0; i < numOfAckersToBind; i++) {
if (!isExecAssignmentToWorkerValid(searcherState.peekUnassignedAckers(), workerSlot)) {
- return -1;
- } else {
- try {
- searcherState.assignSingleBoundAcker(node, workerSlot);
- } catch (Exception e) {
- LOG.error("Exception happens when assigning {}th acker executor to workerSlot: {} for topology: {}",
- i, workerSlot, topoName, e);
- return -1;
- }
+ LOG.debug("Assigned {} of {} ackers on workerSlot={} with the executor={} for topology={}",
+ i, numOfAckersToBind, workerSlot, exec, topoName);
+ return i;
+ }
+ try {
+ searcherState.assignSingleBoundAcker(node, workerSlot);
+ } catch (Exception e) {
+ LOG.error("Exception happens when assigning {} of {} ackers on workerSlot={} for topology={}",
+ i + 1, numOfAckersToBind, workerSlot, topoName, e);
+ return i;
}
}
}
+ LOG.debug("Assigned {} ackers on workerSlot={} with the executor={} for topology={}",
+ numOfAckersToBind, workerSlot, exec, topoName);
return numOfAckersToBind;
}
}