You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2020/09/17 15:09:00 UTC

[GitHub] [storm] Ethanlm commented on a change in pull request #3328: [STORM-3691] Refactor Resource Aware Strategies.

Ethanlm commented on a change in pull request #3328:
URL: https://github.com/apache/storm/pull/3328#discussion_r490319536



##########
File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
##########
@@ -19,21 +19,29 @@
 
 /**
  * An interface for implementing different scheduling strategies for the resource aware scheduling.
+ * Scheduler should call {@link #prepare(Map)} followed by {@link #schedule(Cluster, TopologyDetails)}.
+ * <p>
+ *     A fully functioning implementation is in the abstract class {@link BaseResourceAwareStrategy}.
+ *     Subclasses should extend {@link BaseResourceAwareStrategy#BaseResourceAwareStrategy(boolean, ObjectResourceSortType)}

Review comment:
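       ObjectResourceSortType doesn't exist. The two-argument constructor of BaseResourceAwareStrategy takes (boolean, NodeSortType), so this {@link} should reference NodeSortType instead.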

##########
File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
##########
@@ -50,709 +37,308 @@
 import org.apache.storm.scheduler.resource.RasNodes;
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.SchedulingStatus;
-import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
-import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
-import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
-import org.apache.storm.shade.com.google.common.collect.Sets;
+import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.ExecSorterByConnectionCount;
+import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.ExecSorterByProximity;
+import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.IExecSorter;
+import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.INodeSorter;
+import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorter;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class BaseResourceAwareStrategy implements IStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(BaseResourceAwareStrategy.class);
-    protected Cluster cluster;
-    // Rack id to list of host names in that rack
-    private Map<String, List<String>> networkTopography;
-    private final Map<String, String> superIdToRack = new HashMap<>();
-    private final Map<String, String> superIdToHostname = new HashMap<>();
-    private final Map<String, List<RasNode>> hostnameToNodes = new HashMap<>();
-    private final Map<String, List<RasNode>> rackIdToNodes = new HashMap<>();
-    protected RasNodes nodes;
 
-    @VisibleForTesting
-    void prepare(Cluster cluster) {
-        this.cluster = cluster;
-        nodes = new RasNodes(cluster);
-        networkTopography = cluster.getNetworkTopography();
-        Map<String, String> hostToRack = new HashMap<>();
-        for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
-            String rackId = entry.getKey();
-            for (String hostName: entry.getValue()) {
-                hostToRack.put(hostName, rackId);
-            }
-        }
-        for (RasNode node: nodes.getNodes()) {
-            String superId = node.getId();
-            String hostName = node.getHostname();
-            String rackId = hostToRack.getOrDefault(hostName, DNSToSwitchMapping.DEFAULT_RACK);
-            superIdToHostname.put(superId, hostName);
-            superIdToRack.put(superId, rackId);
-            hostnameToNodes.computeIfAbsent(hostName, (hn) -> new ArrayList<>()).add(node);
-            rackIdToNodes.computeIfAbsent(rackId, (hn) -> new ArrayList<>()).add(node);
-        }
-        logClusterInfo();
+    /**
+     * Different node sorting types available. Two of these are for backward compatibility.
+     * The last one (COMMON) is the new sorting type used across the board.
+     * Refer to {@link NodeSorter#NodeSorter(Cluster, TopologyDetails, NodeSortType)} for more details.
+     */
+    public enum NodeSortType {
+        GENERIC_RAS, // for deprecation, Used by GenericResourceAwareStrategyOld
+        DEFAULT_RAS, // for deprecation, Used by DefaultResourceAwareStrategyOld
+        COMMON       // new and only node sorting type going forward
     }
 
-    @Override
-    public void prepare(Map<String, Object> config) {
-        //NOOP
-    }
+    // instance variables from class instantiation
+    protected final boolean sortNodesForEachExecutor;
+    protected final NodeSortType nodeSortType;
+
+    // instance variables set by the two IStrategy methods, prepare() and schedule()
+    protected Map<String, Object> config;
+    protected Cluster cluster;
+    protected TopologyDetails topologyDetails;
 
-    protected SchedulingResult mkNotEnoughResources(TopologyDetails td) {
-        return  SchedulingResult.failure(
-            SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
-            td.getExecutors().size() + " executors not scheduled");
+    // Instance variables derived from Cluster.
+    protected RasNodes nodes;
+    private Map<String, List<String>> networkTopography;
+    private Map<String, List<RasNode>> hostnameToNodes;
+
+    // Instance variables derived from TopologyDetails
+    protected String topoName;
+    protected Map<String, Set<ExecutorDetails>> compToExecs;
+    protected Map<ExecutorDetails, String> execToComp;
+    protected boolean orderExecutorsByProximity;
+    private long maxSchedulingTimeMs;
+
+    // Instance variables from Cluster and TopologyDetails.
+    Set<ExecutorDetails> unassignedExecutors;
+    private int maxStateSearch;
+    protected SchedulingSearcherState searcherState;
+    protected IExecSorter execSorter;
+    protected INodeSorter nodeSorter;
+
+    public BaseResourceAwareStrategy() {
+        this(true, NodeSortType.COMMON);
     }
 
     /**
-     * Schedule executor exec from topology td.
+     * Initialize for the default implementation of schedule().
      *
-     * @param exec           the executor to schedule
-     * @param td             the topology executor exec is a part of
-     * @param scheduledTasks executors that have been scheduled
-     * @return true if scheduled successfully, else false.
+     * @param sortNodesForEachExecutor Sort nodes before scheduling each executor.
+     * @param nodeSortType type of sorting to be applied to object resource collection {@link NodeSortType}.
      */
-    protected boolean scheduleExecutor(
-            ExecutorDetails exec, TopologyDetails td, Collection<ExecutorDetails> scheduledTasks, Iterable<String> sortedNodes) {
-        WorkerSlot targetSlot = findWorkerForExec(exec, td, sortedNodes);
-        if (targetSlot != null) {
-            RasNode targetNode = idToNode(targetSlot.getNodeId());
-            targetNode.assignSingleExecutor(targetSlot, exec, td);
-            scheduledTasks.add(exec);
-            LOG.debug(
-                "TASK {} assigned to Node: {} avail [ mem: {} cpu: {} ] total [ mem: {} cpu: {} ] on "
-                + "slot: {} on Rack: {}",
-                exec,
-                targetNode.getHostname(),
-                targetNode.getAvailableMemoryResources(),
-                targetNode.getAvailableCpuResources(),
-                targetNode.getTotalMemoryResources(),
-                targetNode.getTotalCpuResources(),
-                targetSlot,
-                nodeToRack(targetNode));
-            return true;
-        } else {
-            String comp = td.getExecutorToComponent().get(exec);
-            NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
-            LOG.warn("Not Enough Resources to schedule Task {} - {} {}", exec, comp, requestedResources);
-            return false;
-        }
+    public BaseResourceAwareStrategy(boolean sortNodesForEachExecutor, NodeSortType nodeSortType) {
+        this.sortNodesForEachExecutor = sortNodesForEachExecutor;
+        this.nodeSortType = nodeSortType;
     }
 
-    protected abstract TreeSet<ObjectResources> sortObjectResources(
-        AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
-        ExistingScheduleFunc existingScheduleFunc
-    );
-
-    /**
-     * Find a worker to schedule executor exec on.
-     *
-     * @param exec the executor to schedule
-     * @param td   the topology that the executor is a part of
-     * @return a worker to assign exec on. Returns null if a worker cannot be successfully found in cluster
-     */
-    protected WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Iterable<String> sortedNodes) {
-        for (String id : sortedNodes) {
-            RasNode node = nodes.getNodeById(id);
-            if (node.couldEverFit(exec, td)) {
-                for (WorkerSlot ws : node.getSlotsAvailableToScheduleOn()) {
-                    if (node.wouldFit(ws, exec, td)) {
-                        return ws;
-                    }
-                }
-            }
-        }
-        return null;
+    @Override
+    public void prepare(Map<String, Object> config) {
+        this.config = config;
     }
 
     /**
-     * Nodes are sorted by two criteria.
+     * Note that this method is not thread-safe.
+     * Several instance variables are generated from supplied
+     * parameters. In addition, the following instance variables are set to complete scheduling:
+     *  <li>{@link #searcherState}</li>
+     *  <li>{@link #execSorter} to sort executors</li>
+     *  <li>{@link #nodeSorter} to sort nodes</li>
+     * <p>
+     * Scheduling consists of three main steps:
+     *  <li>{@link #prepareForScheduling(Cluster, TopologyDetails)}</li>
+     *  <li>{@link #checkSchedulingFeasibility()}, and</li>
+     *  <li>{@link #scheduleExecutorsOnNodes(List, Iterable)}</li>
+     * </p><p>
+     * The executors and nodes are sorted in the order most conducive to scheduling for the strategy.
+     * Those interfaces may be overridden by subclasses using mutators:
+     *  <li>{@link #setExecSorter(IExecSorter)} and</li>
+     *  <li>{@link #setNodeSorter(INodeSorter)}</li>
+     *</p>
      *
-     * <p>1) the number executors of the topology that needs to be scheduled is already on the node in
-     * descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same node as the
-     * existing executors of the topology.
-     *
-     * <p>2) the subordinate/subservient resource availability percentage of a node in descending
-     * order We calculate the resource availability percentage by dividing the resource availability that have exhausted or little of one of
-     * the resources mentioned above will be ranked after on the node by the resource availability of the entire rack By doing this
-     * calculation, nodes nodes that have more balanced resource availability. So we will be less likely to pick a node that have a lot of
-     * one resource but a low amount of another.
-     *
-     * @param availNodes a list of all the nodes we want to sort
-     * @param rackId     the rack id availNodes are a part of
-     * @return a sorted list of nodes.
+     * @param cluster on which executors will be scheduled.
+     * @param td the topology to schedule for.
+     * @return result of scheduling (success, failure, or null when interrupted).
      */
-    protected TreeSet<ObjectResources> sortNodes(
-            List<RasNode> availNodes, ExecutorDetails exec, TopologyDetails topologyDetails, String rackId,
-            Map<String, AtomicInteger> scheduledCount) {
-        AllResources allRackResources = new AllResources("RACK");
-        List<ObjectResources> nodes = allRackResources.objectResources;
-
-        for (RasNode rasNode : availNodes) {
-            String superId = rasNode.getId();
-            ObjectResources node = new ObjectResources(superId);
-
-            node.availableResources = rasNode.getTotalAvailableResources();
-            node.totalResources = rasNode.getTotalResources();
-
-            nodes.add(node);
-            allRackResources.availableResourcesOverall.add(node.availableResources);
-            allRackResources.totalResourcesOverall.add(node.totalResources);
-
-        }
-
-        LOG.debug(
-            "Rack {}: Overall Avail [ {} ] Total [ {} ]",
-            rackId,
-            allRackResources.availableResourcesOverall,
-            allRackResources.totalResourcesOverall);
-
-        return sortObjectResources(
-            allRackResources,
-            exec,
-            topologyDetails,
-            (superId) -> {
-                AtomicInteger count = scheduledCount.get(superId);
-                if (count == null) {
-                    return 0;
-                }
-                return count.get();
-            });
-    }
-
-    protected List<String> makeHostToNodeIds(List<String> hosts) {
-        if (hosts == null) {
-            return Collections.emptyList();
-        }
-        List<String> ret = new ArrayList<>(hosts.size());
-        for (String host: hosts) {
-            List<RasNode> nodes = hostnameToNodes.get(host);
-            if (nodes != null) {
-                for (RasNode node : nodes) {
-                    ret.add(node.getId());
-                }
-            }
-        }
-        return ret;
-    }
-
-    private static class LazyNodeSortingIterator implements Iterator<String> {
-        private final LazyNodeSorting parent;
-        private final Iterator<ObjectResources> rackIterator;
-        private Iterator<ObjectResources> nodeIterator;
-        private String nextValueFromNode = null;
-        private final Iterator<String> pre;
-        private final Iterator<String> post;
-        private final Set<String> skip;
-
-        LazyNodeSortingIterator(LazyNodeSorting parent,
-                                       TreeSet<ObjectResources> sortedRacks) {
-            this.parent = parent;
-            rackIterator = sortedRacks.iterator();
-            pre = parent.favoredNodeIds.iterator();
-            post = Stream.concat(parent.unFavoredNodeIds.stream(), parent.greyListedSupervisorIds.stream())
-                            .collect(Collectors.toList())
-                            .iterator();
-            skip = parent.skippedNodeIds;
-        }
-
-        private Iterator<ObjectResources> getNodeIterator() {
-            if (nodeIterator != null && nodeIterator.hasNext()) {
-                return nodeIterator;
-            }
-            //need to get the next node iterator
-            if (rackIterator.hasNext()) {
-                ObjectResources rack = rackIterator.next();
-                final String rackId = rack.id;
-                nodeIterator = parent.getSortedNodesFor(rackId).iterator();
-                return nodeIterator;
-            }
-
-            return null;
-        }
-
-        @Override
-        public boolean hasNext() {
-            if (pre.hasNext()) {
-                return true;
-            }
-            if (nextValueFromNode != null) {
-                return true;
-            }
-            while (true) {
-                //For the node we don't know if we have another one unless we look at the contents
-                Iterator<ObjectResources> nodeIterator = getNodeIterator();
-                if (nodeIterator == null || !nodeIterator.hasNext()) {
-                    break;
-                }
-                String tmp = nodeIterator.next().id;
-                if (!skip.contains(tmp)) {
-                    nextValueFromNode = tmp;
-                    return true;
-                }
-            }
-            if (post.hasNext()) {
-                return true;
-            }
-            return false;
-        }
-
-        @Override
-        public String next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            if (pre.hasNext()) {
-                return pre.next();
-            }
-            if (nextValueFromNode != null) {
-                String tmp = nextValueFromNode;
-                nextValueFromNode = null;
-                return tmp;
-            }
-            return post.next();
-        }
-    }
-
-    private class LazyNodeSorting implements Iterable<String> {
-        private final Map<String, AtomicInteger> perNodeScheduledCount = new HashMap<>();
-        private final TreeSet<ObjectResources> sortedRacks;
-        private final Map<String, TreeSet<ObjectResources>> cachedNodes = new HashMap<>();
-        private final ExecutorDetails exec;
-        private final TopologyDetails td;
-        private final List<String> favoredNodeIds;
-        private final List<String> unFavoredNodeIds;
-        private final List<String> greyListedSupervisorIds;
-        private final Set<String> skippedNodeIds = new HashSet<>();
-
-        LazyNodeSorting(TopologyDetails td, ExecutorDetails exec,
-                               List<String> favoredNodeIds, List<String> unFavoredNodeIds) {
-            this.favoredNodeIds = favoredNodeIds;
-            this.unFavoredNodeIds = unFavoredNodeIds;
-            this.greyListedSupervisorIds = cluster.getGreyListedSupervisors();
-            this.unFavoredNodeIds.removeAll(favoredNodeIds);
-            this.favoredNodeIds.removeAll(greyListedSupervisorIds);
-            this.unFavoredNodeIds.removeAll(greyListedSupervisorIds);
-            skippedNodeIds.addAll(favoredNodeIds);
-            skippedNodeIds.addAll(unFavoredNodeIds);
-            skippedNodeIds.addAll(greyListedSupervisorIds);
-
-            this.td = td;
-            this.exec = exec;
-            String topoId = td.getId();
-            SchedulerAssignment assignment = cluster.getAssignmentById(topoId);
-            if (assignment != null) {
-                for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
-                    assignment.getSlotToExecutors().entrySet()) {
-                    String superId = entry.getKey().getNodeId();
-                    perNodeScheduledCount.computeIfAbsent(superId, (sid) -> new AtomicInteger(0))
-                        .getAndAdd(entry.getValue().size());
-                }
-            }
-            sortedRacks = sortRacks(exec, td);
-        }
-
-        private TreeSet<ObjectResources> getSortedNodesFor(String rackId) {
-            return cachedNodes.computeIfAbsent(rackId,
-                (rid) -> sortNodes(rackIdToNodes.getOrDefault(rid, Collections.emptyList()), exec, td, rid, perNodeScheduledCount));
-        }
-
-        @Override
-        public Iterator<String> iterator() {
-            return new LazyNodeSortingIterator(this, sortedRacks);
-        }
-    }
-
-    protected Iterable<String> sortAllNodes(TopologyDetails td, ExecutorDetails exec,
-                                            List<String> favoredNodeIds, List<String> unFavoredNodeIds) {
-        return new LazyNodeSorting(td, exec, favoredNodeIds, unFavoredNodeIds);
-    }
-
-    private AllResources createClusterAllResources() {
-        AllResources allResources = new AllResources("Cluster");
-        List<ObjectResources> racks = allResources.objectResources;
-
-        //This is the first time so initialize the resources.
-        for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
-            String rackId = entry.getKey();
-            List<String> nodeHosts = entry.getValue();
-            ObjectResources rack = new ObjectResources(rackId);
-            racks.add(rack);
-            for (String nodeHost : nodeHosts) {
-                for (RasNode node : hostnameToNodes(nodeHost)) {
-                    rack.availableResources.add(node.getTotalAvailableResources());
-                    rack.totalResources.add(node.getTotalAvailableResources());
-                }
-            }
-
-            allResources.totalResourcesOverall.add(rack.totalResources);
-            allResources.availableResourcesOverall.add(rack.availableResources);
+    @Override
+    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
+        prepareForScheduling(cluster, td);
+        // early detection of success or failure
+        SchedulingResult earlyResult = checkSchedulingFeasibility();
+        if (earlyResult != null) {
+            return earlyResult;
         }
 
-        LOG.debug(
-            "Cluster Overall Avail [ {} ] Total [ {} ]",
-            allResources.availableResourcesOverall,
-            allResources.totalResourcesOverall);
-        return allResources;
-    }
+        LOG.debug("Topology {} {} Number of ExecutorsNeedScheduling: {}", topoName, topologyDetails.getId(), unassignedExecutors.size());
 
-    private Map<String, AtomicInteger> getScheduledCount(TopologyDetails topologyDetails) {
-        String topoId = topologyDetails.getId();
-        SchedulerAssignment assignment = cluster.getAssignmentById(topoId);
-        Map<String, AtomicInteger> scheduledCount = new HashMap<>();
-        if (assignment != null) {
-            for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
-                assignment.getSlotToExecutors().entrySet()) {
-                String superId = entry.getKey().getNodeId();
-                String rackId = superIdToRack.get(superId);
-                scheduledCount.computeIfAbsent(rackId, (rid) -> new AtomicInteger(0))
-                    .getAndAdd(entry.getValue().size());
-            }
+        //order executors to be scheduled
+        List<ExecutorDetails> orderedExecutors = execSorter.sortExecutors(unassignedExecutors);
+        Iterable<String> sortedNodes = null;
+        if (!this.sortNodesForEachExecutor) {
+            sortedNodes = nodeSorter.sortAllNodes(null);
         }
-        return scheduledCount;
+        return scheduleExecutorsOnNodes(orderedExecutors, sortedNodes);
     }
 
     /**
-     * Racks are sorted by two criteria.
-     *
-     * <p>1) the number executors of the topology that needs to be scheduled is already on the rack in descending order.
-     * The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same rack as the existing executors of the
-     * topology.
+     * Initialize instance variables as the first step in {@link #schedule(Cluster, TopologyDetails)}.
+     * This method may be extended by subclasses to initialize additional variables as in
+     * {@link ConstraintSolverStrategy#prepareForScheduling(Cluster, TopologyDetails)}.
      *
-     * <p>2) the subordinate/subservient resource availability percentage of a rack in descending order We calculate
-     * the resource availability percentage by dividing the resource availability on the rack by the resource availability of the  entire
-     * cluster By doing this calculation, racks that have exhausted or little of one of the resources mentioned above will be ranked after
-     * racks that have more balanced resource availability. So we will be less likely to pick a rack that have a lot of one resource but a
-     * low amount of another.
-     *
-     * @return a sorted list of racks
+     * @param cluster on which executors will be scheduled.
+     * @param topologyDetails to be scheduled.
      */
-    @VisibleForTesting
-    TreeSet<ObjectResources> sortRacks(ExecutorDetails exec, TopologyDetails topologyDetails) {
-
-        final AllResources allResources = createClusterAllResources();
-        final Map<String, AtomicInteger> scheduledCount = getScheduledCount(topologyDetails);
-
-        return sortObjectResources(
-            allResources,
-            exec,
-            topologyDetails,
-            (rackId) -> {
-                AtomicInteger count = scheduledCount.get(rackId);
-                if (count == null) {
-                    return 0;
-                }
-                return count.get();
-            });
+    protected void prepareForScheduling(Cluster cluster, TopologyDetails topologyDetails) {
+        this.cluster = cluster;
+        this.topologyDetails = topologyDetails;
+
+        // from Cluster
+        this.nodes = new RasNodes(cluster);
+        networkTopography = cluster.getNetworkTopography();
+        hostnameToNodes = this.nodes.getHostnameToNodes();
+
+        // from TopologyDetails
+        topoName = topologyDetails.getName();
+        execToComp = topologyDetails.getExecutorToComponent();
+        compToExecs = topologyDetails.getComponentToExecutors();
+        Map<String, Object> topoConf = topologyDetails.getConf();
+        orderExecutorsByProximity = isOrderByProximity(topoConf);
+        maxSchedulingTimeMs = computeMaxSchedulingTimeMs(topoConf);
+
+        // From Cluster and TopologyDetails - and cleaned-up
+        unassignedExecutors = Collections.unmodifiableSet(new HashSet<>(cluster.getUnassignedExecutors(topologyDetails)));
+        int confMaxStateSearch = getMaxStateSearchFromTopoConf(topologyDetails.getConf());
+        int daemonMaxStateSearch = ObjectReader.getInt(cluster.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH));
+        maxStateSearch = Math.min(daemonMaxStateSearch, confMaxStateSearch);
+        LOG.debug("The max state search configured by topology {} is {}", topologyDetails.getId(), confMaxStateSearch);
+        LOG.debug("The max state search that will be used by topology {} is {}", topologyDetails.getId(), maxStateSearch);
+
+        searcherState = createSearcherState();
+        setNodeSorter(new NodeSorter(cluster, topologyDetails, nodeSortType));
+        setExecSorter(orderExecutorsByProximity
+                ? new ExecSorterByProximity(topologyDetails)
+                : new ExecSorterByConnectionCount(topologyDetails));
+
+        logClusterInfo();
     }
 
     /**
-     * Get the rack on which a node is a part of.
+     * Set the pluggable sorter for ExecutorDetails.
      *
-     * @param node the node to find out which rack its on
-     * @return the rack id
+     * @param execSorter to use for sorting executorDetails when scheduling.
      */
-    protected String nodeToRack(RasNode node) {
-        return superIdToRack.get(node.getId());
+    protected void setExecSorter(IExecSorter execSorter) {
+        this.execSorter = execSorter;
     }
 
     /**
-     * sort components by the number of in and out connections that need to be made, in descending order.
+     * Set the pluggable sorter for Nodes.
      *
-     * @param componentMap The components that need to be sorted
-     * @return a sorted set of components
+     * @param nodeSorter to use for sorting nodes when scheduling.
      */
-    private Set<Component> sortComponents(final Map<String, Component> componentMap) {
-        Set<Component> sortedComponents =
-            new TreeSet<>((o1, o2) -> {
-                int connections1 = 0;
-                int connections2 = 0;
-
-                for (String childId : Sets.union(o1.getChildren(), o1.getParents())) {
-                    connections1 +=
-                        (componentMap.get(childId).getExecs().size() * o1.getExecs().size());
-                }
-
-                for (String childId : Sets.union(o2.getChildren(), o2.getParents())) {
-                    connections2 +=
-                        (componentMap.get(childId).getExecs().size() * o2.getExecs().size());
-                }
-
-                if (connections1 > connections2) {
-                    return -1;
-                } else if (connections1 < connections2) {
-                    return 1;
-                } else {
-                    return o1.getId().compareTo(o2.getId());
-                }
-            });
-        sortedComponents.addAll(componentMap.values());
-        return sortedComponents;
+    protected void setNodeSorter(INodeSorter nodeSorter) {
+        this.nodeSorter = nodeSorter;
     }
 
-    /**
-     * Sort a component's neighbors by the number of connections it needs to make with this component.
-     *
-     * @param thisComp     the component that we need to sort its neighbors
-     * @param componentMap all the components to sort
-     * @return a sorted set of components
-     */
-    private Set<Component> sortNeighbors(
-        final Component thisComp, final Map<String, Component> componentMap) {
-        Set<Component> sortedComponents =
-            new TreeSet<>((o1, o2) -> {
-                int connections1 = o1.getExecs().size() * thisComp.getExecs().size();
-                int connections2 = o2.getExecs().size() * thisComp.getExecs().size();
-                if (connections1 < connections2) {
-                    return -1;
-                } else if (connections1 > connections2) {
-                    return 1;
-                } else {
-                    return o1.getId().compareTo(o2.getId());
-                }
-            });
-        sortedComponents.addAll(componentMap.values());
-        return sortedComponents;
+    private static long computeMaxSchedulingTimeMs(Map<String, Object> topoConf) {
+        // expect to be killed by DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY seconds, terminate slightly before
+        int daemonMaxTimeSec = ObjectReader.getInt(topoConf.get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY), 60);
+        int confMaxTimeSec = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), daemonMaxTimeSec);
+        return (confMaxTimeSec >= daemonMaxTimeSec) ? daemonMaxTimeSec * 1000L - 200L :  confMaxTimeSec * 1000L;
     }
 
-    protected List<ExecutorDetails> orderExecutors(
-        TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) {
-        Boolean orderByProximity = ObjectReader.getBoolean(
-            td.getConf().get(Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS), false);
-        if (!orderByProximity) {
-            return orderExecutorsDefault(td, unassignedExecutors);
+    public static int getMaxStateSearchFromTopoConf(Map<String, Object> topoConf) {
+        int confMaxStateSearch;
+        if (topoConf.containsKey(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH)) {
+            //this config is always set for topologies of 2.0 or newer versions since it is in defaults.yaml file
+            //topologies of older versions can also use it if they configure it explicitly
+            confMaxStateSearch = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH));
         } else {
-            LOG.info("{} is set to true", Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS);
-            return orderExecutorsByProximityNeeds(td, unassignedExecutors);
+            // For backwards compatibility
+            confMaxStateSearch = 10_000;
         }
+        return confMaxStateSearch;
     }
 
-    /**
-     * Order executors based on how many in and out connections it will potentially need to make, in descending order. First order
-     * components by the number of in and out connections it will have.  Then iterate through the sorted list of components. For each
-     * component sort the neighbors of that component by how many connections it will have to make with that component. Add an executor from
-     * this component and then from each neighboring component in sorted order. Do this until there is nothing left to schedule.
-     *
-     * @param td                  The topology the executors belong to
-     * @param unassignedExecutors a collection of unassigned executors that need to be assigned. Should only try to assign executors from
-     *                            this list
-     * @return a list of executors in sorted order
-     */
-    private List<ExecutorDetails> orderExecutorsDefault(
-        TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) {
-        Map<String, Component> componentMap = td.getComponents();
-        List<ExecutorDetails> execsScheduled = new LinkedList<>();
-
-        Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
-        for (Component component : componentMap.values()) {
-            compToExecsToSchedule.put(component.getId(), new LinkedList<>());
-            for (ExecutorDetails exec : component.getExecs()) {
-                if (unassignedExecutors.contains(exec)) {
-                    compToExecsToSchedule.get(component.getId()).add(exec);
-                }
+    public static boolean isOrderByProximity(Map<String, Object> topoConf) {
+        Boolean orderByProximity = (Boolean) topoConf.get(Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS);
+        if (orderByProximity == null) {
+            orderByProximity = (Boolean) topoConf.get(EXPERIMENTAL_TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS);
+            if (orderByProximity == null) {
+                orderByProximity = false;
+            } else {
+                LOG.warn("{} is deprecated; please use {} instead", EXPERIMENTAL_TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS,
+                        Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS);
             }
         }
-
-        Set<Component> sortedComponents = sortComponents(componentMap);
-        sortedComponents.addAll(componentMap.values());
-
-        for (Component currComp : sortedComponents) {
-            Map<String, Component> neighbors = new HashMap<>();
-            for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) {
-                neighbors.put(compId, componentMap.get(compId));
-            }
-            Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors);
-            Queue<ExecutorDetails> currCompExesToSched = compToExecsToSchedule.get(currComp.getId());
-
-            boolean flag = false;
-            do {
-                flag = false;
-                if (!currCompExesToSched.isEmpty()) {
-                    execsScheduled.add(currCompExesToSched.poll());
-                    flag = true;
-                }
-
-                for (Component neighborComp : sortedNeighbors) {
-                    Queue<ExecutorDetails> neighborCompExesToSched =
-                        compToExecsToSchedule.get(neighborComp.getId());
-                    if (!neighborCompExesToSched.isEmpty()) {
-                        execsScheduled.add(neighborCompExesToSched.poll());
-                        flag = true;
-                    }
-                }
-            } while (flag);
-        }
-        return execsScheduled;
+        return orderByProximity;
     }
 
     /**
-     * Order executors by network proximity needs.
-     * @param td The topology the executors belong to
-     * @param unassignedExecutors a collection of unassigned executors that need to be unassigned. Should only try to
-     *     assign executors from this list
-     * @return a list of executors in sorted order
+     * Create an instance of {@link SchedulingSearcherState}. This method is called by
+     * {@link #prepareForScheduling(Cluster, TopologyDetails)} and relies on the fields initialized earlier in that method.
+     *
+     * @return a new instance of {@link SchedulingSearcherState}.
      */
-    private List<ExecutorDetails> orderExecutorsByProximityNeeds(
-        TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) {
-        Map<String, Component> componentMap = td.getComponents();
-        List<ExecutorDetails> execsScheduled = new LinkedList<>();
-
-        Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
-        for (Component component : componentMap.values()) {
-            compToExecsToSchedule.put(component.getId(), new LinkedList<>());
-            for (ExecutorDetails exec : component.getExecs()) {
-                if (unassignedExecutors.contains(exec)) {
-                    compToExecsToSchedule.get(component.getId()).add(exec);
-                }
-            }
-        }
-
-        List<Component> sortedComponents = topologicalSortComponents(componentMap);
-
-        for (Component currComp: sortedComponents) {
-            int numExecs = compToExecsToSchedule.get(currComp.getId()).size();
-            for (int i = 0; i < numExecs; i++) {
-                execsScheduled.addAll(takeExecutors(currComp, componentMap, compToExecsToSchedule));
-            }
+    private SchedulingSearcherState createSearcherState() {
+        Map<WorkerSlot, Map<String, Integer>> workerCompCnts = new HashMap<>();
+        Map<RasNode, Map<String, Integer>> nodeCompCnts = new HashMap<>();
+
+        //populate with existing assignments
+        SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyDetails.getId());
+        if (existingAssignment != null) {
+            existingAssignment.getExecutorToSlot().forEach((exec, ws) -> {
+                String compId = execToComp.get(exec);
+                RasNode node = nodes.getNodeById(ws.getNodeId());
+                Map<String, Integer> compCnts = nodeCompCnts.computeIfAbsent(node, (k) -> new HashMap<>());
+                compCnts.put(compId, compCnts.getOrDefault(compId, 0) + 1); // increment
+                //populate worker to comp assignments
+                compCnts = workerCompCnts.computeIfAbsent(ws, (k) -> new HashMap<>());
+                compCnts.put(compId, compCnts.getOrDefault(compId, 0) + 1); // increment
+            });
         }
 
-        return execsScheduled;
+        return new SchedulingSearcherState(workerCompCnts, nodeCompCnts,
+                maxStateSearch, maxSchedulingTimeMs, new ArrayList<>(unassignedExecutors), topologyDetails, execToComp);
     }
 
     /**
-     * Sort components topologically.
-     * @param componentMap The map of component Id to Component Object.
-     * @return The sorted components
+     * Check scheduling feasibility for a quick failure as the second step in {@link #schedule(Cluster, TopologyDetails)}.
+     * If scheduling is not possible, then return a {@link SchedulingResult} with a failure status.
+     * If fully scheduled then return a successful {@link SchedulingResult}.
+     * Subclasses may override this method to check for additional failure conditions,
+     * see {@link ConstraintSolverStrategy#checkSchedulingFeasibility()}.
+     *
+     * @return A non-null {@link SchedulingResult} to terminate scheduling, otherwise return null to continue scheduling.
      */
-    private List<Component> topologicalSortComponents(final Map<String, Component> componentMap) {
-        List<Component> sortedComponents = new ArrayList<>();
-        boolean[] visited = new boolean[componentMap.size()];
-        int[] inDegree = new int[componentMap.size()];
-        List<String> componentIds = new ArrayList<>(componentMap.keySet());
-        Map<String, Integer> compIdToIndex = new HashMap<>();
-        for (int i = 0; i < componentIds.size(); i++) {
-            compIdToIndex.put(componentIds.get(i), i);
-        }
-        //initialize the in-degree array
-        for (int i = 0; i < inDegree.length; i++) {
-            String compId = componentIds.get(i);
-            Component comp = componentMap.get(compId);
-            for (String childId : comp.getChildren()) {
-                inDegree[compIdToIndex.get(childId)] += 1;
-            }
-        }
-        //sorting components topologically
-        for (int t = 0; t < inDegree.length; t++) {
-            for (int i = 0; i < inDegree.length; i++) {
-                if (inDegree[i] == 0 && !visited[i]) {
-                    String compId = componentIds.get(i);
-                    Component comp = componentMap.get(compId);
-                    sortedComponents.add(comp);
-                    visited[i] = true;
-                    for (String childId : comp.getChildren()) {
-                        inDegree[compIdToIndex.get(childId)]--;
-                    }
-                    break;
-                }
-            }
+    protected SchedulingResult checkSchedulingFeasibility() {
+        if (unassignedExecutors.isEmpty()) {
+            return SchedulingResult.success("Fully Scheduled by " + this.getClass().getSimpleName());
         }
-        return sortedComponents;
-    }
 
-    /**
-     * Take unscheduled executors from current and all its downstream components in a particular order.
-     * First, take one executor from the current component;
-     * then for every child (direct downstream component) of this component,
-     *     if it's shuffle grouping from the current component to this child,
-     *         the number of executors to take from this child is the max of
-     *         1 and (the number of unscheduled executors this child has / the number of unscheduled executors the current component has);
-     *     otherwise, the number of executors to take is 1;
-     *     for every executor to take from this child, call takeExecutors(...).
-     * @param currComp The current component.
-     * @param componentMap The map from component Id to component object.
-     * @param compToExecsToSchedule The map from component Id to unscheduled executors.
-     * @return The executors to schedule in order.
-     */
-    private List<ExecutorDetails> takeExecutors(Component currComp,
-                                                final Map<String, Component> componentMap,
-                                                final Map<String, Queue<ExecutorDetails>> compToExecsToSchedule) {
-        List<ExecutorDetails> execsScheduled = new ArrayList<>();
-        Queue<ExecutorDetails> currQueue = compToExecsToSchedule.get(currComp.getId());
-        int currUnscheduledNumExecs = currQueue.size();
-        //Just for defensive programming as this won't actually happen.
-        if (currUnscheduledNumExecs == 0) {
-            return execsScheduled;
-        }
-        execsScheduled.add(currQueue.poll());
-        Set<String> sortedChildren = getSortedChildren(currComp, componentMap);
-        for (String childId: sortedChildren) {
-            Component childComponent = componentMap.get(childId);
-            Queue<ExecutorDetails> childQueue = compToExecsToSchedule.get(childId);
-            int childUnscheduledNumExecs = childQueue.size();
-            if (childUnscheduledNumExecs == 0) {
-                continue;
-            }
-            int numExecsToTake = 1;
-            if (hasShuffleGroupingFromParentToChild(currComp, childComponent)) {
-                // if it's shuffle grouping, truncate
-                numExecsToTake = Math.max(1, childUnscheduledNumExecs / currUnscheduledNumExecs);
-            } // otherwise, one-by-one
-            for (int i = 0; i < numExecsToTake; i++) {
-                execsScheduled.addAll(takeExecutors(childComponent, componentMap, compToExecsToSchedule));
-            }
+        String err;
+        if (nodes.getNodes().size() <= 0) {
+            err = "No available nodes to schedule tasks on!";
+            LOG.warn("Topology {}:{}", topoName, err);
+            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, err);
         }
-        return execsScheduled;
-    }
 
-    private Set<String> getSortedChildren(Component component, final Map<String, Component> componentMap) {
-        Set<String> children = component.getChildren();
-        Set<String> sortedChildren =
-            new TreeSet<>((o1, o2) -> {
-                Component child1 = componentMap.get(o1);
-                Component child2 = componentMap.get(o2);
-                boolean child1IsShuffle = hasShuffleGroupingFromParentToChild(component, child1);
-                boolean child2IsShuffle = hasShuffleGroupingFromParentToChild(component, child2);
-                if (child1IsShuffle && child2IsShuffle) {
-                    return o1.compareTo(o2);
-                } else if (child1IsShuffle) {
-                    return 1;
-                } else {
-                    return -1;
-                }
-            });
-        sortedChildren.addAll(children);
-        return sortedChildren;
-    }
+        if (!topologyDetails.hasSpouts()) {
+            err = "Cannot find a Spout!";
+            LOG.error("Topology {}:{}", topoName, err);
+            return SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, err);
+        }
 
-    private boolean hasShuffleGroupingFromParentToChild(Component parent, Component child) {
-        for (Map.Entry<GlobalStreamId, Grouping> inputEntry: child.getInputs().entrySet()) {
-            GlobalStreamId globalStreamId = inputEntry.getKey();
-            Grouping grouping = inputEntry.getValue();
-            if (globalStreamId.get_componentId().equals(parent.getId())
-                && (inputEntry.getValue().is_set_local_or_shuffle() || grouping.is_set_shuffle())) {
-                return true;
-            }
+        int execCnt = unassignedExecutors.size();
+        if (execCnt >= maxStateSearch) {
+            err = String.format("Unassignerd Executor count (%d) is greater than searchable state count %d", execCnt, maxStateSearch);
+            LOG.error("Topology {}:{}", topoName, err);
+            return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, err);
         }
-        return false;
+
+        return null;
     }
 
     /**
-     * Get a list of all the spouts in the topology.
+     * Check if the assignment of the executor to the worker is valid. In simple cases,
+     * this is simply a check of {@link RasNode#wouldFit(WorkerSlot, ExecutorDetails, TopologyDetails)}.
+     * This method may be extended by subclasses to add additional checks,
+     * see {@link ConstraintSolverStrategy#isExecAssignmentToWorkerValid(ExecutorDetails, WorkerSlot)}.
      *
-     * @param td topology to get spouts from
-     * @return a list of spouts
+     * @param exec being scheduled.
+     * @param worker on which to schedule.
+     * @return true if executor can be assigned to the worker, false otherwise.
      */
-    protected List<Component> getSpouts(TopologyDetails td) {
-        List<Component> spouts = new ArrayList<>();
-
-        for (Component c : td.getComponents().values()) {
-            if (c.getType() == ComponentType.SPOUT) {
-                spouts.add(c);
-            }
+    protected boolean isExecAssignmentToWorkerValid(ExecutorDetails exec, WorkerSlot worker) {
+        //check resources
+        RasNode node = nodes.getNodeById(worker.getNodeId());
+        if (!node.wouldFit(worker, exec, topologyDetails)) {
+            LOG.trace("Topology {}, executor {} would not fit in resources available on worker {}", topoName, exec, worker);
+            return false;
         }
-        return spouts;
+        return true;
     }
 
+    /**
+     * If this config is set to true, unassigned executors will be sorted by topological order with network proximity needs.
+     * @deprecated Use {@link Config#TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS} instead.
+     */
+    @Deprecated
+    public static final String EXPERIMENTAL_TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS

Review comment:
       This should be removed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org