Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2017/11/30 19:29:42 UTC

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/2442

    STORM-2837: ConstraintSolverStrategy

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm STORM-2837

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2442.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2442
    
----
commit fb4ba2b424d1ae8b0447713ac907812b7c4d2b8d
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-11-30T16:04:46Z

    STORM-2837: ConstraintSolverStrategy

----


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    I have the feeling that many schedulers co-exist today, and some of them occupy config keys that don't contain the scheduler name (or anything else that shows which scheduler uses them). The same goes for the newly added configurations.
    
    Given that there are lots of efforts on RAS, I would like to see which of the existing schedulers RAS also covers. If it covers most of them, I think we can change the default scheduler to RAS.
    
    If we don't want to make RAS the default yet, the configuration keys should contain a marker showing that they belong to RAS, or at least that should be documented in the javadoc.


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    @jerrypeng I reverted the one rename you asked about and rebased to fix a merge conflict.  I also squashed because I thought we were really close on this.
    
    I don't understand your comment about using heap space instead of stack space with the -Xss setting.  We don't have Xss anywhere in the code base.  If you could point out the code that has issues I am happy to look at it.
    
    As for Configs.java, we already support that.  We use a service loader for org.apache.storm.validation.Validated to enable config validation.  That is how we made org.apache.storm.DaemonConfig and org.apache.storm.hdfs.spout.Configs work.  If you really want me to clean things up for 2.x, I can move some of the configs into separate classes, but that might be better as a separate JIRA.
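The comment describes config discovery through a service loader. As a rough sketch of that pattern (not Storm's actual `ConfigValidation` code), the example below uses a hypothetical marker interface `ValidatedSketch` in place of `org.apache.storm.validation.Validated` and a hypothetical `RasConstraintConfigs` class standing in for keys split out of `Config.java`. It assumes keys are declared as `public static final String` fields and collects them by reflection; the key string itself is an assumption.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class ConfigKeyScanner {
    // Hypothetical stand-in for org.apache.storm.validation.Validated (assumed to be a marker interface).
    public interface ValidatedSketch {
    }

    // Hypothetical config class: RAS constraint keys moved out of Config.java.
    public static class RasConstraintConfigs implements ValidatedSketch {
        // The key string is an assumption for illustration only.
        public static final String TOPOLOGY_RAS_CONSTRAINT_SPREAD_COMPONENTS =
            "topology.ras.constraint.spread.components";
    }

    // Collects the values of every public static final String field on the given classes.
    public static List<String> collectKeys(List<Class<? extends ValidatedSketch>> classes) {
        List<String> keys = new ArrayList<>();
        for (Class<?> clazz : classes) {
            for (Field field : clazz.getFields()) { // public fields only
                int mods = field.getModifiers();
                if (Modifier.isStatic(mods) && Modifier.isFinal(mods) && field.getType() == String.class) {
                    try {
                        keys.add((String) field.get(null)); // null receiver: static field
                    } catch (IllegalAccessException e) {
                        throw new IllegalStateException(e);
                    }
                }
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<Class<? extends ValidatedSketch>> classes = new ArrayList<>();
        // Service-loader discovery: finds nothing here, since no META-INF/services file registers a provider.
        for (ValidatedSketch provider : ServiceLoader.load(ValidatedSketch.class)) {
            classes.add(provider.getClass());
        }
        if (classes.isEmpty()) {
            classes.add(RasConstraintConfigs.class); // fall back to an explicit list so the sketch runs
        }
        System.out.println(collectKeys(classes));
    }
}
```

Splitting keys into such classes keeps each scheduler's settings grouped without changing how they are validated.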


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r154193748
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Set;
    +import java.util.Stack;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.RAS_Node;
    +import org.apache.storm.scheduler.resource.RAS_Nodes;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
    +    private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
    +
    +    protected static class SolverResult {
    +        private final int statesSearched;
    +        private final boolean success;
    +        private final long timeTakenMillis;
    +        private final int backtracked;
    +
    +        public SolverResult(SearcherState state, boolean success) {
    +            this.statesSearched = state.getStatesSearched();
    +            this.success = success;
    +            timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
    +            backtracked = state.numBacktrack;
    +        }
    +
    +        public SchedulingResult asSchedulingResult() {
    +            if (success) {
    +                return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +            }
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
    +                "Cannot find scheduling that satisfies all constraints (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +        }
    +    }
    +
    +    protected static class SearcherState {
    +        // Metrics
    +        // How many states searched so far.
    +        private int statesSearched = 0;
    +        // Number of times we had to backtrack.
    +        private int numBacktrack = 0;
    +        final long startTimeMillis;
    +        private final long maxEndTimeMs;
    +
    +        // Current state
    +        // The current executor we are trying to schedule
    +        private int execIndex = 0;
    +        // A map of the worker to the components in the worker to be able to enforce constraints.
    +        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
    +        private final boolean[] okToRemoveFromWorker;
    +        // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
    +        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
    +        private final boolean[] okToRemoveFromNode;
    +
    +        // Static State
    +        // The list of all executors (preferably sorted to make assignments simpler).
    +        private final List<ExecutorDetails> execs;
    +        //The maximum number of state to search before stopping.
    +        private final int maxStatesSearched;
    +        //The topology we are scheduling
    +        private final TopologyDetails td;
    +
    +        private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
    +                              int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
    +            assert !execs.isEmpty();
    +            assert execs != null;
    +
    +            this.workerCompAssignment = workerCompAssignment;
    +            this.nodeCompAssignment = nodeCompAssignment;
    +            this.maxStatesSearched = maxStatesSearched;
    +            this.execs = execs;
    +            okToRemoveFromWorker = new boolean[execs.size()];
    +            okToRemoveFromNode = new boolean[execs.size()];
    +            this.td = td;
    +            startTimeMillis = Time.currentTimeMillis();
    +            if (maxTimeMs <= 0) {
    +                maxEndTimeMs = Long.MAX_VALUE;
    +            } else {
    +                maxEndTimeMs = startTimeMillis + maxTimeMs;
    +            }
    +        }
    +
    +        public void incStatesSearched() {
    +            statesSearched++;
    +            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
    +                LOG.debug("States Searched: {}", statesSearched);
    +                LOG.debug("backtrack: {}", numBacktrack);
    +            }
    +        }
    +
    +        public int getStatesSearched() {
    +            return statesSearched;
    +        }
    +
    +        public boolean areSearchLimitsExceeded() {
    +            return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
    +        }
    +
    +        public SearcherState nextExecutor() {
    +            execIndex++;
    +            if (execIndex >= execs.size()) {
    +                throw new IllegalStateException("Exceeded the exec limit " + execIndex + " >= " + execs.size());
    +            }
    +            return this;
    +        }
    +
    +        public boolean areAllExecsScheduled() {
    +            return execIndex == execs.size() - 1;
    +        }
    +
    +        public ExecutorDetails currentExec() {
    +            return execs.get(execIndex);
    +        }
    +
    +        public void tryToSchedule(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot);
    +            //It is possible that this component is already scheduled on this node or worker.  If so when we backtrack we cannot remove it
    +            okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp);
    +            okToRemoveFromNode[execIndex] = nodeCompAssignment.get(node).add(comp);
    +            node.assignSingleExecutor(workerSlot, exec, td);
    +        }
    +
    +        public void backtrack(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            execIndex--;
    +            if (execIndex < 0) {
    +                throw new IllegalStateException("Internal Error exec index became negative");
    +            }
    +            numBacktrack++;
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot);
    +            if (okToRemoveFromWorker[execIndex]) {
    +                workerCompAssignment.get(workerSlot).remove(comp);
    +            }
    +            if (okToRemoveFromNode[execIndex]) {
    +                nodeCompAssignment.get(node).remove(comp);
    +            }
    +            node.freeSingleExecutor(exec, td);
    +        }
    +    }
    +
    +    private Map<String, RAS_Node> nodes;
    +    private Map<ExecutorDetails, String> execToComp;
    +    private Map<String, Set<ExecutorDetails>> compToExecs;
    +    private List<String> favoredNodes;
    +    private List<String> unFavoredNodes;
    +
    +    //constraints and spreads
    +    private Map<String, Map<String, Integer>> constraintMatrix;
    +    private HashSet<String> spreadComps = new HashSet<>();
    +
    +    //hard coded max number of states to search
    +    public static final int MAX_STATE_SEARCH = 100_000;
    +
    +    @Override
    +    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
    +        prepare(cluster);
    +        LOG.debug("Scheduling {}", td.getId());
    +        nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
    +        Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
    +        //set max number of states to search
    +        final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_DEPTH_TRAVERSAL)));
    --- End diff --
    
    We probably want a default here instead of throwing an exception when the config is not set.
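The one-argument `ObjectReader.getInt` throws when the value is null, so a topology that never sets `TOPOLOGY_CONSTRAINTS_MAX_DEPTH_TRAVERSAL` would fail to schedule. A minimal sketch of the fallback, assuming the default should simply be the `MAX_STATE_SEARCH` cap; `getIntOrDefault` is a hypothetical helper standing in for `ObjectReader`, and the conf map and key string are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

public class SearchLimits {
    // Same value as the hard-coded cap in ConstraintSolverStrategy.
    public static final int MAX_STATE_SEARCH = 100_000;

    // Hypothetical lenient reader: returns defaultValue for null instead of throwing.
    public static int getIntOrDefault(Object o, int defaultValue) {
        if (o == null) {
            return defaultValue;
        }
        if (o instanceof Number) {
            return ((Number) o).intValue();
        }
        if (o instanceof String) {
            return Integer.parseInt((String) o);
        }
        throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
    }

    // Configured value capped at MAX_STATE_SEARCH; a missing key falls back to the cap (assumed default).
    public static int maxStateSearch(Map<String, Object> conf, String key) {
        return Math.min(MAX_STATE_SEARCH, getIntOrDefault(conf.get(key), MAX_STATE_SEARCH));
    }

    // Search deadline in ms; a non-positive limit means "no time limit", as in SearcherState.
    public static long maxEndTimeMs(long startTimeMs, long maxTimeMs) {
        return maxTimeMs <= 0 ? Long.MAX_VALUE : startTimeMs + maxTimeMs;
    }

    public static void main(String[] args) {
        String key = "example.max.state.search"; // hypothetical key string
        Map<String, Object> conf = new HashMap<>();
        System.out.println(maxStateSearch(conf, key)); // 100000: key missing, default used
        conf.put(key, 5_000);
        System.out.println(maxStateSearch(conf, key)); // 5000: configured value is below the cap
    }
}
```

Using the cap as the default keeps the `Math.min` behavior unchanged for topologies that do set the key.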


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r157803142
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java ---
    @@ -98,7 +98,7 @@ public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse,
           }
           
           _spreadToSchedule = new HashMap<>();
    -      List<String> spreadComps = (List<String>)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
    +      List<String> spreadComps = (List<String>)td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_SPREAD_COMPONENTS);
    --- End diff --
    
    Good point, I forgot that I moved that key in Config instead of adding a new one.


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r154194548
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +        public void tryToSchedule(String comp, RAS_Node node, WorkerSlot workerSlot) {
    --- End diff --
    
    Instead of passing in the comp, we may want to pass in the executor-to-component mapping and look the component up ourselves, just so we can be sure that the state matches correctly.
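A simplified sketch of that suggestion, combined with the `okToRemoveFromWorker` bookkeeping from the `tryToSchedule`/`backtrack` code quoted earlier in the thread. It is hypothetical: executors and workers are plain Strings, node-level tracking and resource accounting are omitted, and `SearcherStateSketch` is not a class in the PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SearcherStateSketch {
    private final List<String> execs;               // executors, in scheduling order
    private final Map<String, String> execToComp;   // executor -> component (the passed-in mapping)
    private final Map<String, Set<String>> workerCompAssignment = new HashMap<>();
    private final boolean[] okToRemoveFromWorker;
    private int execIndex = 0;

    public SearcherStateSketch(List<String> execs, Map<String, String> execToComp) {
        this.execs = execs;
        this.execToComp = execToComp;
        this.okToRemoveFromWorker = new boolean[execs.size()];
    }

    public String currentExec() {
        return execs.get(execIndex);
    }

    // The component is derived from the current executor, so a caller cannot pass a mismatched one.
    public void tryToSchedule(String worker) {
        String comp = execToComp.get(currentExec());
        // add() returns false if an earlier executor already put comp on this worker;
        // remember that so backtracking does not remove the earlier placement.
        okToRemoveFromWorker[execIndex] =
            workerCompAssignment.computeIfAbsent(worker, k -> new HashSet<>()).add(comp);
    }

    public void nextExecutor() {
        execIndex++;
    }

    // Steps back to the previous executor and undoes its placement on the given worker.
    public void backtrack(String worker) {
        execIndex--;
        if (execIndex < 0) {
            throw new IllegalStateException("exec index became negative");
        }
        String comp = execToComp.get(currentExec());
        if (okToRemoveFromWorker[execIndex]) {
            workerCompAssignment.get(worker).remove(comp);
        }
    }

    public Set<String> compsOn(String worker) {
        return workerCompAssignment.getOrDefault(worker, new HashSet<>());
    }

    public static void main(String[] args) {
        Map<String, String> execToComp = new HashMap<>();
        execToComp.put("e1", "spout");
        execToComp.put("e2", "spout");
        SearcherStateSketch state = new SearcherStateSketch(Arrays.asList("e1", "e2"), execToComp);
        state.tryToSchedule("w1");
        state.nextExecutor();
        state.tryToSchedule("w1"); // spout already on w1, so not marked removable
        state.nextExecutor();
        state.backtrack("w1");
        System.out.println(state.compsOn("w1")); // [spout]: e1's placement survives
        state.backtrack("w1");
        System.out.println(state.compsOn("w1")); // []
    }
}
```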


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    @jerrypeng TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH is not an effective limit on the max stack depth.  The max stack depth is the number of bolt and spout instances in a topology.  If we want to limit the depth, then we need a separate config that limits the maximum number of components that can be scheduled.  Limiting the number of states searched is more about not spending too much time/CPU on a search that is unlikely to succeed.
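To illustrate the distinction, here is a hypothetical recursive backtracking sketch (not the PR's code). It assumes executors are identified only by component name, workers are integer indices, and the only constraint is that a worker holds at most one executor of each component. A full placement needs one nested call per executor, while `maxStates` only caps how many placements are attempted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RecursiveBacktrackSketch {
    private final List<String> execComps;            // component of each executor, in order
    private final int maxStates;                     // cap on placements tried (bounds CPU, not depth)
    private final List<Set<String>> workerComps = new ArrayList<>();
    private int statesSearched = 0;
    private int maxDepth = 0;

    public RecursiveBacktrackSketch(List<String> execComps, int numWorkers, int maxStates) {
        this.execComps = execComps;
        this.maxStates = maxStates;
        for (int w = 0; w < numWorkers; w++) {
            workerComps.add(new HashSet<>());
        }
    }

    public boolean solve() {
        return solve(0);
    }

    // Recursion grows by one frame per executor; maxStates does not change how deep a full placement goes.
    private boolean solve(int execIndex) {
        maxDepth = Math.max(maxDepth, execIndex);
        if (execIndex == execComps.size()) {
            return true; // every executor placed
        }
        String comp = execComps.get(execIndex);
        for (Set<String> comps : workerComps) {
            if (statesSearched >= maxStates) {
                return false; // out of budget: give up on this search
            }
            statesSearched++;
            // Spread-style constraint: at most one executor of each component per worker.
            if (comps.add(comp)) {
                if (solve(execIndex + 1)) {
                    return true;
                }
                comps.remove(comp); // backtrack
            }
        }
        return false;
    }

    public int getStatesSearched() {
        return statesSearched;
    }

    public int getMaxDepth() {
        return maxDepth;
    }

    public static void main(String[] args) {
        RecursiveBacktrackSketch s =
            new RecursiveBacktrackSketch(Arrays.asList("spout", "spout", "bolt"), 2, 1_000);
        System.out.println(s.solve() + " depth=" + s.getMaxDepth() + " states=" + s.getStatesSearched());
        // prints "true depth=3 states=4"
    }
}
```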
    
    I agree that an iterative algorithm would be best, but the search is not tail recursive, so making the switch was not that simple, and I thought it would be better to just get something out.  I did move some of the stack variables into SearcherState instead of keeping them on the call stack.  I could move more there, but the code would be rather ugly and would not use as much of Java's nice syntactic sugar.  We have been running something like this in production for a while and have not had to set the stack size at all to make it work.
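An iterative backtracking search keeps its per-depth loop state on the heap instead of the call stack. A hypothetical sketch, assuming executors are identified only by component name, workers are integer indices, and the only constraint is that a worker holds at most one executor of each component; it is not how SearcherState is organized.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IterativeBacktrackSketch {
    // Returns the worker index chosen for each executor, or null if none is found within maxStates.
    // Which worker to try next at each depth lives in heap arrays, not in stack frames.
    public static int[] solve(List<String> execComps, int numWorkers, int maxStates) {
        int n = execComps.size();
        List<Set<String>> workerComps = new ArrayList<>();
        for (int w = 0; w < numWorkers; w++) {
            workerComps.add(new HashSet<>());
        }
        int[] assignment = new int[n];  // worker placed at each depth, -1 if none
        int[] nextWorker = new int[n];  // next worker to try at each depth
        Arrays.fill(assignment, -1);
        int states = 0;
        int depth = 0;
        while (depth >= 0) {
            if (depth == n) {
                return assignment; // every executor placed
            }
            String comp = execComps.get(depth);
            if (assignment[depth] >= 0) { // back here after a deeper failure: undo this depth's choice
                workerComps.get(assignment[depth]).remove(comp);
                assignment[depth] = -1;
            }
            boolean placed = false;
            while (!placed && nextWorker[depth] < numWorkers) {
                if (states >= maxStates) {
                    return null; // out of budget
                }
                states++;
                int w = nextWorker[depth]++;
                // Spread-style constraint: at most one executor of each component per worker.
                if (workerComps.get(w).add(comp)) {
                    assignment[depth] = w;
                    placed = true;
                }
            }
            if (placed) {
                depth++;
                if (depth < n) { // entering a fresh depth: start from the first worker
                    nextWorker[depth] = 0;
                    assignment[depth] = -1;
                }
            } else {
                depth--; // backtrack; the next iteration undoes that depth's placement
            }
        }
        return null; // search space exhausted
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(solve(Arrays.asList("spout", "spout", "bolt"), 2, 1_000)));
        // prints "[0, 1, 0]"
    }
}
```

The undo step has to be written out explicitly because no returning stack frame performs it, which is part of why converting a search that is not tail recursive is more than a mechanical change.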


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r157679506
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +    @Override
    +    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
    +        prepare(cluster);
    +        LOG.debug("Scheduling {}", td.getId());
    +        nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
    +        Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
    +        //set max number of states to search
    +        final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_DEPTH_TRAVERSAL)));
    +
    +        final long maxTimeMs =
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_TIME_SECS), -1).intValue() * 1000L;
    +
    +        favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
    +        unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
    +
    +        //get mapping of execs to components
    +        execToComp = td.getExecutorToComponent();
    +        //get mapping of components to executors
    +        compToExecs = getCompToExecs(execToComp);
    +
    +        //get topology constraints
    +        constraintMatrix = getConstraintMap(td, compToExecs.keySet());
    +
    +        //get spread components
    +        spreadComps = getSpreadComps(td);
    +
    +        ArrayList<ExecutorDetails> sortedExecs = new ArrayList<>();
    +        //get a sorted list of unassigned executors based on number of constraints
    +        Set<ExecutorDetails> unassignedExecutors = new HashSet<>(cluster.getUnassignedExecutors(td));
    +        for (ExecutorDetails exec1 : getSortedExecs(spreadComps, constraintMatrix, compToExecs)) {
    +            if (unassignedExecutors.contains(exec1)) {
    +                sortedExecs.add(exec1);
    +            }
    +        }
    +
    +        //initialize structures
    +        for (RAS_Node node : nodes.values()) {
    +            nodeCompAssignment.put(node, new HashSet<>());
    +        }
    +        //populate with existing assignments
    +        SchedulerAssignment existingAssignment = cluster.getAssignmentById(td.getId());
    +        if (existingAssignment != null) {
    +            for (Map.Entry<ExecutorDetails, WorkerSlot> entry1 : existingAssignment.getExecutorToSlot().entrySet()) {
    +                ExecutorDetails exec1 = entry1.getKey();
    +                String compId = execToComp.get(exec1);
    +                WorkerSlot ws = entry1.getValue();
    +                RAS_Node node = nodes.get(ws.getNodeId());
    +                //populate node to component Assignments
    +                nodeCompAssignment.get(node).add(compId);
    +                //populate worker to comp assignments
    +                workerCompAssignment.computeIfAbsent(ws, (k) -> new HashSet<>()).add(compId);
    +            }
    +        }
    +
    +        //early detection/early fail
    +        if (!checkSchedulingFeasibility()) {
    +            //Scheduling Status set to FAIL_OTHER so no eviction policy will be attempted to make space for this topology
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, "Scheduling not feasible!");
    +        }
    +        return backtrackSearch(new SearcherState(workerCompAssignment, nodeCompAssignment, maxStateSearch, maxTimeMs, sortedExecs, td))
    +            .asSchedulingResult();
    +    }
    +
    +    private boolean checkSchedulingFeasibility() {
    +        for (String comp : spreadComps) {
    +            int numExecs = compToExecs.get(comp).size();
    +            if (numExecs > nodes.size()) {
    +                LOG.error("Unsatisfiable constraint: Component: {} marked as spread has {} executors which is larger "
    +                    + "than number of nodes: {}", comp, numExecs, nodes.size());
    +                return false;
    +            }
    +        }
    +        if (execToComp.size() >= MAX_STATE_SEARCH) {
    +            LOG.error("Number of executors is greater than the maximum number of states allowed to be searched.  "
    +                + "# of executors: {} Max states to search: {}", execToComp.size(), MAX_STATE_SEARCH);
    +            return false;
    +        }
    +        return true;
    +    }
    +    
    +    @Override
    +    protected TreeSet<ObjectResources> sortObjectResources(
    +        final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
    +        final ExistingScheduleFunc existingScheduleFunc) {
    +        return GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc);
    +    }
    +
    +    // Backtracking algorithm does not take into account the ordering of executors in worker to reduce traversal space
    +    @VisibleForTesting
    +    protected SolverResult backtrackSearch(SearcherState state) {
    +        state.incStatesSearched();
    +        if (state.areSearchLimitsExceeded()) {
    +            LOG.warn("Limits Exceeded");
    +            return new SolverResult(state, false);
    +        }
    +
    +        ExecutorDetails exec = state.currentExec();
    +        List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec, favoredNodes, unFavoredNodes);
    +
    +        for (ObjectResources nodeResources: sortedNodes) {
    +            RAS_Node node = nodes.get(nodeResources.id);
    +            for (WorkerSlot workerSlot : node.getSlotsAvailbleTo(state.td)) {
    +                if (isExecAssignmentToWorkerValid(workerSlot, state)) {
    +                    String comp = execToComp.get(exec);
    +
    +                    state.tryToSchedule(comp, node, workerSlot);
    +
    +                    if (state.areAllExecsScheduled()) {
    +                        //Everything is scheduled correctly, so no need to search any more.
    +                        return new SolverResult(state, true);
    +                    }
    +
    +                    SolverResult results = backtrackSearch(state.nextExecutor());
    +                    if (results.success) {
    +                        //We found a good result we are done.
    +                        return results;
    +                    }
    +
    +                    if (state.areSearchLimitsExceeded()) {
    +                        //No need to search more it is not going to help.
    +                        return new SolverResult(state, false);
    +                    }
    +
    +                    //backtracking (If we ever get here there really isn't a lot of hope that we will find a scheduling)
    +                    state.backtrack(comp, node, workerSlot);
    +                }
    +            }
    +        }
    +        //Tried all of the slots and none of them worked.
    +        return new SolverResult(state, false);
    +    }
    +
    +    /**
    +     * Check if any constraints are violated if exec is scheduled on worker.
    +     * @return true if scheduling exec on worker does not violate any constraints, returns false if it does
    +     */
    +    public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, SearcherState state) {
    +        final ExecutorDetails exec = state.currentExec();
    +        //check resources
    +        RAS_Node node = nodes.get(worker.getNodeId());
    +        if (!node.wouldFit(worker, exec, state.td)) {
    +            LOG.trace("{} would not fit in resources available on {}", exec, worker);
    +            return false;
    +        }
    +
    +        //check if exec can be on worker based on user defined component exclusions
    +        String execComp = execToComp.get(exec);
    +        Set<String> components = state.workerCompAssignment.get(worker);
    +        if (components != null) {
    +            for (String comp : components) {
    +                if (constraintMatrix.get(execComp).get(comp) != 0) {
    +                    LOG.trace("{} found {} constraint violation {} on {}", exec, execComp, comp, worker);
    +                    return false;
    +                }
    +            }
    +        }
    +
    +        //check if exec satisfies spread
    +        if (spreadComps.contains(execComp)) {
    +            if (state.nodeCompAssignment.get(node).contains(execComp)) {
    +                LOG.trace("{} Found spread violation {} on node {}", exec, execComp, node.getId());
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
    +        Map<String, Map<String, Integer>> matrix = new HashMap<>();
    +        for (String comp : comps) {
    +            matrix.put(comp, new HashMap<>());
    +            for (String comp2 : comps) {
    +                matrix.get(comp).put(comp2, 0);
    +            }
    +        }
    +        List<List<String>> constraints = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_CONSTRAINTS);
    +        if (constraints != null) {
    +            for (List<String> constraintPair : constraints) {
    +                String comp1 = constraintPair.get(0);
    +                String comp2 = constraintPair.get(1);
    +                if (!matrix.containsKey(comp1)) {
    +                    LOG.warn("Comp: {} declared in constraints is not valid!", comp1);
    +                    continue;
    +                }
    +                if (!matrix.containsKey(comp2)) {
    +                    LOG.warn("Comp: {} declared in constraints is not valid!", comp2);
    +                    continue;
    +                }
    +                matrix.get(comp1).put(comp2, 1);
    +                matrix.get(comp2).put(comp1, 1);
    +            }
    +        }
    +        return matrix;
    +    }
    +
    +    /**
    +     * Determines if a scheduling is valid and all constraints are satisfied.
    +     */
    +    @VisibleForTesting
    +    public static boolean validateSolution(Cluster cluster, TopologyDetails td) {
    +        return checkSpreadSchedulingValid(cluster, td)
    +            && checkConstraintsSatisfied(cluster, td)
    +            && checkResourcesCorrect(cluster, td);
    +    }
    +
    +    /**
    +     * Check if constraints are satisfied.
    +     */
    +    private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking constraints...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    +        //get topology constraints
    +        Map<String, Map<String, Integer>> constraintMatrix = getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values()));
    +
    +        Map<WorkerSlot, List<String>> workerCompMap = new HashMap<>();
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
    +            WorkerSlot worker = entry.getValue();
    +            ExecutorDetails exec = entry.getKey();
    +            String comp = execToComp.get(exec);
    +            if (!workerCompMap.containsKey(worker)) {
    +                workerCompMap.put(worker, new LinkedList<>());
    +            }
    +            workerCompMap.get(worker).add(comp);
    +        }
    +        for (Map.Entry<WorkerSlot, List<String>> entry : workerCompMap.entrySet()) {
    +            List<String> comps = entry.getValue();
    +            for (int i = 0; i < comps.size(); i++) {
    +                for (int j = 0; j < comps.size(); j++) {
    +                    if (i != j && constraintMatrix.get(comps.get(i)).get(comps.get(j)) == 1) {
    +                        LOG.error("Incorrect Scheduling: worker exclusion for Component {} and {} not satisfied on WorkerSlot: {}",
    +                            comps.get(i), comps.get(j), entry.getKey());
    +                        return false;
    +                    }
    +                }
    +            }
    +        }
    +        return true;
    +    }
    +
    +    private static Map<WorkerSlot, RAS_Node> workerToNodes(Cluster cluster) {
    +        Map<WorkerSlot, RAS_Node> workerToNodes = new HashMap<>();
    +        for (RAS_Node node: RAS_Nodes.getAllNodesFrom(cluster).values()) {
    +            for (WorkerSlot s : node.getUsedSlots()) {
    +                workerToNodes.put(s, node);
    +            }
    +        }
    +        return workerToNodes;
    +    }
    +
    +    private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking for a valid scheduling...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    +        Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new HashMap<>();
    +        Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>();
    +        Map<RAS_Node, HashSet<String>> nodeCompMap = new HashMap<>();
    +        Map<WorkerSlot, RAS_Node> workerToNodes = workerToNodes(cluster);
    +        boolean ret = true;
    +
    +        HashSet<String> spreadComps = getSpreadComps(topo);
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            WorkerSlot worker = entry.getValue();
    +            RAS_Node node = workerToNodes.get(worker);
    +
    +            if (!workerExecMap.containsKey(worker)) {
    +                workerExecMap.put(worker, new HashSet<>());
    +                workerCompMap.put(worker, new HashSet<>());
    +            }
    +
    +            if (!nodeCompMap.containsKey(node)) {
    +                nodeCompMap.put(node, new HashSet<>());
    +            }
    +            if (workerExecMap.get(worker).contains(exec)) {
    +                LOG.error("Incorrect Scheduling: Found duplicate in scheduling");
    +                return false;
    +            }
    +            workerExecMap.get(worker).add(exec);
    +            String comp = execToComp.get(exec);
    +            workerCompMap.get(worker).add(comp);
    +            if (spreadComps.contains(comp)) {
    +                if (nodeCompMap.get(node).contains(comp)) {
    +                    LOG.error("Incorrect Scheduling: Spread for Component: {} {} on node {} not satisfied {}",
    +                        comp, exec, node.getId(), nodeCompMap.get(node));
    +                    ret = false;
    +                }
    +            }
    +            nodeCompMap.get(node).add(comp);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Check if resource constraints satisfied.
    +     */
    +    private static boolean checkResourcesCorrect(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking Resources...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<RAS_Node, Collection<ExecutorDetails>> nodeToExecs = new HashMap<>();
    +        Map<ExecutorDetails, WorkerSlot> mergedExecToWorker = new HashMap<>();
    +        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        //merge with existing assignments
    +        if (cluster.getAssignmentById(topo.getId()) != null
    +                && cluster.getAssignmentById(topo.getId()).getExecutorToSlot() != null) {
    +            mergedExecToWorker.putAll(cluster.getAssignmentById(topo.getId()).getExecutorToSlot());
    +        }
    +        mergedExecToWorker.putAll(result);
    +
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : mergedExecToWorker.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            WorkerSlot worker = entry.getValue();
    +            RAS_Node node = nodes.get(worker.getNodeId());
    +
    +            if (node.getAvailableMemoryResources() < 0.0 || node.getAvailableCpuResources() < 0.0) {
    +                LOG.error("Incorrect Scheduling: found node with negative available resources");
    +                return false;
    +            }
    +            if (!nodeToExecs.containsKey(node)) {
    +                nodeToExecs.put(node, new LinkedList<>());
    +            }
    +            nodeToExecs.get(node).add(exec);
    +        }
    +
    +        for (Map.Entry<RAS_Node, Collection<ExecutorDetails>> entry : nodeToExecs.entrySet()) {
    +            RAS_Node node = entry.getKey();
    +            Collection<ExecutorDetails> execs = entry.getValue();
    +            double cpuUsed = 0.0;
    +            double memoryUsed = 0.0;
    +            for (ExecutorDetails exec : execs) {
    +                cpuUsed += topo.getTotalCpuReqTask(exec);
    +                memoryUsed += topo.getTotalMemReqTask(exec);
    +            }
    +            if (node.getAvailableCpuResources() != (node.getTotalCpuResources() - cpuUsed)) {
    +                LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of cpu. Expected: {}"
    +                        + " Actual: {} Executors scheduled on node: {}",
    +                        node.getId(), (node.getTotalCpuResources() - cpuUsed), node.getAvailableCpuResources(), execs);
    +                return false;
    +            }
    +            if (node.getAvailableMemoryResources() != (node.getTotalMemoryResources() - memoryUsed)) {
    +                LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of memory. Expected: {}"
    +                        + " Actual: {} Executors scheduled on node: {}",
    +                        node.getId(), (node.getTotalMemoryResources() - memoryUsed), node.getAvailableMemoryResources(), execs);
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    private Map<String, Set<ExecutorDetails>> getCompToExecs(Map<ExecutorDetails, String> executorToComp) {
    +        Map<String, Set<ExecutorDetails>> retMap = new HashMap<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComp.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            String comp = entry.getValue();
    +            if (!retMap.containsKey(comp)) {
    +                retMap.put(comp, new HashSet<>());
    +            }
    +            retMap.get(comp).add(exec);
    +        }
    +        return retMap;
    +    }
    +
    +    private ArrayList<ExecutorDetails> getSortedExecs(HashSet<String> spreadComps, Map<String, Map<String, Integer>> constraintMatrix,
    +                                                      Map<String, Set<ExecutorDetails>> compToExecs) {
    +        ArrayList<ExecutorDetails> retList = new ArrayList<>();
    +        //find number of constraints per component
    +        //Key->Comp Value-># of constraints
    +        Map<String, Integer> compConstraintCountMap = new HashMap<>();
    +        for (Map.Entry<String, Map<String, Integer>> constraintEntry1 : constraintMatrix.entrySet()) {
    +            int count = 0;
    +            String comp = constraintEntry1.getKey();
    +            for (Map.Entry<String, Integer> constraintEntry2 : constraintEntry1.getValue().entrySet()) {
    +                if (constraintEntry2.getValue() == 1) {
    +                    count++;
    +                }
    +            }
    +            //check component is declared for spreading
    +            if (spreadComps.contains(constraintEntry1.getKey())) {
    +                count++;
    +            }
    +            compConstraintCountMap.put(comp, count);
    +        }
    +        //Sort comps by number of constraints
    +        NavigableMap<String, Integer> sortedCompConstraintCountMap = sortByValues(compConstraintCountMap);
    +        //sort executors based on component constraints
    +        for (String comp : sortedCompConstraintCountMap.keySet()) {
    +            retList.addAll(compToExecs.get(comp));
    +        }
    +        return retList;
    +    }
    +
    +    private static HashSet<String> getSpreadComps(TopologyDetails topo) {
    +        HashSet<String> retSet = new HashSet<>();
    +        List<String> spread = (List<String>) topo.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
    +        if (spread != null) {
    +            Set<String> comps = topo.getComponents().keySet();
    +            for (String comp : spread) {
    +                if (comps.contains(comp)) {
    +                    retSet.add(comp);
    +                } else {
    +                    LOG.warn("Comp {} declared for spread not valid", comp);
    --- End diff --
    
    Same situation as with TOPOLOGY_RAS_CONSTRAINTS; I would give the same recommendation.
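The search implemented by backtrackSearch, tryToSchedule and backtrack in the diff above is a depth-first backtracking search over executor-to-slot assignments. The following is a minimal, hypothetical sketch of that pattern, not the Storm implementation: executors are reduced to component names, worker slots to "node:port" strings, resource checks (wouldFit) are left out, and the names BacktrackSketch and solve are invented for illustration. It keeps two details from the diff: undoing only the component entries that a step actually added, and giving up once the state or time limit is exceeded.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified model of a constraint-aware backtracking search.
 * Executors are component names, worker slots are "node:port" strings; resources are ignored.
 */
public class BacktrackSketch {
    private final List<String> execComps;   // component of each executor, in search order
    private final List<String> slots;       // candidate worker slots, tried in this order
    private final Set<String> exclusions = new HashSet<>(); // "a|b": a and b may not share a worker
    private final Set<String> spreadComps;  // at most one executor of each of these per node
    private final int maxStates;
    private final long maxEndMs;
    private final Map<String, Set<String>> workerComps = new HashMap<>();
    private final Map<String, Set<String>> nodeComps = new HashMap<>();
    private final String[] assignment;
    private int statesSearched = 0;

    public BacktrackSketch(List<String> execComps, List<String> slots, List<String[]> exclusionPairs,
                           Set<String> spreadComps, int maxStates, long maxTimeMs) {
        this.execComps = execComps;
        this.slots = slots;
        for (String[] pair : exclusionPairs) {
            // Record both directions, as getConstraintMap sets both matrix cells.
            exclusions.add(pair[0] + "|" + pair[1]);
            exclusions.add(pair[1] + "|" + pair[0]);
        }
        this.spreadComps = spreadComps;
        this.maxStates = maxStates;
        // A non-positive time limit means "no time limit".
        this.maxEndMs = maxTimeMs <= 0 ? Long.MAX_VALUE : System.currentTimeMillis() + maxTimeMs;
        this.assignment = new String[execComps.size()];
    }

    private static String nodeOf(String slot) {
        return slot.substring(0, slot.indexOf(':'));
    }

    private boolean limitsExceeded() {
        return statesSearched > maxStates || System.currentTimeMillis() > maxEndMs;
    }

    private boolean isValid(String comp, String slot) {
        for (String other : workerComps.getOrDefault(slot, Collections.emptySet())) {
            if (exclusions.contains(comp + "|" + other)) {
                return false; // worker-level exclusion violated
            }
        }
        // Spread: a spread component may not already be on this slot's node.
        return !(spreadComps.contains(comp)
            && nodeComps.getOrDefault(nodeOf(slot), Collections.emptySet()).contains(comp));
    }

    private boolean search(int execIndex) {
        statesSearched++;
        if (limitsExceeded()) {
            return false;
        }
        String comp = execComps.get(execIndex);
        for (String slot : slots) {
            if (!isValid(comp, slot)) {
                continue;
            }
            // Set.add returns false if the component was already there; in that case
            // backtracking must not remove it (the okToRemoveFrom* arrays in the diff).
            boolean addedToWorker = workerComps.computeIfAbsent(slot, k -> new HashSet<>()).add(comp);
            boolean addedToNode = nodeComps.computeIfAbsent(nodeOf(slot), k -> new HashSet<>()).add(comp);
            assignment[execIndex] = slot;
            if (execIndex == execComps.size() - 1 || search(execIndex + 1)) {
                return true;
            }
            if (limitsExceeded()) {
                return false; // out of budget: further search will not help
            }
            if (addedToWorker) {
                workerComps.get(slot).remove(comp);
            }
            if (addedToNode) {
                nodeComps.get(nodeOf(slot)).remove(comp);
            }
            assignment[execIndex] = null;
        }
        return false; // no slot worked for this executor
    }

    /** Returns the slot chosen for each executor, or null if none was found within the limits. */
    public String[] solve() {
        return execComps.isEmpty() ? new String[0] : (search(0) ? assignment.clone() : null);
    }
}
```

The executor order passed in matters: the diff sorts executors by their number of constraints (getSortedExecs) before searching, while this sketch simply searches in whatever order it is given.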


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    @revans2 Thanks for the quick fix. I can see you've addressed my comment. Btw, I haven't looked at the actual patch yet, so it will take me some time to review the code.
    
    @jerrypeng Since you have context on this codebase, could you help review the change? Thanks in advance.
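For reviewers who have not yet read the full patch: the post-scheduling checks in the diff quoted earlier in this thread (checkConstraintsSatisfied and checkSpreadSchedulingValid) are single passes over the final executor-to-slot map. A minimal sketch of those two checks follows, under hypothetical simplified types (executors, components and nodes as strings, slots as "node:port" strings, no resource accounting) and with the invented class name ScheduleValidatorSketch; it is not the Storm code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical standalone versions of the worker-exclusion and spread checks. */
public final class ScheduleValidatorSketch {
    private ScheduleValidatorSketch() {}

    /** True if no worker hosts two components declared mutually exclusive. */
    public static boolean constraintsSatisfied(Map<String, String> execToSlot, Map<String, String> execToComp,
                                               List<String[]> exclusionPairs) {
        Set<String> excluded = new HashSet<>();
        for (String[] p : exclusionPairs) {
            // Store both directions so the result does not depend on iteration order.
            excluded.add(p[0] + "|" + p[1]);
            excluded.add(p[1] + "|" + p[0]);
        }
        Map<String, Set<String>> workerComps = new HashMap<>();
        for (Map.Entry<String, String> e : execToSlot.entrySet()) {
            String comp = execToComp.get(e.getKey());
            Set<String> onWorker = workerComps.computeIfAbsent(e.getValue(), k -> new HashSet<>());
            for (String other : onWorker) {
                if (excluded.contains(comp + "|" + other)) {
                    return false;
                }
            }
            onWorker.add(comp);
        }
        return true;
    }

    /** True if no node hosts more than one executor of any spread component. */
    public static boolean spreadSatisfied(Map<String, String> execToSlot, Map<String, String> execToComp,
                                          Set<String> spreadComps) {
        Map<String, Set<String>> nodeComps = new HashMap<>();
        for (Map.Entry<String, String> e : execToSlot.entrySet()) {
            String comp = execToComp.get(e.getKey());
            String slot = e.getValue();
            String node = slot.substring(0, slot.indexOf(':'));
            // Set.add returns false when this component is already on the node.
            boolean firstOnNode = nodeComps.computeIfAbsent(node, k -> new HashSet<>()).add(comp);
            if (!firstOnNode && spreadComps.contains(comp)) {
                return false;
            }
        }
        return true;
    }
}
```

Storing each exclusion in both directions mirrors getConstraintMap, which sets both matrix[comp1][comp2] and matrix[comp2][comp1] to 1.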


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    @revans2 thanks for the explanation


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r154195212
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Set;
    +import java.util.Stack;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.RAS_Node;
    +import org.apache.storm.scheduler.resource.RAS_Nodes;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
    +    private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
    +
    +    protected static class SolverResult {
    +        private final int statesSearched;
    +        private final boolean success;
    +        private final long timeTakenMillis;
    +        private final int backtracked;
    +
    +        public SolverResult(SearcherState state, boolean success) {
    +            this.statesSearched = state.getStatesSearched();
    +            this.success = success;
    +            timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
    +            backtracked = state.numBacktrack;
    +        }
    +
    +        public SchedulingResult asSchedulingResult() {
    +            if (success) {
    +                return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +            }
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
    +                "Cannot find scheduling that satisfies all constraints (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +        }
    +    }
    +
    +    protected static class SearcherState {
    +        // Metrics
    +        // How many states searched so far.
    +        private int statesSearched = 0;
    +        // Number of times we had to backtrack.
    +        private int numBacktrack = 0;
    +        final long startTimeMillis;
    +        private final long maxEndTimeMs;
    +
    +        // Current state
    +        // The current executor we are trying to schedule
    +        private int execIndex = 0;
    +        // A map of the worker to the components in the worker to be able to enforce constraints.
    +        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
    +        private final boolean[] okToRemoveFromWorker;
    +        // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
    +        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
    +        private final boolean[] okToRemoveFromNode;
    +
    +        // Static State
    +        // The list of all executors (preferably sorted to make assignments simpler).
    +        private final List<ExecutorDetails> execs;
    +        //The maximum number of states to search before stopping.
    +        private final int maxStatesSearched;
    +        //The topology we are scheduling
    +        private final TopologyDetails td;
    +
    +        private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
    +                              int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
    +            assert execs != null;
    +            assert !execs.isEmpty();
    +
    +            this.workerCompAssignment = workerCompAssignment;
    +            this.nodeCompAssignment = nodeCompAssignment;
    +            this.maxStatesSearched = maxStatesSearched;
    +            this.execs = execs;
    +            okToRemoveFromWorker = new boolean[execs.size()];
    +            okToRemoveFromNode = new boolean[execs.size()];
    +            this.td = td;
    +            startTimeMillis = Time.currentTimeMillis();
    +            if (maxTimeMs <= 0) {
    +                maxEndTimeMs = Long.MAX_VALUE;
    +            } else {
    +                maxEndTimeMs = startTimeMillis + maxTimeMs;
    +            }
    +        }
    +
    +        public void incStatesSearched() {
    +            statesSearched++;
    +            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
    +                LOG.debug("States Searched: {}", statesSearched);
    +                LOG.debug("backtrack: {}", numBacktrack);
    +            }
    +        }
    +
    +        public int getStatesSearched() {
    +            return statesSearched;
    +        }
    +
    +        public boolean areSearchLimitsExceeded() {
    +            return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
    +        }
    +
    +        public SearcherState nextExecutor() {
    +            execIndex++;
    +            if (execIndex >= execs.size()) {
    +                throw new IllegalStateException("Exceeded the exec limit " + execIndex + " >= " + execs.size());
    +            }
    +            return this;
    +        }
    +
    +        public boolean areAllExecsScheduled() {
    +            return execIndex == execs.size() - 1;
    +        }
    +
    +        public ExecutorDetails currentExec() {
    +            return execs.get(execIndex);
    +        }
    +
    +        public void tryToSchedule(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot);
    +            //It is possible that this component is already scheduled on this node or worker.  If so when we backtrack we cannot remove it
    +            okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp);
    +            okToRemoveFromNode[execIndex] = nodeCompAssignment.get(node).add(comp);
    +            node.assignSingleExecutor(workerSlot, exec, td);
    +        }
    +
    +        public void backtrack(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            execIndex--;
    +            if (execIndex < 0) {
    +                throw new IllegalStateException("Internal Error exec index became negative");
    +            }
    +            numBacktrack++;
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot);
    +            if (okToRemoveFromWorker[execIndex]) {
    +                workerCompAssignment.get(workerSlot).remove(comp);
    +            }
    +            if (okToRemoveFromNode[execIndex]) {
    +                nodeCompAssignment.get(node).remove(comp);
    +            }
    +            node.freeSingleExecutor(exec, td);
    +        }
    +    }
    +
    +    private Map<String, RAS_Node> nodes;
    +    private Map<ExecutorDetails, String> execToComp;
    +    private Map<String, Set<ExecutorDetails>> compToExecs;
    +    private List<String> favoredNodes;
    +    private List<String> unFavoredNodes;
    +
    +    //constraints and spreads
    +    private Map<String, Map<String, Integer>> constraintMatrix;
    +    private HashSet<String> spreadComps = new HashSet<>();
    +
    +    //hard coded max number of states to search
    +    public static final int MAX_STATE_SEARCH = 100_000;
    +
    +    @Override
    +    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
    +        prepare(cluster);
    +        LOG.debug("Scheduling {}", td.getId());
    +        nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
    +        Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
    +        //set max number of states to search
    +        final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_DEPTH_TRAVERSAL)));
    +
    +        final long maxTimeMs =
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_TIME_SECS), -1).intValue() * 1000L;
    +
    +        favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
    +        unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
    +
    +        //get mapping of execs to components
    +        execToComp = td.getExecutorToComponent();
    +        //get mapping of components to executors
    +        compToExecs = getCompToExecs(execToComp);
    +
    +        //get topology constraints
    +        constraintMatrix = getConstraintMap(td, compToExecs.keySet());
    +
    +        //get spread components
    +        spreadComps = getSpreadComps(td);
    +
    +        ArrayList<ExecutorDetails> sortedExecs = new ArrayList<>();
    +        //get a sorted list of unassigned executors based on number of constraints
    +        Set<ExecutorDetails> unassignedExecutors = new HashSet<>(cluster.getUnassignedExecutors(td));
    +        for (ExecutorDetails exec1 : getSortedExecs(spreadComps, constraintMatrix, compToExecs)) {
    +            if (unassignedExecutors.contains(exec1)) {
    +                sortedExecs.add(exec1);
    +            }
    +        }
    +
    +        //initialize structures
    +        for (RAS_Node node : nodes.values()) {
    +            nodeCompAssignment.put(node, new HashSet<>());
    +        }
    +        //populate with existing assignments
    +        SchedulerAssignment existingAssignment = cluster.getAssignmentById(td.getId());
    +        if (existingAssignment != null) {
    +            for (Map.Entry<ExecutorDetails, WorkerSlot> entry1 : existingAssignment.getExecutorToSlot().entrySet()) {
    +                ExecutorDetails exec1 = entry1.getKey();
    +                String compId = execToComp.get(exec1);
    +                WorkerSlot ws = entry1.getValue();
    +                RAS_Node node = nodes.get(ws.getNodeId());
    +                //populate node to component Assignments
    +                nodeCompAssignment.get(node).add(compId);
    +                //populate worker to comp assignments
    +                workerCompAssignment.computeIfAbsent(ws, (k) -> new HashSet<>()).add(compId);
    +            }
    +        }
    +
    +        //early detection/early fail
    +        if (!checkSchedulingFeasibility()) {
    +            //Scheduling Status set to FAIL_OTHER so no eviction policy will be attempted to make space for this topology
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, "Scheduling not feasible!");
    +        }
    +        return backtrackSearch(new SearcherState(workerCompAssignment, nodeCompAssignment, maxStateSearch, maxTimeMs, sortedExecs, td))
    +            .asSchedulingResult();
    +    }
    +
    +    private boolean checkSchedulingFeasibility() {
    +        for (String comp : spreadComps) {
    +            int numExecs = compToExecs.get(comp).size();
    +            if (numExecs > nodes.size()) {
    +                LOG.error("Unsatisfiable constraint: Component: {} marked as spread has {} executors which is larger "
    +                    + "than number of nodes: {}", comp, numExecs, nodes.size());
    +                return false;
    +            }
    +        }
    +        if (execToComp.size() >= MAX_STATE_SEARCH) {
    +            LOG.error("Number of executors is greater than the maximum number of states allowed to be searched.  "
    +                + "# of executors: {} Max states to search: {}", execToComp.size(), MAX_STATE_SEARCH);
    +            return false;
    +        }
    +        return true;
    +    }
    +    
    +    @Override
    +    protected TreeSet<ObjectResources> sortObjectResources(
    +        final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
    +        final ExistingScheduleFunc existingScheduleFunc) {
    +        return GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc);
    +    }
    +
    +    // Backtracking algorithm does not take into account the ordering of executors in worker to reduce traversal space
    +    @VisibleForTesting
    +    protected SolverResult backtrackSearch(SearcherState state) {
    +        state.incStatesSearched();
    +        if (state.areSearchLimitsExceeded()) {
    +            LOG.warn("Limits Exceeded");
    +            return new SolverResult(state, false);
    +        }
    +
    +        ExecutorDetails exec = state.currentExec();
    +        List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec, favoredNodes, unFavoredNodes);
    +
    +        for (ObjectResources nodeResources: sortedNodes) {
    +            RAS_Node node = nodes.get(nodeResources.id);
    +            for (WorkerSlot workerSlot : node.getSlotsAvailbleTo(state.td)) {
    +                if (isExecAssignmentToWorkerValid(workerSlot, state)) {
    +                    String comp = execToComp.get(exec);
    +
    +                    state.tryToSchedule(comp, node, workerSlot);
    +
    +                    if (state.areAllExecsScheduled()) {
    +                        //Everything is scheduled correctly, so no need to search any more.
    +                        return new SolverResult(state, true);
    +                    }
    +
    +                    SolverResult results = backtrackSearch(state.nextExecutor());
    +                    if (results.success) {
    +                        //We found a good result we are done.
    +                        return results;
    +                    }
    +
    +                    if (state.areSearchLimitsExceeded()) {
    +                        //No need to search more it is not going to help.
    +                        return new SolverResult(state, false);
    +                    }
    +
    +                    //backtracking (If we ever get here there really isn't a lot of hope that we will find a scheduling)
    +                    state.backtrack(comp, node, workerSlot);
    +                }
    +            }
    +        }
    +        //Tried all of the slots and none of them worked.
    +        return new SolverResult(state, false);
    +    }
    +
    +    /**
    +     * Check if any constraints are violated if exec is scheduled on worker.
    +     * @return true if scheduling exec on worker does not violate any constraints, returns false if it does
    +     */
    +    public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, SearcherState state) {
    +        final ExecutorDetails exec = state.currentExec();
    +        //check resources
    +        RAS_Node node = nodes.get(worker.getNodeId());
    +        if (!node.wouldFit(worker, exec, state.td)) {
    +            LOG.trace("{} would not fit in resources available on {}", exec, worker);
    +            return false;
    +        }
    +
    +        //check if exec can be on worker based on user defined component exclusions
    +        String execComp = execToComp.get(exec);
    +        Set<String> components = state.workerCompAssignment.get(worker);
    +        if (components != null) {
    +            for (String comp : components) {
    +                if (constraintMatrix.get(execComp).get(comp) != 0) {
    +                    LOG.trace("{} found {} constraint violation {} on {}", exec, execComp, comp, worker);
    +                    return false;
    +                }
    +            }
    +        }
    +
    +        //check if exec satisfy spread
    +        if (spreadComps.contains(execComp)) {
    +            if (state.nodeCompAssignment.get(node).contains(execComp)) {
    +                LOG.trace("{} Found spread violation {} on node {}", exec, execComp, node.getId());
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
    +        Map<String, Map<String, Integer>> matrix = new HashMap<>();
    +        for (String comp : comps) {
    +            matrix.put(comp, new HashMap<>());
    +            for (String comp2 : comps) {
    +                matrix.get(comp).put(comp2, 0);
    +            }
    +        }
    +        List<List<String>> constraints = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_CONSTRAINTS);
    +        if (constraints != null) {
    +            for (List<String> constraintPair : constraints) {
    +                String comp1 = constraintPair.get(0);
    +                String comp2 = constraintPair.get(1);
    +                if (!matrix.containsKey(comp1)) {
    +                    LOG.warn("Comp: {} declared in constraints is not valid!", comp1);
    --- End diff --
    
    Should this be fatal?
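One possible answer to the question above is to make an unknown component a hard error instead of a logged warning. The sketch below is a hypothetical, stand-alone variant of the constraint-matrix construction in the diff. `StrictConstraintMatrix` and `build` are illustrative names, not Storm API. It assumes each constraint is a pair of component ids and that constraints are symmetric, which the truncated diff does not confirm. Instead of logging and continuing, it throws `IllegalArgumentException`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical fail-fast variant of the constraint matrix construction.
 * Unknown or malformed constraint entries raise IllegalArgumentException
 * instead of being logged and skipped.
 */
final class StrictConstraintMatrix {

    private StrictConstraintMatrix() {
    }

    static Map<String, Map<String, Integer>> build(Set<String> comps, List<List<String>> constraints) {
        // Start with every pair of components unconstrained (0).
        Map<String, Map<String, Integer>> matrix = new HashMap<>();
        for (String comp : comps) {
            Map<String, Integer> row = new HashMap<>();
            for (String comp2 : comps) {
                row.put(comp2, 0);
            }
            matrix.put(comp, row);
        }
        if (constraints == null) {
            return matrix;
        }
        for (List<String> pair : constraints) {
            if (pair == null || pair.size() != 2) {
                throw new IllegalArgumentException("Constraint must be a pair of component ids: " + pair);
            }
            // Reject the whole configuration if either side is not a component of the topology.
            for (String comp : pair) {
                if (!matrix.containsKey(comp)) {
                    throw new IllegalArgumentException(
                        "Component " + comp + " declared in constraints is not in the topology");
                }
            }
            String comp1 = pair.get(0);
            String comp2 = pair.get(1);
            // Assumption: constraints are symmetric, so mark both directions.
            matrix.get(comp1).put(comp2, 1);
            matrix.get(comp2).put(comp1, 1);
        }
        return matrix;
    }
}
```

Failing here would stop scheduling of a misconfigured topology outright. That trade-off is the subject of the question.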


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    @revans2 Sorry for not being clearer about what I meant regarding using the heap instead of the stack.  Currently, the algorithm uses recursion to backtrack, which means it uses space on the call stack.  This can cause a StackOverflowError if the stack is not large enough to handle TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, so you might need to configure the JVM stack size (-Xss) for Nimbus to match the expected TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH setting.  What I am suggesting is an iterative implementation of the recursive algorithm.  It would keep the search state on the heap instead of pushing so many frames onto the stack, so you would not need to configure the stack size for the Nimbus daemon.  It is not critical, just something to think about.
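The iterative alternative can be sketched as follows under a simplified model. Items are placed into capacity-limited slots, and a pairwise conflict matrix stands in for the component constraints. Storm's real resource checks and node ordering are not modeled. `IterativeBacktracker` and its parameters are hypothetical names, not part of Storm. The current position and every item's choice live in heap-allocated arrays, so the search depth is bounded by memory rather than by -Xss.

```java
import java.util.Arrays;

/**
 * Hypothetical iterative backtracking search. State lives in heap arrays
 * instead of call-stack frames, so search depth is not bounded by -Xss.
 * Items are assigned to slots; each slot has a capacity, and conflicting
 * items (a symmetric boolean matrix) may not share a slot.
 */
final class IterativeBacktracker {

    private IterativeBacktracker() {
    }

    /** Returns the slot index chosen for each item, or null if none is found within maxStates. */
    static int[] solve(int numItems, int[] capacity, boolean[][] conflicts, int maxStates) {
        int numSlots = capacity.length;
        int[] assigned = new int[numItems]; // slot chosen for each item, -1 if none
        int[] used = new int[numSlots];     // items currently placed in each slot
        Arrays.fill(assigned, -1);
        int item = 0;
        int states = 0;
        while (item >= 0) {
            if (item == numItems) {
                return assigned; // every item placed
            }
            if (++states > maxStates) {
                return null; // search limit exceeded
            }
            // Resume after the previous choice for this item (0 on first visit) and undo that choice.
            int start = assigned[item] + 1;
            if (assigned[item] >= 0) {
                used[assigned[item]]--;
                assigned[item] = -1;
            }
            int chosen = -1;
            for (int slot = start; slot < numSlots && chosen < 0; slot++) {
                if (used[slot] < capacity[slot] && !conflictsWithSlot(item, slot, assigned, conflicts)) {
                    chosen = slot;
                }
            }
            if (chosen >= 0) {
                assigned[item] = chosen;
                used[chosen]++;
                item++; // descend: move on to the next item
            } else {
                item--; // backtrack: revisit the previous item's choice
            }
        }
        return null; // every option exhausted
    }

    private static boolean conflictsWithSlot(int item, int slot, int[] assigned, boolean[][] conflicts) {
        for (int other = 0; other < item; other++) {
            if (assigned[other] == slot && conflicts[item][other]) {
                return true;
            }
        }
        return false;
    }
}
```

Each loop iteration either descends to the next item or steps back to the previous one and resumes its slot scan where it left off. This visits candidates in the same order a recursive depth-first search would.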


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2442


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    LGTM +1. @revans2 it is up to you whether to do something more about dealing with those warnings in a more serious manner.


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    @HeartSaVioR I rebased it and made the changes.  My updates are in the last commit f5e9532


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r157679086
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +    static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
    +        Map<String, Map<String, Integer>> matrix = new HashMap<>();
    +        for (String comp : comps) {
    +            matrix.put(comp, new HashMap<>());
    +            for (String comp2 : comps) {
    +                matrix.get(comp).put(comp2, 0);
    +            }
    +        }
    +        List<List<String>> constraints = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_CONSTRAINTS);
    +        if (constraints != null) {
    +            for (List<String> constraintPair : constraints) {
    +                String comp1 = constraintPair.get(0);
    +                String comp2 = constraintPair.get(1);
    +                if (!matrix.containsKey(comp1)) {
    +                    LOG.warn("Comp: {} declared in constraints is not valid!", comp1);
    --- End diff --
    
    This pretty much means that the user specified (probably unintentionally) a component that doesn't exist.  In my experience, users probably won't check the logs unless something is obviously wrong with their topology, so they will probably miss this error.  We should probably surface some kind of warning in the topology status, or, even better, add client-side checking to validate that all components in Config.TOPOLOGY_RAS_CONSTRAINTS are valid components of the topology.
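Client-side checking of the kind suggested above could be sketched as follows. This is a hypothetical helper, not existing Storm code. `ConstraintValidator` and `findProblems` are illustrative names. The set of component ids is assumed to be the union of the topology's spout and bolt ids, collected by the caller. Unlike a fail-on-first-error check, it collects every problem so the user sees them all in one submission attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical client-side check, run before submission, that reports every
 * constraint entry that is malformed or names a component the topology does not define.
 */
final class ConstraintValidator {

    private ConstraintValidator() {
    }

    static List<String> findProblems(Set<String> componentIds, List<List<String>> constraints) {
        List<String> problems = new ArrayList<>();
        if (constraints == null) {
            return problems; // no constraints configured, nothing to check
        }
        for (int i = 0; i < constraints.size(); i++) {
            List<String> pair = constraints.get(i);
            if (pair == null || pair.size() != 2) {
                problems.add("constraint " + i + " is not a pair: " + pair);
                continue;
            }
            for (String comp : pair) {
                if (!componentIds.contains(comp)) {
                    problems.add("constraint " + i + " references unknown component: " + comp);
                }
            }
        }
        return problems;
    }
}
```

A submitter could then refuse to submit when the list is non-empty, for example by throwing `new IllegalArgumentException(String.join("; ", problems))`.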


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    @HeartSaVioR Sorry, I have been out for the past two weeks. I will update the configs accordingly.


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r154197827
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Set;
    +import java.util.Stack;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.RAS_Node;
    +import org.apache.storm.scheduler.resource.RAS_Nodes;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
    +    private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
    +
    +    protected static class SolverResult {
    +        private final int statesSearched;
    +        private final boolean success;
    +        private final long timeTakenMillis;
    +        private final int backtracked;
    +
    +        public SolverResult(SearcherState state, boolean success) {
    +            this.statesSearched = state.getStatesSearched();
    +            this.success = success;
    +            timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
    +            backtracked = state.numBacktrack;
    +        }
    +
    +        public SchedulingResult asSchedulingResult() {
    +            if (success) {
    +                return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +            }
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
    +                "Cannot find scheduling that satisfies all constraints (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +        }
    +    }
    +
    +    protected static class SearcherState {
    +        // Metrics
    +        // How many states searched so far.
    +        private int statesSearched = 0;
    +        // Number of times we had to backtrack.
    +        private int numBacktrack = 0;
    +        final long startTimeMillis;
    +        private final long maxEndTimeMs;
    +
    +        // Current state
    +        // The current executor we are trying to schedule
    +        private int execIndex = 0;
    +        // A map of the worker to the components in the worker to be able to enforce constraints.
    +        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
    +        private final boolean[] okToRemoveFromWorker;
    +        // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
    +        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
    +        private final boolean[] okToRemoveFromNode;
    +
    +        // Static State
    +        // The list of all executors (preferably sorted to make assignments simpler).
    +        private final List<ExecutorDetails> execs;
    +        //The maximum number of states to search before stopping.
    +        private final int maxStatesSearched;
    +        //The topology we are scheduling
    +        private final TopologyDetails td;
    +
    +        private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
    +                              int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
    +            assert execs != null;
    +            assert !execs.isEmpty();
    +
    +            this.workerCompAssignment = workerCompAssignment;
    +            this.nodeCompAssignment = nodeCompAssignment;
    +            this.maxStatesSearched = maxStatesSearched;
    +            this.execs = execs;
    +            okToRemoveFromWorker = new boolean[execs.size()];
    +            okToRemoveFromNode = new boolean[execs.size()];
    +            this.td = td;
    +            startTimeMillis = Time.currentTimeMillis();
    +            if (maxTimeMs <= 0) {
    +                maxEndTimeMs = Long.MAX_VALUE;
    +            } else {
    +                maxEndTimeMs = startTimeMillis + maxTimeMs;
    +            }
    +        }
    +
    +        public void incStatesSearched() {
    +            statesSearched++;
    +            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
    +                LOG.debug("States Searched: {}", statesSearched);
    +                LOG.debug("backtrack: {}", numBacktrack);
    +            }
    +        }
    +
    +        public int getStatesSearched() {
    +            return statesSearched;
    +        }
    +
    +        public boolean areSearchLimitsExceeded() {
    +            return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
    +        }
    +
    +        public SearcherState nextExecutor() {
    +            execIndex++;
    +            if (execIndex >= execs.size()) {
    +                throw new IllegalStateException("Exceeded the exec limit " + execIndex + " >= " + execs.size());
    +            }
    +            return this;
    +        }
    +
    +        public boolean areAllExecsScheduled() {
    +            return execIndex == execs.size() - 1;
    +        }
    +
    +        public ExecutorDetails currentExec() {
    +            return execs.get(execIndex);
    +        }
    +
    +        public void tryToSchedule(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot);
    +            //It is possible that this component is already scheduled on this node or worker.  If so when we backtrack we cannot remove it
    +            okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp);
    +            okToRemoveFromNode[execIndex] = nodeCompAssignment.get(node).add(comp);
    +            node.assignSingleExecutor(workerSlot, exec, td);
    +        }
    +
    +        public void backtrack(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            execIndex--;
    +            if (execIndex < 0) {
    +                throw new IllegalStateException("Internal Error exec index became negative");
    +            }
    +            numBacktrack++;
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot);
    +            if (okToRemoveFromWorker[execIndex]) {
    +                workerCompAssignment.get(workerSlot).remove(comp);
    +            }
    +            if (okToRemoveFromNode[execIndex]) {
    +                nodeCompAssignment.get(node).remove(comp);
    +            }
    +            node.freeSingleExecutor(exec, td);
    +        }
    +    }
    +
    +    private Map<String, RAS_Node> nodes;
    +    private Map<ExecutorDetails, String> execToComp;
    +    private Map<String, Set<ExecutorDetails>> compToExecs;
    +    private List<String> favoredNodes;
    +    private List<String> unFavoredNodes;
    +
    +    //constraints and spreads
    +    private Map<String, Map<String, Integer>> constraintMatrix;
    +    private HashSet<String> spreadComps = new HashSet<>();
    +
    +    //hard coded max number of states to search
    +    public static final int MAX_STATE_SEARCH = 100_000;
    +
    +    @Override
    +    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
    +        prepare(cluster);
    +        LOG.debug("Scheduling {}", td.getId());
    +        nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
    +        Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
    +        //set max number of states to search
    +        final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_DEPTH_TRAVERSAL)));
    +
    +        final long maxTimeMs =
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_TIME_SECS), -1).intValue() * 1000L;
    +
    +        favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
    +        unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
    +
    +        //get mapping of execs to components
    +        execToComp = td.getExecutorToComponent();
    +        //get mapping of components to executors
    +        compToExecs = getCompToExecs(execToComp);
    +
    +        //get topology constraints
    +        constraintMatrix = getConstraintMap(td, compToExecs.keySet());
    +
    +        //get spread components
    +        spreadComps = getSpreadComps(td);
    +
    +        ArrayList<ExecutorDetails> sortedExecs = new ArrayList<>();
    +        //get a sorted list of unassigned executors based on number of constraints
    +        Set<ExecutorDetails> unassignedExecutors = new HashSet<>(cluster.getUnassignedExecutors(td));
    +        for (ExecutorDetails exec1 : getSortedExecs(spreadComps, constraintMatrix, compToExecs)) {
    +            if (unassignedExecutors.contains(exec1)) {
    +                sortedExecs.add(exec1);
    +            }
    +        }
    +
    +        //initialize structures
    +        for (RAS_Node node : nodes.values()) {
    +            nodeCompAssignment.put(node, new HashSet<>());
    +        }
    +        //populate with existing assignments
    +        SchedulerAssignment existingAssignment = cluster.getAssignmentById(td.getId());
    +        if (existingAssignment != null) {
    +            for (Map.Entry<ExecutorDetails, WorkerSlot> entry1 : existingAssignment.getExecutorToSlot().entrySet()) {
    +                ExecutorDetails exec1 = entry1.getKey();
    +                String compId = execToComp.get(exec1);
    +                WorkerSlot ws = entry1.getValue();
    +                RAS_Node node = nodes.get(ws.getNodeId());
    +                //populate node to component Assignments
    +                nodeCompAssignment.get(node).add(compId);
    +                //populate worker to comp assignments
    +                workerCompAssignment.computeIfAbsent(ws, (k) -> new HashSet<>()).add(compId);
    +            }
    +        }
    +
    +        //early detection/early fail
    +        if (!checkSchedulingFeasibility()) {
    +            //Scheduling Status set to FAIL_OTHER so no eviction policy will be attempted to make space for this topology
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, "Scheduling not feasible!");
    +        }
    +        return backtrackSearch(new SearcherState(workerCompAssignment, nodeCompAssignment, maxStateSearch, maxTimeMs, sortedExecs, td))
    +            .asSchedulingResult();
    +    }
    +
    +    private boolean checkSchedulingFeasibility() {
    +        for (String comp : spreadComps) {
    +            int numExecs = compToExecs.get(comp).size();
    +            if (numExecs > nodes.size()) {
    +                LOG.error("Unsatisfiable constraint: Component: {} marked as spread has {} executors which is larger "
    +                    + "than number of nodes: {}", comp, numExecs, nodes.size());
    +                return false;
    +            }
    +        }
    +        if (execToComp.size() >= MAX_STATE_SEARCH) {
    +            LOG.error("Number of executors is greater than the maximum number of states allowed to be searched.  "
    +                + "# of executors: {} Max states to search: {}", execToComp.size(), MAX_STATE_SEARCH);
    +            return false;
    +        }
    +        return true;
    +    }
    +    
    +    @Override
    +    protected TreeSet<ObjectResources> sortObjectResources(
    +        final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
    +        final ExistingScheduleFunc existingScheduleFunc) {
    +        return GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc);
    +    }
    +
    +    // Backtracking algorithm does not take into account the ordering of executors in worker to reduce traversal space
    +    @VisibleForTesting
    +    protected SolverResult backtrackSearch(SearcherState state) {
    +        state.incStatesSearched();
    +        if (state.areSearchLimitsExceeded()) {
    +            LOG.warn("Limits Exceeded");
    +            return new SolverResult(state, false);
    +        }
    +
    +        ExecutorDetails exec = state.currentExec();
    +        List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec, favoredNodes, unFavoredNodes);
    +
    +        for (ObjectResources nodeResources: sortedNodes) {
    +            RAS_Node node = nodes.get(nodeResources.id);
    +            for (WorkerSlot workerSlot : node.getSlotsAvailbleTo(state.td)) {
    +                if (isExecAssignmentToWorkerValid(workerSlot, state)) {
    +                    String comp = execToComp.get(exec);
    +
    +                    state.tryToSchedule(comp, node, workerSlot);
    +
    +                    if (state.areAllExecsScheduled()) {
    +                        //Everything is scheduled correctly, so no need to search any more.
    +                        return new SolverResult(state, true);
    +                    }
    +
    +                    SolverResult results = backtrackSearch(state.nextExecutor());
    +                    if (results.success) {
    +                        //We found a good result we are done.
    +                        return results;
    +                    }
    +
    +                    if (state.areSearchLimitsExceeded()) {
    +                        //No need to search more it is not going to help.
    +                        return new SolverResult(state, false);
    +                    }
    +
    +                    //backtracking (If we ever get here there really isn't a lot of hope that we will find a scheduling)
    +                    state.backtrack(comp, node, workerSlot);
    +                }
    +            }
    +        }
    +        //Tried all of the slots and none of them worked.
    +        return new SolverResult(state, false);
    +    }
    +
    +    /**
    +     * Check if any constraints are violated if exec is scheduled on worker.
    +     * @return true if scheduling exec on worker does not violate any constraints, returns false if it does
    +     */
    +    public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, SearcherState state) {
    +        final ExecutorDetails exec = state.currentExec();
    +        //check resources
    +        RAS_Node node = nodes.get(worker.getNodeId());
    +        if (!node.wouldFit(worker, exec, state.td)) {
    +            LOG.trace("{} would not fit in resources available on {}", exec, worker);
    +            return false;
    +        }
    +
    +        //check if exec can be on worker based on user defined component exclusions
    +        String execComp = execToComp.get(exec);
    +        Set<String> components = state.workerCompAssignment.get(worker);
    +        if (components != null) {
    +            for (String comp : components) {
    +                if (constraintMatrix.get(execComp).get(comp) != 0) {
    +                    LOG.trace("{} found {} constraint violation {} on {}", exec, execComp, comp, worker);
    +                    return false;
    +                }
    +            }
    +        }
    +
    +        //check if exec satisfies spread
    +        if (spreadComps.contains(execComp)) {
    +            if (state.nodeCompAssignment.get(node).contains(execComp)) {
    +                LOG.trace("{} Found spread violation {} on node {}", exec, execComp, node.getId());
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
    +        Map<String, Map<String, Integer>> matrix = new HashMap<>();
    +        for (String comp : comps) {
    +            matrix.put(comp, new HashMap<>());
    +            for (String comp2 : comps) {
    +                matrix.get(comp).put(comp2, 0);
    +            }
    +        }
    +        List<List<String>> constraints = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_CONSTRAINTS);
    +        if (constraints != null) {
    +            for (List<String> constraintPair : constraints) {
    +                String comp1 = constraintPair.get(0);
    +                String comp2 = constraintPair.get(1);
    +                if (!matrix.containsKey(comp1)) {
    +                    LOG.warn("Comp: {} declared in constraints is not valid!", comp1);
    +                    continue;
    +                }
    +                if (!matrix.containsKey(comp2)) {
    +                    LOG.warn("Comp: {} declared in constraints is not valid!", comp2);
    +                    continue;
    +                }
    +                matrix.get(comp1).put(comp2, 1);
    +                matrix.get(comp2).put(comp1, 1);
    +            }
    +        }
    +        return matrix;
    +    }
    +
    +    /**
    +     * Determines if a scheduling is valid and all constraints are satisfied.
    +     */
    +    @VisibleForTesting
    +    public static boolean validateSolution(Cluster cluster, TopologyDetails td) {
    +        return checkSpreadSchedulingValid(cluster, td)
    +            && checkConstraintsSatisfied(cluster, td)
    +            && checkResourcesCorrect(cluster, td);
    +    }
    +
    +    /**
    +     * Check if constraints are satisfied.
    +     */
    +    private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking constraints...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    +        //get topology constraints
    +        Map<String, Map<String, Integer>> constraintMatrix = getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values()));
    +
    +        Map<WorkerSlot, List<String>> workerCompMap = new HashMap<>();
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
    +            WorkerSlot worker = entry.getValue();
    +            ExecutorDetails exec = entry.getKey();
    +            String comp = execToComp.get(exec);
    +            if (!workerCompMap.containsKey(worker)) {
    +                workerCompMap.put(worker, new LinkedList<>());
    +            }
    +            workerCompMap.get(worker).add(comp);
    +        }
    +        for (Map.Entry<WorkerSlot, List<String>> entry : workerCompMap.entrySet()) {
    +            List<String> comps = entry.getValue();
    +            for (int i = 0; i < comps.size(); i++) {
    +                for (int j = 0; j < comps.size(); j++) {
    +                    if (i != j && constraintMatrix.get(comps.get(i)).get(comps.get(j)) == 1) {
    +                        LOG.error("Incorrect Scheduling: worker exclusion for Component {} and {} not satisfied on WorkerSlot: {}",
    +                            comps.get(i), comps.get(j), entry.getKey());
    +                        return false;
    +                    }
    +                }
    +            }
    +        }
    +        return true;
    +    }
    +
    +    private static Map<WorkerSlot, RAS_Node> workerToNodes(Cluster cluster) {
    +        Map<WorkerSlot, RAS_Node> workerToNodes = new HashMap<>();
    +        for (RAS_Node node: RAS_Nodes.getAllNodesFrom(cluster).values()) {
    +            for (WorkerSlot s : node.getUsedSlots()) {
    +                workerToNodes.put(s, node);
    +            }
    +        }
    +        return workerToNodes;
    +    }
    +
    +    private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking for a valid scheduling...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    +        Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new HashMap<>();
    +        Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>();
    +        Map<RAS_Node, HashSet<String>> nodeCompMap = new HashMap<>();
    +        Map<WorkerSlot, RAS_Node> workerToNodes = workerToNodes(cluster);
    +        boolean ret = true;
    +
    +        HashSet<String> spreadComps = getSpreadComps(topo);
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            WorkerSlot worker = entry.getValue();
    +            RAS_Node node = workerToNodes.get(worker);
    +
    +            if (!workerExecMap.containsKey(worker)) {
    +                workerExecMap.put(worker, new HashSet<>());
    +                workerCompMap.put(worker, new HashSet<>());
    +            }
    +
    +            if (!nodeCompMap.containsKey(node)) {
    +                nodeCompMap.put(node, new HashSet<>());
    +            }
    +            if (workerExecMap.get(worker).contains(exec)) {
    +                LOG.error("Incorrect Scheduling: Found duplicate in scheduling");
    +                return false;
    +            }
    +            workerExecMap.get(worker).add(exec);
    +            String comp = execToComp.get(exec);
    +            workerCompMap.get(worker).add(comp);
    +            if (spreadComps.contains(comp)) {
    +                if (nodeCompMap.get(node).contains(comp)) {
    +                    LOG.error("Incorrect Scheduling: Spread for Component: {} {} on node {} not satisfied {}",
    +                        comp, exec, node.getId(), nodeCompMap.get(node));
    +                    ret = false;
    +                }
    +            }
    +            nodeCompMap.get(node).add(comp);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Check if resource constraints satisfied.
    +     */
    +    private static boolean checkResourcesCorrect(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking Resources...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<RAS_Node, Collection<ExecutorDetails>> nodeToExecs = new HashMap<>();
    +        Map<ExecutorDetails, WorkerSlot> mergedExecToWorker = new HashMap<>();
    +        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        //merge with existing assignments
    +        if (cluster.getAssignmentById(topo.getId()) != null
    +                && cluster.getAssignmentById(topo.getId()).getExecutorToSlot() != null) {
    +            mergedExecToWorker.putAll(cluster.getAssignmentById(topo.getId()).getExecutorToSlot());
    +        }
    +        mergedExecToWorker.putAll(result);
    +
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : mergedExecToWorker.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            WorkerSlot worker = entry.getValue();
    +            RAS_Node node = nodes.get(worker.getNodeId());
    +
    +            if (node.getAvailableMemoryResources() < 0.0 || node.getAvailableCpuResources() < 0.0) {
    +                LOG.error("Incorrect Scheduling: found node with negative available resources");
    +                return false;
    +            }
    +            if (!nodeToExecs.containsKey(node)) {
    +                nodeToExecs.put(node, new LinkedList<>());
    +            }
    +            nodeToExecs.get(node).add(exec);
    +        }
    +
    +        for (Map.Entry<RAS_Node, Collection<ExecutorDetails>> entry : nodeToExecs.entrySet()) {
    +            RAS_Node node = entry.getKey();
    +            Collection<ExecutorDetails> execs = entry.getValue();
    +            double cpuUsed = 0.0;
    +            double memoryUsed = 0.0;
    +            for (ExecutorDetails exec : execs) {
    +                cpuUsed += topo.getTotalCpuReqTask(exec);
    +                memoryUsed += topo.getTotalMemReqTask(exec);
    +            }
    +            if (node.getAvailableCpuResources() != (node.getTotalCpuResources() - cpuUsed)) {
    +                LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of cpu. Expected: {}"
    +                        + " Actual: {} Executors scheduled on node: {}",
    +                        node.getId(), (node.getTotalCpuResources() - cpuUsed), node.getAvailableCpuResources(), execs);
    +                return false;
    +            }
    +            if (node.getAvailableMemoryResources() != (node.getTotalMemoryResources() - memoryUsed)) {
    +                LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of memory. Expected: {}"
    +                        + " Actual: {} Executors scheduled on node: {}",
    +                        node.getId(), (node.getTotalMemoryResources() - memoryUsed), node.getAvailableMemoryResources(), execs);
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    private Map<String, Set<ExecutorDetails>> getCompToExecs(Map<ExecutorDetails, String> executorToComp) {
    +        Map<String, Set<ExecutorDetails>> retMap = new HashMap<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComp.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            String comp = entry.getValue();
    +            if (!retMap.containsKey(comp)) {
    +                retMap.put(comp, new HashSet<>());
    +            }
    +            retMap.get(comp).add(exec);
    +        }
    +        return retMap;
    +    }
    +
    +    private ArrayList<ExecutorDetails> getSortedExecs(HashSet<String> spreadComps, Map<String, Map<String, Integer>> constraintMatrix,
    +                                                      Map<String, Set<ExecutorDetails>> compToExecs) {
    +        ArrayList<ExecutorDetails> retList = new ArrayList<>();
    +        //find number of constraints per component
    +        //Key->Comp Value-># of constraints
    +        Map<String, Integer> compConstraintCountMap = new HashMap<>();
    +        for (Map.Entry<String, Map<String, Integer>> constraintEntry1 : constraintMatrix.entrySet()) {
    +            int count = 0;
    +            String comp = constraintEntry1.getKey();
    +            for (Map.Entry<String, Integer> constraintEntry2 : constraintEntry1.getValue().entrySet()) {
    +                if (constraintEntry2.getValue() == 1) {
    +                    count++;
    +                }
    +            }
    +            //check component is declared for spreading
    +            if (spreadComps.contains(constraintEntry1.getKey())) {
    +                count++;
    +            }
    +            compConstraintCountMap.put(comp, count);
    +        }
    --- End diff --
    
    Is there a more Java 8 way of doing this?
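One possible answer is a streams-based version of the counting loop. This is a sketch, not the code that was eventually merged: `ConstraintCounts` and `compConstraintCounts` are illustrative names. It assumes the same inputs as the loop under review: a symmetric 0/1 `constraintMatrix` keyed by component, plus the set of spread components.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ConstraintCounts {

    /**
     * Stream-based equivalent of the loop under review: for each component, count the
     * partners it is constrained against (matrix value 1) and add 1 if the component
     * is marked for spreading.
     */
    public static Map<String, Integer> compConstraintCounts(Map<String, Map<String, Integer>> constraintMatrix,
                                                            Set<String> spreadComps) {
        return constraintMatrix.entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> (int) e.getValue().values().stream().filter(v -> v == 1).count()
                    + (spreadComps.contains(e.getKey()) ? 1 : 0)));
    }

    public static void main(String[] args) {
        String[] comps = {"spout", "bolt-1", "bolt-2"};
        Map<String, Map<String, Integer>> matrix = new HashMap<>();
        for (String c : comps) {
            matrix.put(c, new HashMap<>());
            for (String c2 : comps) {
                matrix.get(c).put(c2, 0);
            }
        }
        // Symmetric exclusions, as getConstraintMap builds them: spout/bolt-1 and bolt-1/bolt-2
        matrix.get("spout").put("bolt-1", 1);
        matrix.get("bolt-1").put("spout", 1);
        matrix.get("bolt-1").put("bolt-2", 1);
        matrix.get("bolt-2").put("bolt-1", 1);
        Set<String> spread = new HashSet<>();
        spread.add("bolt-2");

        Map<String, Integer> counts = compConstraintCounts(matrix, spread);
        // prints "1 2 2": bolt-2 gets one exclusion plus one for being spread
        System.out.println(counts.get("spout") + " " + counts.get("bolt-1") + " " + counts.get("bolt-2"));
    }
}
```

`Collectors.toMap` is safe here because the keys come from an existing map (so there are no duplicates) and every value is a non-null int.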


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r157677523
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java ---
    @@ -98,7 +98,7 @@ public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse,
           }
           
           _spreadToSchedule = new HashMap<>();
    -      List<String> spreadComps = (List<String>)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
    +      List<String> spreadComps = (List<String>)td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_SPREAD_COMPONENTS);
    --- End diff --
    
    I'm not sure about renaming TOPOLOGY_SPREAD_COMPONENTS to TOPOLOGY_RAS_CONSTRAINT_SPREAD_COMPONENTS. It seems odd to me for the multitenant scheduler to use a config that references RAS.


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r154194294
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Set;
    +import java.util.Stack;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.RAS_Node;
    +import org.apache.storm.scheduler.resource.RAS_Nodes;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
    +    private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
    +
    +    protected static class SolverResult {
    +        private final int statesSearched;
    +        private final boolean success;
    +        private final long timeTakenMillis;
    +        private final int backtracked;
    +
    +        public SolverResult(SearcherState state, boolean success) {
    +            this.statesSearched = state.getStatesSearched();
    +            this.success = success;
    +            timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
    +            backtracked = state.numBacktrack;
    +        }
    +
    +        public SchedulingResult asSchedulingResult() {
    +            if (success) {
    +                return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +            }
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
    +                "Cannot find scheduling that satisfies all constraints (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +        }
    +    }
    +
    +    protected static class SearcherState {
    +        // Metrics
    +        // How many states searched so far.
    +        private int statesSearched = 0;
    +        // Number of times we had to backtrack.
    +        private int numBacktrack = 0;
    +        final long startTimeMillis;
    +        private final long maxEndTimeMs;
    +
    +        // Current state
    +        // The current executor we are trying to schedule
    +        private int execIndex = 0;
    +        // A map of the worker to the components in the worker to be able to enforce constraints.
    +        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
    +        private final boolean[] okToRemoveFromWorker;
    +        // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
    +        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
    +        private final boolean[] okToRemoveFromNode;
    +
    +        // Static State
    +        // The list of all executors (preferably sorted to make assignments simpler).
    +        private final List<ExecutorDetails> execs;
    +        //The maximum number of state to search before stopping.
    +        private final int maxStatesSearched;
    +        //The topology we are scheduling
    +        private final TopologyDetails td;
    +
    +        private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
    +                              int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
    +            assert !execs.isEmpty();
    +            assert execs != null;
    +
    +            this.workerCompAssignment = workerCompAssignment;
    +            this.nodeCompAssignment = nodeCompAssignment;
    +            this.maxStatesSearched = maxStatesSearched;
    +            this.execs = execs;
    +            okToRemoveFromWorker = new boolean[execs.size()];
    +            okToRemoveFromNode = new boolean[execs.size()];
    +            this.td = td;
    +            startTimeMillis = Time.currentTimeMillis();
    +            if (maxTimeMs <= 0) {
    +                maxEndTimeMs = Long.MAX_VALUE;
    +            } else {
    +                maxEndTimeMs = startTimeMillis + maxTimeMs;
    +            }
    +        }
    +
    +        public void incStatesSearched() {
    +            statesSearched++;
    +            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
    +                LOG.debug("States Searched: {}", statesSearched);
    +                LOG.debug("backtrack: {}", numBacktrack);
    +            }
    +        }
    +
    +        public int getStatesSearched() {
    +            return statesSearched;
    +        }
    +
    +        public boolean areSearchLimitsExceeded() {
    +            return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
    +        }
    +
    +        public SearcherState nextExecutor() {
    +            execIndex++;
    +            if (execIndex >= execs.size()) {
    +                throw new IllegalStateException("Exceeded the exec limit " + execIndex + " >= " + execs.size());
    --- End diff --
    
    This is an "Internal Error" too.
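    For illustration only, here is a trimmed-down, hypothetical cursor. `ExecCursor` is not a class in the PR. It reports both out-of-range cases with the same "Internal Error" prefix that `backtrack` already uses. This sketch also checks the bound before changing the index, so a failed call leaves the index valid. That is a choice made for the sketch, not something the patch does.

```java
import java.util.List;

// Hypothetical cursor over the executors being scheduled (not a class in the PR).
// Both out-of-range cases indicate a bug in the search itself, never bad user input,
// so both messages carry the same "Internal Error" prefix.
final class ExecCursor {
    private final List<String> execs;
    private int execIndex = 0;

    ExecCursor(List<String> execs) {
        if (execs == null || execs.isEmpty()) { // null check first, then emptiness
            throw new IllegalArgumentException("execs must be non-null and non-empty");
        }
        this.execs = execs;
    }

    ExecCursor next() {
        // Check before incrementing so a failed call leaves the index valid.
        if (execIndex + 1 >= execs.size()) {
            throw new IllegalStateException("Internal Error: exceeded the exec limit "
                + (execIndex + 1) + " >= " + execs.size());
        }
        execIndex++;
        return this;
    }

    ExecCursor previous() {
        if (execIndex - 1 < 0) {
            throw new IllegalStateException("Internal Error: exec index became negative");
        }
        execIndex--;
        return this;
    }

    String current() {
        return execs.get(execIndex);
    }
}
```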


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    In terms of the configs, we may want to explore splitting them into multiple pieces.  Config.java has become a monolith in a sense.  Config.java in the storm-client module should probably only contain configs that users can set in their topology.


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    Generally looks good, just some minor comments.  @revans2 you could also implement a version of the algorithm that uses heap space in the JVM instead of stack space, so you wouldn't need to tinker with the -Xss settings.  That may simplify operations.
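    Here is a rough sketch of the heap-based alternative. It assumes a simplified model that is not the PR's. Executors are identified only by component name, and nodes only by a slot count. The only constraint is spread: no two executors of a spread component may share a node. The recursion depth becomes an index into an `int[]` of per-level choices, so search depth is bounded by heap size rather than by -Xss. `IterativeBacktracker` and `solve` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: backtracking with an explicit, heap-allocated array of choices
// instead of recursion, so depth is not limited by the thread stack (-Xss).
final class IterativeBacktracker {

    // Returns the node index chosen for each executor, or null if no assignment exists.
    // execComps.get(i) is the component of executor i; nodeCapacity[k] is the slot count of node k;
    // components in spreadComps may appear at most once per node.
    static int[] solve(List<String> execComps, int[] nodeCapacity, Set<String> spreadComps) {
        int n = execComps.size();
        int numNodes = nodeCapacity.length;
        int[] choice = new int[n];            // node tried at each depth; -1 means "nothing tried yet"
        Arrays.fill(choice, -1);
        int[] used = new int[numNodes];       // slots in use per node
        boolean[] addedComp = new boolean[n]; // like okToRemoveFromNode: did depth i add its comp to the node?
        List<Set<String>> compsOnNode = new ArrayList<>();
        for (int k = 0; k < numNodes; k++) {
            compsOnNode.add(new HashSet<>());
        }

        int i = 0; // current depth, i.e. index of the executor being placed
        while (i >= 0) {
            if (i == n) {
                return choice; // every executor has a node
            }
            String comp = execComps.get(i);
            // Undo whatever was tried at this depth before moving on to the next candidate node.
            if (choice[i] >= 0) {
                int prev = choice[i];
                used[prev]--;
                if (addedComp[i]) {
                    compsOnNode.get(prev).remove(comp);
                }
                addedComp[i] = false;
            }
            int next = choice[i] + 1;
            while (next < numNodes && !fits(next, comp, used, nodeCapacity, compsOnNode, spreadComps)) {
                next++;
            }
            if (next == numNodes) {
                choice[i] = -1; // exhausted this depth: go back to the previous executor
                i--;
            } else {
                choice[i] = next; // place executor i and descend
                used[next]++;
                addedComp[i] = compsOnNode.get(next).add(comp);
                i++;
            }
        }
        return null; // backed up past depth 0, so the search space is exhausted
    }

    private static boolean fits(int node, String comp, int[] used, int[] capacity,
                                List<Set<String>> compsOnNode, Set<String> spreadComps) {
        if (used[node] >= capacity[node]) {
            return false;
        }
        return !(spreadComps.contains(comp) && compsOnNode.get(node).contains(comp));
    }
}
```

    Each time it backs up to a depth, the loop resumes from the node after the one it last tried there. The recursive for-loop in `backtrackSearch` does the same thing implicitly through its stack frame.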


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r154193911
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Set;
    +import java.util.Stack;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.RAS_Node;
    +import org.apache.storm.scheduler.resource.RAS_Nodes;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
    +    private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
    +
    +    protected static class SolverResult {
    +        private final int statesSearched;
    +        private final boolean success;
    +        private final long timeTakenMillis;
    +        private final int backtracked;
    +
    +        public SolverResult(SearcherState state, boolean success) {
    +            this.statesSearched = state.getStatesSearched();
    +            this.success = success;
    +            timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
    +            backtracked = state.numBacktrack;
    +        }
    +
    +        public SchedulingResult asSchedulingResult() {
    +            if (success) {
    +                return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +            }
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
    +                "Cannot find scheduling that satisfies all constraints (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +        }
    +    }
    +
    +    protected static class SearcherState {
    +        // Metrics
    +        // How many states searched so far.
    +        private int statesSearched = 0;
    +        // Number of times we had to backtrack.
    +        private int numBacktrack = 0;
    +        final long startTimeMillis;
    +        private final long maxEndTimeMs;
    +
    +        // Current state
    +        // The current executor we are trying to schedule
    +        private int execIndex = 0;
    +        // A map of the worker to the components in the worker to be able to enforce constraints.
    +        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
    +        private final boolean[] okToRemoveFromWorker;
    +        // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
    +        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
    +        private final boolean[] okToRemoveFromNode;
    +
    +        // Static State
    +        // The list of all executors (preferably sorted to make assignments simpler).
    +        private final List<ExecutorDetails> execs;
    +        //The maximum number of state to search before stopping.
    +        private final int maxStatesSearched;
    +        //The topology we are scheduling
    +        private final TopologyDetails td;
    +
    +        private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
    +                              int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
    +            assert !execs.isEmpty();
    +            assert execs != null;
    +
    +            this.workerCompAssignment = workerCompAssignment;
    +            this.nodeCompAssignment = nodeCompAssignment;
    +            this.maxStatesSearched = maxStatesSearched;
    +            this.execs = execs;
    +            okToRemoveFromWorker = new boolean[execs.size()];
    +            okToRemoveFromNode = new boolean[execs.size()];
    +            this.td = td;
    +            startTimeMillis = Time.currentTimeMillis();
    +            if (maxTimeMs <= 0) {
    +                maxEndTimeMs = Long.MAX_VALUE;
    +            } else {
    +                maxEndTimeMs = startTimeMillis + maxTimeMs;
    +            }
    +        }
    +
    +        public void incStatesSearched() {
    +            statesSearched++;
    +            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
    +                LOG.debug("States Searched: {}", statesSearched);
    +                LOG.debug("backtrack: {}", numBacktrack);
    +            }
    +        }
    +
    +        public int getStatesSearched() {
    +            return statesSearched;
    +        }
    +
    +        public boolean areSearchLimitsExceeded() {
    +            return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
    +        }
    +
    +        public SearcherState nextExecutor() {
    +            execIndex++;
    +            if (execIndex >= execs.size()) {
    +                throw new IllegalStateException("Exceeded the exec limit " + execIndex + " >= " + execs.size());
    +            }
    +            return this;
    +        }
    +
    +        public boolean areAllExecsScheduled() {
    +            return execIndex == execs.size() - 1;
    +        }
    +
    +        public ExecutorDetails currentExec() {
    +            return execs.get(execIndex);
    +        }
    +
    +        public void tryToSchedule(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot);
    +            //It is possible that this component is already scheduled on this node or worker.  If so when we backtrack we cannot remove it
    +            okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp);
    +            okToRemoveFromNode[execIndex] = nodeCompAssignment.get(node).add(comp);
    +            node.assignSingleExecutor(workerSlot, exec, td);
    +        }
    +
    +        public void backtrack(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            execIndex--;
    +            if (execIndex < 0) {
    +                throw new IllegalStateException("Internal Error exec index became negative");
    +            }
    +            numBacktrack++;
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot);
    +            if (okToRemoveFromWorker[execIndex]) {
    +                workerCompAssignment.get(workerSlot).remove(comp);
    +            }
    +            if (okToRemoveFromNode[execIndex]) {
    --- End diff --
    
    To be defensive, we may want to reset this back to false.
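    Here is a hypothetical, trimmed-down version of the node bookkeeping. Nodes are plain strings, no resources are tracked, and `NodeCompTracker` is not part of the PR. It shows the flag being cleared once `backtrack` has consumed it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for SearcherState's node bookkeeping.
final class NodeCompTracker {
    private final Map<String, Set<String>> nodeCompAssignment = new HashMap<>();
    private final boolean[] okToRemoveFromNode;

    NodeCompTracker(int numExecs) {
        okToRemoveFromNode = new boolean[numExecs];
    }

    void tryToSchedule(int execIndex, String comp, String node) {
        // Set.add returns false if comp is already on the node; a later backtrack must then leave it there.
        okToRemoveFromNode[execIndex] = nodeCompAssignment.computeIfAbsent(node, k -> new HashSet<>()).add(comp);
    }

    void backtrack(int execIndex, String comp, String node) {
        if (okToRemoveFromNode[execIndex]) {
            nodeCompAssignment.get(node).remove(comp);
        }
        // Defensive reset: if backtrack were ever invoked for this index again without a fresh
        // tryToSchedule, a stale true could remove a component another executor still relies on.
        okToRemoveFromNode[execIndex] = false;
    }

    boolean isOnNode(String comp, String node) {
        return nodeCompAssignment.getOrDefault(node, Collections.emptySet()).contains(comp);
    }

    boolean flag(int execIndex) {
        return okToRemoveFromNode[execIndex];
    }
}
```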


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    @revans2 No problem. Please take your time.


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r154194951
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Set;
    +import java.util.Stack;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.RAS_Node;
    +import org.apache.storm.scheduler.resource.RAS_Nodes;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
    +    private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
    +
    +    protected static class SolverResult {
    +        private final int statesSearched;
    +        private final boolean success;
    +        private final long timeTakenMillis;
    +        private final int backtracked;
    +
    +        public SolverResult(SearcherState state, boolean success) {
    +            this.statesSearched = state.getStatesSearched();
    +            this.success = success;
    +            timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
    +            backtracked = state.numBacktrack;
    +        }
    +
    +        public SchedulingResult asSchedulingResult() {
    +            if (success) {
    +                return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +            }
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
    +                "Cannot find scheduling that satisfies all constraints (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +        }
    +    }
    +
    +    protected static class SearcherState {
    +        // Metrics
    +        // How many states searched so far.
    +        private int statesSearched = 0;
    +        // Number of times we had to backtrack.
    +        private int numBacktrack = 0;
    +        final long startTimeMillis;
    +        private final long maxEndTimeMs;
    +
    +        // Current state
    +        // The current executor we are trying to schedule
    +        private int execIndex = 0;
    +        // A map of the worker to the components in the worker to be able to enforce constraints.
    +        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
    +        private final boolean[] okToRemoveFromWorker;
    +        // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
    +        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
    +        private final boolean[] okToRemoveFromNode;
    +
    +        // Static State
    +        // The list of all executors (preferably sorted to make assignments simpler).
    +        private final List<ExecutorDetails> execs;
    +        //The maximum number of state to search before stopping.
    +        private final int maxStatesSearched;
    +        //The topology we are scheduling
    +        private final TopologyDetails td;
    +
    +        private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
    +                              int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
    +            assert !execs.isEmpty();
    +            assert execs != null;
    +
    +            this.workerCompAssignment = workerCompAssignment;
    +            this.nodeCompAssignment = nodeCompAssignment;
    +            this.maxStatesSearched = maxStatesSearched;
    +            this.execs = execs;
    +            okToRemoveFromWorker = new boolean[execs.size()];
    +            okToRemoveFromNode = new boolean[execs.size()];
    +            this.td = td;
    +            startTimeMillis = Time.currentTimeMillis();
    +            if (maxTimeMs <= 0) {
    +                maxEndTimeMs = Long.MAX_VALUE;
    +            } else {
    +                maxEndTimeMs = startTimeMillis + maxTimeMs;
    +            }
    +        }
    +
    +        public void incStatesSearched() {
    +            statesSearched++;
    +            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
    +                LOG.debug("States Searched: {}", statesSearched);
    +                LOG.debug("backtrack: {}", numBacktrack);
    +            }
    +        }
    +
    +        public int getStatesSearched() {
    +            return statesSearched;
    +        }
    +
    +        public boolean areSearchLimitsExceeded() {
    +            return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
    +        }
    +
    +        public SearcherState nextExecutor() {
    +            execIndex++;
    +            if (execIndex >= execs.size()) {
    +                throw new IllegalStateException("Exceeded the exec limit " + execIndex + " >= " + execs.size());
    +            }
    +            return this;
    +        }
    +
    +        public boolean areAllExecsScheduled() {
    +            return execIndex == execs.size() - 1;
    +        }
    +
    +        public ExecutorDetails currentExec() {
    +            return execs.get(execIndex);
    +        }
    +
    +        public void tryToSchedule(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot);
    +            //It is possible that this component is already scheduled on this node or worker.  If so when we backtrack we cannot remove it
    +            okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp);
    +            okToRemoveFromNode[execIndex] = nodeCompAssignment.get(node).add(comp);
    +            node.assignSingleExecutor(workerSlot, exec, td);
    +        }
    +
    +        public void backtrack(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            execIndex--;
    +            if (execIndex < 0) {
    +                throw new IllegalStateException("Internal Error exec index became negative");
    +            }
    +            numBacktrack++;
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot);
    +            if (okToRemoveFromWorker[execIndex]) {
    +                workerCompAssignment.get(workerSlot).remove(comp);
    +            }
    +            if (okToRemoveFromNode[execIndex]) {
    +                nodeCompAssignment.get(node).remove(comp);
    +            }
    +            node.freeSingleExecutor(exec, td);
    +        }
    +    }
    +
    +    private Map<String, RAS_Node> nodes;
    +    private Map<ExecutorDetails, String> execToComp;
    +    private Map<String, Set<ExecutorDetails>> compToExecs;
    +    private List<String> favoredNodes;
    +    private List<String> unFavoredNodes;
    +
    +    //constraints and spreads
    +    private Map<String, Map<String, Integer>> constraintMatrix;
    +    private HashSet<String> spreadComps = new HashSet<>();
    +
    +    //hard coded max number of states to search
    +    public static final int MAX_STATE_SEARCH = 100_000;
    +
    +    @Override
    +    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
    +        prepare(cluster);
    +        LOG.debug("Scheduling {}", td.getId());
    +        nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
    +        Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
    +        //set max number of states to search
    +        final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_DEPTH_TRAVERSAL)));
    +
    +        final long maxTimeMs =
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_TIME_SECS), -1).intValue() * 1000L;
    +
    +        favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
    +        unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
    +
    +        //get mapping of execs to components
    +        execToComp = td.getExecutorToComponent();
    +        //get mapping of components to executors
    +        compToExecs = getCompToExecs(execToComp);
    +
    +        //get topology constraints
    +        constraintMatrix = getConstraintMap(td, compToExecs.keySet());
    +
    +        //get spread components
    +        spreadComps = getSpreadComps(td);
    +
    +        ArrayList<ExecutorDetails> sortedExecs = new ArrayList<>();
    +        //get a sorted list of unassigned executors based on number of constraints
    +        Set<ExecutorDetails> unassignedExecutors = new HashSet<>(cluster.getUnassignedExecutors(td));
    +        for (ExecutorDetails exec1 : getSortedExecs(spreadComps, constraintMatrix, compToExecs)) {
    +            if (unassignedExecutors.contains(exec1)) {
    +                sortedExecs.add(exec1);
    +            }
    +        }
    +
    +        //initialize structures
    +        for (RAS_Node node : nodes.values()) {
    +            nodeCompAssignment.put(node, new HashSet<>());
    +        }
    +        //populate with existing assignments
    +        SchedulerAssignment existingAssignment = cluster.getAssignmentById(td.getId());
    +        if (existingAssignment != null) {
    +            for (Map.Entry<ExecutorDetails, WorkerSlot> entry1 : existingAssignment.getExecutorToSlot().entrySet()) {
    +                ExecutorDetails exec1 = entry1.getKey();
    +                String compId = execToComp.get(exec1);
    +                WorkerSlot ws = entry1.getValue();
    +                RAS_Node node = nodes.get(ws.getNodeId());
    +                //populate node to component Assignments
    +                nodeCompAssignment.get(node).add(compId);
    +                //populate worker to comp assignments
    +                workerCompAssignment.computeIfAbsent(ws, (k) -> new HashSet<>()).add(compId);
    +            }
    +        }
    +
    +        //early detection/early fail
    +        if (!checkSchedulingFeasibility()) {
    +            //Scheduling Status set to FAIL_OTHER so no eviction policy will be attempted to make space for this topology
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, "Scheduling not feasible!");
    +        }
    +        return backtrackSearch(new SearcherState(workerCompAssignment, nodeCompAssignment, maxStateSearch, maxTimeMs, sortedExecs, td))
    +            .asSchedulingResult();
    +    }
    +
    +    private boolean checkSchedulingFeasibility() {
    +        for (String comp : spreadComps) {
    +            int numExecs = compToExecs.get(comp).size();
    +            if (numExecs > nodes.size()) {
    +                LOG.error("Unsatisfiable constraint: Component: {} marked as spread has {} executors which is larger "
    +                    + "than number of nodes: {}", comp, numExecs, nodes.size());
    +                return false;
    +            }
    +        }
    +        if (execToComp.size() >= MAX_STATE_SEARCH) {
    +            LOG.error("Number of executors is greater than the maximum number of states allowed to be searched.  "
    +                + "# of executors: {} Max states to search: {}", execToComp.size(), MAX_STATE_SEARCH);
    +            return false;
    +        }
    +        return true;
    +    }
    +    
    +    @Override
    +    protected TreeSet<ObjectResources> sortObjectResources(
    +        final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
    +        final ExistingScheduleFunc existingScheduleFunc) {
    +        return GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc);
    +    }
    +
    +    // Backtracking algorithm does not take into account the ordering of executors in worker to reduce traversal space
    +    @VisibleForTesting
    +    protected SolverResult backtrackSearch(SearcherState state) {
    +        state.incStatesSearched();
    +        if (state.areSearchLimitsExceeded()) {
    +            LOG.warn("Limits Exceeded");
    +            return new SolverResult(state, false);
    +        }
    +
    +        ExecutorDetails exec = state.currentExec();
    +        List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec, favoredNodes, unFavoredNodes);
    +
    +        for (ObjectResources nodeResources: sortedNodes) {
    +            RAS_Node node = nodes.get(nodeResources.id);
    +            for (WorkerSlot workerSlot : node.getSlotsAvailbleTo(state.td)) {
    +                if (isExecAssignmentToWorkerValid(workerSlot, state)) {
    +                    String comp = execToComp.get(exec);
    +
    +                    state.tryToSchedule(comp, node, workerSlot);
    +
    +                    if (state.areAllExecsScheduled()) {
    +                        //Everything is scheduled correctly, so no need to search any more.
    +                        return new SolverResult(state, true);
    +                    }
    +
    +                    SolverResult results = backtrackSearch(state.nextExecutor());
    +                    if (results.success) {
    +                        //We found a good result, so we are done.
    +                        return results;
    +                    }
    +
    +                    if (state.areSearchLimitsExceeded()) {
    +                        //No need to search more; it is not going to help.
    +                        return new SolverResult(state, false);
    +                    }
    +
    +                    //backtracking (If we ever get here there really isn't a lot of hope that we will find a scheduling)
    +                    state.backtrack(comp, node, workerSlot);
    +                }
    +            }
    +        }
    +        //Tried all of the slots and none of them worked.
    +        return new SolverResult(state, false);
    +    }
    +
    +    /**
    +     * Check if any constraints are violated if exec is scheduled on worker.
    +     * @return true if scheduling exec on worker does not violate any constraints, returns false if it does
    +     */
    +    public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, SearcherState state) {
    +        final ExecutorDetails exec = state.currentExec();
    +        //check resources
    +        RAS_Node node = nodes.get(worker.getNodeId());
    +        if (!node.wouldFit(worker, exec, state.td)) {
    +            LOG.trace("{} would not fit in resources available on {}", exec, worker);
    +            return false;
    +        }
    +
    +        //check if exec can be on worker based on user defined component exclusions
    +        String execComp = execToComp.get(exec);
    +        Set<String> components = state.workerCompAssignment.get(worker);
    +        if (components != null) {
    +            for (String comp : components) {
    +                if (constraintMatrix.get(execComp).get(comp) != 0) {
    +                    LOG.trace("{} found {} constraint violation {} on {}", exec, execComp, comp, worker);
    +                    return false;
    +                }
    +            }
    +        }
    +
    +        //check if exec satisfies spread
    +        if (spreadComps.contains(execComp)) {
    +            if (state.nodeCompAssignment.get(node).contains(execComp)) {
    +                LOG.trace("{} Found spread violation {} on node {}", exec, execComp, node.getId());
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
    +        Map<String, Map<String, Integer>> matrix = new HashMap<>();
    +        for (String comp : comps) {
    +            matrix.put(comp, new HashMap<>());
    +            for (String comp2 : comps) {
    +                matrix.get(comp).put(comp2, 0);
    +            }
    +        }
    +        List<List<String>> constraints = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_CONSTRAINTS);
    +        if (constraints != null) {
    +            for (List<String> constraintPair : constraints) {
    +                String comp1 = constraintPair.get(0);
    +                String comp2 = constraintPair.get(1);
    +                if (!matrix.containsKey(comp1)) {
    +                    LOG.warn("Comp: {} declared in constraints is not valid!", comp1);
    +                    continue;
    +                }
    +                if (!matrix.containsKey(comp2)) {
    +                    LOG.warn("Comp: {} declared in constraints is not valid!", comp2);
    +                    continue;
    +                }
    +                matrix.get(comp1).put(comp2, 1);
    +                matrix.get(comp2).put(comp1, 1);
    +            }
    +        }
    +        return matrix;
    +    }
    +
    +    /**
    +     * Determines if a scheduling is valid and all constraints are satisfied.
    +     */
    +    @VisibleForTesting
    +    public static boolean validateSolution(Cluster cluster, TopologyDetails td) {
    +        return checkSpreadSchedulingValid(cluster, td)
    +            && checkConstraintsSatisfied(cluster, td)
    +            && checkResourcesCorrect(cluster, td);
    +    }
    +
    +    /**
    +     * Check if constraints are satisfied.
    +     */
    +    private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking constraints...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    +        //get topology constraints
    +        Map<String, Map<String, Integer>> constraintMatrix = getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values()));
    +
    +        Map<WorkerSlot, List<String>> workerCompMap = new HashMap<>();
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
    +            WorkerSlot worker = entry.getValue();
    +            ExecutorDetails exec = entry.getKey();
    +            String comp = execToComp.get(exec);
    +            if (!workerCompMap.containsKey(worker)) {
    +                workerCompMap.put(worker, new LinkedList<>());
    +            }
    +            workerCompMap.get(worker).add(comp);
    +        }
    +        for (Map.Entry<WorkerSlot, List<String>> entry : workerCompMap.entrySet()) {
    +            List<String> comps = entry.getValue();
    +            for (int i = 0; i < comps.size(); i++) {
    +                for (int j = 0; j < comps.size(); j++) {
    +                    if (i != j && constraintMatrix.get(comps.get(i)).get(comps.get(j)) == 1) {
    +                        LOG.error("Incorrect Scheduling: worker exclusion for Component {} and {} not satisfied on WorkerSlot: {}",
    +                            comps.get(i), comps.get(j), entry.getKey());
    +                        return false;
    +                    }
    +                }
    +            }
    +        }
    +        return true;
    +    }
    +
    +    private static Map<WorkerSlot, RAS_Node> workerToNodes(Cluster cluster) {
    +        Map<WorkerSlot, RAS_Node> workerToNodes = new HashMap<>();
    +        for (RAS_Node node: RAS_Nodes.getAllNodesFrom(cluster).values()) {
    +            for (WorkerSlot s : node.getUsedSlots()) {
    +                workerToNodes.put(s, node);
    +            }
    +        }
    +        return workerToNodes;
    +    }
    +
    +    private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking for a valid scheduling...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    +        Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new HashMap<>();
    +        Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>();
    +        Map<RAS_Node, HashSet<String>> nodeCompMap = new HashMap<>();
    +        Map<WorkerSlot, RAS_Node> workerToNodes = workerToNodes(cluster);
    +        boolean ret = true;
    +
    +        HashSet<String> spreadComps = getSpreadComps(topo);
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            WorkerSlot worker = entry.getValue();
    +            RAS_Node node = workerToNodes.get(worker);
    +
    +            if (!workerExecMap.containsKey(worker)) {
    +                workerExecMap.put(worker, new HashSet<>());
    +                workerCompMap.put(worker, new HashSet<>());
    +            }
    +
    +            if (!nodeCompMap.containsKey(node)) {
    +                nodeCompMap.put(node, new HashSet<>());
    +            }
    +            if (workerExecMap.get(worker).contains(exec)) {
    +                LOG.error("Incorrect Scheduling: Found duplicate in scheduling");
    +                return false;
    +            }
    +            workerExecMap.get(worker).add(exec);
    +            String comp = execToComp.get(exec);
    +            workerCompMap.get(worker).add(comp);
    +            if (spreadComps.contains(comp)) {
    +                if (nodeCompMap.get(node).contains(comp)) {
    +                    LOG.error("Incorrect Scheduling: Spread for Component: {} {} on node {} not satisfied {}",
    +                        comp, exec, node.getId(), nodeCompMap.get(node));
    +                    ret = false;
    +                }
    +            }
    +            nodeCompMap.get(node).add(comp);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Check if resource constraints satisfied.
    +     */
    +    private static boolean checkResourcesCorrect(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking Resources...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<RAS_Node, Collection<ExecutorDetails>> nodeToExecs = new HashMap<>();
    +        Map<ExecutorDetails, WorkerSlot> mergedExecToWorker = new HashMap<>();
    +        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        //merge with existing assignments
    +        if (cluster.getAssignmentById(topo.getId()) != null
    +                && cluster.getAssignmentById(topo.getId()).getExecutorToSlot() != null) {
    +            mergedExecToWorker.putAll(cluster.getAssignmentById(topo.getId()).getExecutorToSlot());
    +        }
    +        mergedExecToWorker.putAll(result);
    +
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : mergedExecToWorker.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            WorkerSlot worker = entry.getValue();
    +            RAS_Node node = nodes.get(worker.getNodeId());
    +
    +            if (node.getAvailableMemoryResources() < 0.0 || node.getAvailableCpuResources() < 0.0) {
    +                LOG.error("Incorrect Scheduling: found node with negative available resources");
    +                return false;
    +            }
    +            if (!nodeToExecs.containsKey(node)) {
    +                nodeToExecs.put(node, new LinkedList<>());
    +            }
    +            nodeToExecs.get(node).add(exec);
    +        }
    +
    +        for (Map.Entry<RAS_Node, Collection<ExecutorDetails>> entry : nodeToExecs.entrySet()) {
    +            RAS_Node node = entry.getKey();
    +            Collection<ExecutorDetails> execs = entry.getValue();
    +            double cpuUsed = 0.0;
    +            double memoryUsed = 0.0;
    +            for (ExecutorDetails exec : execs) {
    +                cpuUsed += topo.getTotalCpuReqTask(exec);
    +                memoryUsed += topo.getTotalMemReqTask(exec);
    +            }
    +            if (node.getAvailableCpuResources() != (node.getTotalCpuResources() - cpuUsed)) {
    +                LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of cpu. Expected: {}"
    +                        + " Actual: {} Executors scheduled on node: {}",
    +                        node.getId(), (node.getTotalCpuResources() - cpuUsed), node.getAvailableCpuResources(), execs);
    +                return false;
    +            }
    +            if (node.getAvailableMemoryResources() != (node.getTotalMemoryResources() - memoryUsed)) {
    +                LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of memory. Expected: {}"
    +                        + " Actual: {} Executors scheduled on node: {}",
    +                        node.getId(), (node.getTotalMemoryResources() - memoryUsed), node.getAvailableMemoryResources(), execs);
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    private Map<String, Set<ExecutorDetails>> getCompToExecs(Map<ExecutorDetails, String> executorToComp) {
    +        Map<String, Set<ExecutorDetails>> retMap = new HashMap<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComp.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            String comp = entry.getValue();
    +            if (!retMap.containsKey(comp)) {
    +                retMap.put(comp, new HashSet<>());
    +            }
    +            retMap.get(comp).add(exec);
    --- End diff --
    
    Should we make this more Java 8-like?
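One way to make getCompToExecs more Java 8-like is to let the Streams API do the grouping. This is a minimal sketch, not the code that was merged: it assumes a generic executor type E in place of ExecutorDetails so it compiles without Storm on the classpath, and CompToExecsSketch and getCompToExecsLoop are hypothetical names. It also shows the computeIfAbsent loop, the same idiom tryToSchedule already uses in this diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CompToExecsSketch {
    // Stream version: group each executor under the component it belongs to.
    // E stands in for ExecutorDetails (assumption) so this compiles without Storm.
    public static <E> Map<String, Set<E>> getCompToExecs(Map<E, String> executorToComp) {
        return executorToComp.entrySet().stream()
            .collect(Collectors.groupingBy(
                Map.Entry::getValue,
                Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));
    }

    // Loop version using computeIfAbsent; values are HashSets, like the original.
    public static <E> Map<String, Set<E>> getCompToExecsLoop(Map<E, String> executorToComp) {
        Map<String, Set<E>> retMap = new HashMap<>();
        executorToComp.forEach((exec, comp) ->
            retMap.computeIfAbsent(comp, k -> new HashSet<>()).add(exec));
        return retMap;
    }

    public static void main(String[] args) {
        // Integers stand in for executors in this demo.
        Map<Integer, String> execToComp = new HashMap<>();
        execToComp.put(1, "spout");
        execToComp.put(2, "spout");
        execToComp.put(3, "bolt");
        Map<String, Set<Integer>> byComp = getCompToExecs(execToComp);
        System.out.println(byComp.get("spout").size()); // 2
        System.out.println(byComp.equals(getCompToExecsLoop(execToComp))); // true
    }
}
```

Collectors.toSet() makes no guarantee about the Set implementation it returns; since the field is declared as Map<String, Set<ExecutorDetails>> that should not matter, but Collectors.toCollection(HashSet::new) could be passed instead if a HashSet is specifically needed.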


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r154194020
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Set;
    +import java.util.Stack;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.RAS_Node;
    +import org.apache.storm.scheduler.resource.RAS_Nodes;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
    +    private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
    +
    +    protected static class SolverResult {
    +        private final int statesSearched;
    +        private final boolean success;
    +        private final long timeTakenMillis;
    +        private final int backtracked;
    +
    +        public SolverResult(SearcherState state, boolean success) {
    +            this.statesSearched = state.getStatesSearched();
    +            this.success = success;
    +            timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
    +            backtracked = state.numBacktrack;
    +        }
    +
    +        public SchedulingResult asSchedulingResult() {
    +            if (success) {
    +                return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +            }
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
    +                "Cannot find scheduling that satisfies all constraints (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +        }
    +    }
    +
    +    protected static class SearcherState {
    +        // Metrics
    +        // How many states searched so far.
    +        private int statesSearched = 0;
    +        // Number of times we had to backtrack.
    +        private int numBacktrack = 0;
    +        final long startTimeMillis;
    +        private final long maxEndTimeMs;
    +
    +        // Current state
    +        // The current executor we are trying to schedule
    +        private int execIndex = 0;
    +        // A map of the worker to the components in the worker to be able to enforce constraints.
    +        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
    +        private final boolean[] okToRemoveFromWorker;
    +        // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
    +        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
    +        private final boolean[] okToRemoveFromNode;
    +
    +        // Static State
    +        // The list of all executors (preferably sorted to make assignments simpler).
    +        private final List<ExecutorDetails> execs;
    +        //The maximum number of states to search before stopping.
    +        private final int maxStatesSearched;
    +        //The topology we are scheduling
    +        private final TopologyDetails td;
    +
    +        private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
    +                              int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
    +            assert execs != null;
    +            assert !execs.isEmpty();
    +
    +            this.workerCompAssignment = workerCompAssignment;
    +            this.nodeCompAssignment = nodeCompAssignment;
    +            this.maxStatesSearched = maxStatesSearched;
    +            this.execs = execs;
    +            okToRemoveFromWorker = new boolean[execs.size()];
    +            okToRemoveFromNode = new boolean[execs.size()];
    +            this.td = td;
    +            startTimeMillis = Time.currentTimeMillis();
    +            if (maxTimeMs <= 0) {
    +                maxEndTimeMs = Long.MAX_VALUE;
    +            } else {
    +                maxEndTimeMs = startTimeMillis + maxTimeMs;
    +            }
    +        }
    +
    +        public void incStatesSearched() {
    +            statesSearched++;
    +            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
    +                LOG.debug("States Searched: {}", statesSearched);
    +                LOG.debug("backtrack: {}", numBacktrack);
    +            }
    +        }
    +
    +        public int getStatesSearched() {
    +            return statesSearched;
    +        }
    +
    +        public boolean areSearchLimitsExceeded() {
    +            return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
    +        }
    +
    +        public SearcherState nextExecutor() {
    +            execIndex++;
    +            if (execIndex >= execs.size()) {
    +                throw new IllegalStateException("Exceeded the exec limit " + execIndex + " >= " + execs.size());
    +            }
    +            return this;
    +        }
    +
    +        public boolean areAllExecsScheduled() {
    +            return execIndex == execs.size() - 1;
    +        }
    +
    +        public ExecutorDetails currentExec() {
    +            return execs.get(execIndex);
    +        }
    +
    +        public void tryToSchedule(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot);
    +            //It is possible that this component is already scheduled on this node or worker.  If so, we cannot remove it when we backtrack
    +            okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp);
    +            okToRemoveFromNode[execIndex] = nodeCompAssignment.get(node).add(comp);
    +            node.assignSingleExecutor(workerSlot, exec, td);
    +        }
    +
    +        public void backtrack(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            execIndex--;
    +            if (execIndex < 0) {
    +                throw new IllegalStateException("Internal Error exec index became negative");
    +            }
    +            numBacktrack++;
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot);
    +            if (okToRemoveFromWorker[execIndex]) {
    --- End diff --
    
    As a defensive measure, we may want to reset this flag back to false.
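A sketch of what that defensive reset could look like. It uses a simplified, hypothetical stand-in for SearcherState (String workers and components, only the worker-level flag modelled; okToRemoveFromNode would get the same treatment), not the actual Storm class. In the quoted code tryToSchedule overwrites each flag before backtrack reads it, so the reset guards against future changes to the call sequence rather than fixing a current bug.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for SearcherState; names are illustrative only.
public class BacktrackResetSketch {
    private final Map<String, Set<String>> workerCompAssignment = new HashMap<>();
    private final boolean[] okToRemoveFromWorker;
    private int execIndex = 0;

    public BacktrackResetSketch(int numExecs) {
        okToRemoveFromWorker = new boolean[numExecs];
    }

    public void tryToSchedule(String comp, String worker) {
        // add() returns false when comp is already on this worker (placed by an earlier
        // executor), so backtracking this executor must leave it in place.
        okToRemoveFromWorker[execIndex] =
            workerCompAssignment.computeIfAbsent(worker, k -> new HashSet<>()).add(comp);
    }

    public void nextExecutor() {
        execIndex++;
        if (execIndex >= okToRemoveFromWorker.length) {
            throw new IllegalStateException("Exceeded the exec limit " + execIndex);
        }
    }

    public void backtrack(String comp, String worker) {
        execIndex--;
        if (execIndex < 0) {
            throw new IllegalStateException("Internal Error exec index became negative");
        }
        if (okToRemoveFromWorker[execIndex]) {
            workerCompAssignment.get(worker).remove(comp);
        }
        // Defensive reset: clear the flag once consumed so a stale value cannot be
        // reused if the tryToSchedule/backtrack call sequence ever changes.
        okToRemoveFromWorker[execIndex] = false;
    }

    public boolean workerHasComp(String worker, String comp) {
        return workerCompAssignment.getOrDefault(worker, new HashSet<>()).contains(comp);
    }

    public boolean okToRemove(int index) {
        return okToRemoveFromWorker[index];
    }

    public static void main(String[] args) {
        BacktrackResetSketch s = new BacktrackResetSketch(3);
        s.tryToSchedule("bolt", "w1"); // exec 0 adds bolt to w1
        s.nextExecutor();
        s.tryToSchedule("bolt", "w1"); // exec 1: bolt already on w1, so its flag is false
        s.nextExecutor();
        s.backtrack("bolt", "w1");     // undo exec 1: bolt must stay for exec 0
        System.out.println(s.workerHasComp("w1", "bolt")); // true
    }
}
```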


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r154195462
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +                workerCompAssignment.get(workerSlot).remove(comp);
    +            }
    +            if (okToRemoveFromNode[execIndex]) {
    +                nodeCompAssignment.get(node).remove(comp);
    +            }
    +            node.freeSingleExecutor(exec, td);
    +        }
    +    }
    +
    +    private Map<String, RAS_Node> nodes;
    +    private Map<ExecutorDetails, String> execToComp;
    +    private Map<String, Set<ExecutorDetails>> compToExecs;
    +    private List<String> favoredNodes;
    +    private List<String> unFavoredNodes;
    +
    +    //constraints and spreads
    +    private Map<String, Map<String, Integer>> constraintMatrix;
    +    private HashSet<String> spreadComps = new HashSet<>();
    +
    +    //hard coded max number of states to search
    +    public static final int MAX_STATE_SEARCH = 100_000;
    +
    +    @Override
    +    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
    +        prepare(cluster);
    +        LOG.debug("Scheduling {}", td.getId());
    +        nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
    +        Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
    +        //set max number of states to search
    +        final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_DEPTH_TRAVERSAL)));
    +
    +        final long maxTimeMs =
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_TIME_SECS), -1).intValue() * 1000L;
    +
    +        favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
    +        unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
    +
    +        //get mapping of execs to components
    +        execToComp = td.getExecutorToComponent();
    +        //get mapping of components to executors
    +        compToExecs = getCompToExecs(execToComp);
    +
    +        //get topology constraints
    +        constraintMatrix = getConstraintMap(td, compToExecs.keySet());
    +
    +        //get spread components
    +        spreadComps = getSpreadComps(td);
    +
    +        ArrayList<ExecutorDetails> sortedExecs = new ArrayList<>();
    +        //get a sorted list of unassigned executors based on number of constraints
    +        Set<ExecutorDetails> unassignedExecutors = new HashSet<>(cluster.getUnassignedExecutors(td));
    +        for (ExecutorDetails exec1 : getSortedExecs(spreadComps, constraintMatrix, compToExecs)) {
    +            if (unassignedExecutors.contains(exec1)) {
    +                sortedExecs.add(exec1);
    +            }
    +        }
    +
    +        //initialize structures
    +        for (RAS_Node node : nodes.values()) {
    +            nodeCompAssignment.put(node, new HashSet<>());
    +        }
    +        //populate with existing assignments
    +        SchedulerAssignment existingAssignment = cluster.getAssignmentById(td.getId());
    +        if (existingAssignment != null) {
    +            for (Map.Entry<ExecutorDetails, WorkerSlot> entry1 : existingAssignment.getExecutorToSlot().entrySet()) {
    +                ExecutorDetails exec1 = entry1.getKey();
    +                String compId = execToComp.get(exec1);
    +                WorkerSlot ws = entry1.getValue();
    +                RAS_Node node = nodes.get(ws.getNodeId());
    +                //populate node to component Assignments
    +                nodeCompAssignment.get(node).add(compId);
    +                //populate worker to comp assignments
    +                workerCompAssignment.computeIfAbsent(ws, (k) -> new HashSet<>()).add(compId);
    +            }
    +        }
    +
    +        //early detection/early fail
    +        if (!checkSchedulingFeasibility()) {
    +            //Scheduling Status set to FAIL_OTHER so no eviction policy will be attempted to make space for this topology
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, "Scheduling not feasible!");
    +        }
    +        return backtrackSearch(new SearcherState(workerCompAssignment, nodeCompAssignment, maxStateSearch, maxTimeMs, sortedExecs, td))
    +            .asSchedulingResult();
    +    }
    +
    +    private boolean checkSchedulingFeasibility() {
    +        for (String comp : spreadComps) {
    +            int numExecs = compToExecs.get(comp).size();
    +            if (numExecs > nodes.size()) {
    +                LOG.error("Unsatisfiable constraint: Component: {} marked as spread has {} executors which is larger "
    +                    + "than number of nodes: {}", comp, numExecs, nodes.size());
    +                return false;
    +            }
    +        }
    +        if (execToComp.size() >= MAX_STATE_SEARCH) {
    +            LOG.error("Number of executors is greater than the maximum number of states allowed to be searched.  "
    +                + "# of executors: {} Max states to search: {}", execToComp.size(), MAX_STATE_SEARCH);
    +            return false;
    +        }
    +        return true;
    +    }
    +    
    +    @Override
    +    protected TreeSet<ObjectResources> sortObjectResources(
    +        final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
    +        final ExistingScheduleFunc existingScheduleFunc) {
    +        return GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc);
    +    }
    +
    +    // Backtracking algorithm does not take into account the ordering of executors in worker to reduce traversal space
    +    @VisibleForTesting
    +    protected SolverResult backtrackSearch(SearcherState state) {
    +        state.incStatesSearched();
    +        if (state.areSearchLimitsExceeded()) {
    +            LOG.warn("Limits Exceeded");
    +            return new SolverResult(state, false);
    +        }
    +
    +        ExecutorDetails exec = state.currentExec();
    +        List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec, favoredNodes, unFavoredNodes);
    +
    +        for (ObjectResources nodeResources: sortedNodes) {
    +            RAS_Node node = nodes.get(nodeResources.id);
    +            for (WorkerSlot workerSlot : node.getSlotsAvailbleTo(state.td)) {
    +                if (isExecAssignmentToWorkerValid(workerSlot, state)) {
    +                    String comp = execToComp.get(exec);
    +
    +                    state.tryToSchedule(comp, node, workerSlot);
    +
    +                    if (state.areAllExecsScheduled()) {
    +                        //Everything is scheduled correctly, so no need to search any more.
    +                        return new SolverResult(state, true);
    +                    }
    +
    +                    SolverResult results = backtrackSearch(state.nextExecutor());
    +                    if (results.success) {
    +                        //We found a good result we are done.
    +                        return results;
    +                    }
    +
    +                    if (state.areSearchLimitsExceeded()) {
    +                        //No need to search more it is not going to help.
    +                        return new SolverResult(state, false);
    +                    }
    +
    +                    //backtracking (If we ever get here there really isn't a lot of hope that we will find a scheduling)
    +                    state.backtrack(comp, node, workerSlot);
    +                }
    +            }
    +        }
    +        //Tried all of the slots and none of them worked.
    +        return new SolverResult(state, false);
    +    }
    +
    +    /**
    +     * Check if any constraints are violated if exec is scheduled on worker.
    +     * @return true if scheduling exec on worker does not violate any constraints, returns false if it does
    +     */
    +    public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, SearcherState state) {
    +        final ExecutorDetails exec = state.currentExec();
    +        //check resources
    +        RAS_Node node = nodes.get(worker.getNodeId());
    +        if (!node.wouldFit(worker, exec, state.td)) {
    +            LOG.trace("{} would not fit in resources available on {}", exec, worker);
    +            return false;
    +        }
    +
    +        //check if exec can be on worker based on user defined component exclusions
    +        String execComp = execToComp.get(exec);
    +        Set<String> components = state.workerCompAssignment.get(worker);
    +        if (components != null) {
    +            for (String comp : components) {
    +                if (constraintMatrix.get(execComp).get(comp) != 0) {
    +                    LOG.trace("{} found {} constraint violation {} on {}", exec, execComp, comp, worker);
    +                    return false;
    +                }
    +            }
    +        }
    +
    +        //check if exec satisfy spread
    +        if (spreadComps.contains(execComp)) {
    +            if (state.nodeCompAssignment.get(node).contains(execComp)) {
    +                LOG.trace("{} Found spread violation {} on node {}", exec, execComp, node.getId());
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
    +        Map<String, Map<String, Integer>> matrix = new HashMap<>();
    +        for (String comp : comps) {
    +            matrix.put(comp, new HashMap<>());
    +            for (String comp2 : comps) {
    +                matrix.get(comp).put(comp2, 0);
    +            }
    +        }
    +        List<List<String>> constraints = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_CONSTRAINTS);
    +        if (constraints != null) {
    +            for (List<String> constraintPair : constraints) {
    +                String comp1 = constraintPair.get(0);
    +                String comp2 = constraintPair.get(1);
    +                if (!matrix.containsKey(comp1)) {
    +                    LOG.warn("Comp: {} declared in constraints is not valid!", comp1);
    +                    continue;
    +                }
    +                if (!matrix.containsKey(comp2)) {
    +                    LOG.warn("Comp: {} declared in constraints is not valid!", comp2);
    +                    continue;
    +                }
    +                matrix.get(comp1).put(comp2, 1);
    +                matrix.get(comp2).put(comp1, 1);
    +            }
    +        }
    +        return matrix;
    +    }
    +
    +    /**
    +     * Determines if a scheduling is valid and all constraints are satisfied.
    +     */
    +    @VisibleForTesting
    +    public static boolean validateSolution(Cluster cluster, TopologyDetails td) {
    +        return checkSpreadSchedulingValid(cluster, td)
    +            && checkConstraintsSatisfied(cluster, td)
    +            && checkResourcesCorrect(cluster, td);
    +    }
    +
    +    /**
    +     * Check if constraints are satisfied.
    +     */
    +    private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking constraints...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    +        //get topology constraints
    +        Map<String, Map<String, Integer>> constraintMatrix = getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values()));
    +
    +        Map<WorkerSlot, List<String>> workerCompMap = new HashMap<>();
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
    +            WorkerSlot worker = entry.getValue();
    +            ExecutorDetails exec = entry.getKey();
    +            String comp = execToComp.get(exec);
    +            if (!workerCompMap.containsKey(worker)) {
    +                workerCompMap.put(worker, new LinkedList<>());
    +            }
    +            workerCompMap.get(worker).add(comp);
    +        }
    +        for (Map.Entry<WorkerSlot, List<String>> entry : workerCompMap.entrySet()) {
    +            List<String> comps = entry.getValue();
    +            for (int i = 0; i < comps.size(); i++) {
    +                for (int j = 0; j < comps.size(); j++) {
    +                    if (i != j && constraintMatrix.get(comps.get(i)).get(comps.get(j)) == 1) {
    +                        LOG.error("Incorrect Scheduling: worker exclusion for Component {} and {} not satisfied on WorkerSlot: {}",
    +                            comps.get(i), comps.get(j), entry.getKey());
    +                        return false;
    +                    }
    +                }
    +            }
    +        }
    +        return true;
    +    }
    +
    +    private static Map<WorkerSlot, RAS_Node> workerToNodes(Cluster cluster) {
    +        Map<WorkerSlot, RAS_Node> workerToNodes = new HashMap<>();
    +        for (RAS_Node node: RAS_Nodes.getAllNodesFrom(cluster).values()) {
    +            for (WorkerSlot s : node.getUsedSlots()) {
    +                workerToNodes.put(s, node);
    +            }
    +        }
    +        return workerToNodes;
    +    }
    +
    +    private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking for a valid scheduling...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    +        Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new HashMap<>();
    +        Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>();
    +        Map<RAS_Node, HashSet<String>> nodeCompMap = new HashMap<>();
    +        Map<WorkerSlot, RAS_Node> workerToNodes = workerToNodes(cluster);
    +        boolean ret = true;
    +
    +        HashSet<String> spreadComps = getSpreadComps(topo);
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            WorkerSlot worker = entry.getValue();
    +            RAS_Node node = workerToNodes.get(worker);
    +
    +            if (!workerExecMap.containsKey(worker)) {
    +                workerExecMap.put(worker, new HashSet<>());
    +                workerCompMap.put(worker, new HashSet<>());
    +            }
    +
    +            if (!nodeCompMap.containsKey(node)) {
    +                nodeCompMap.put(node, new HashSet<>());
    +            }
    +            if (workerExecMap.get(worker).contains(exec)) {
    +                LOG.error("Incorrect Scheduling: Found duplicate in scheduling");
    +                return false;
    +            }
    +            workerExecMap.get(worker).add(exec);
    +            String comp = execToComp.get(exec);
    +            workerCompMap.get(worker).add(comp);
    +            if (spreadComps.contains(comp)) {
    +                if (nodeCompMap.get(node).contains(comp)) {
    +                    LOG.error("Incorrect Scheduling: Spread for Component: {} {} on node {} not satisfied {}",
    +                        comp, exec, node.getId(), nodeCompMap.get(node));
    +                    ret = false;
    +                }
    +            }
    +            nodeCompMap.get(node).add(comp);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Check if resource constraints satisfied.
    +     */
    +    private static boolean checkResourcesCorrect(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking Resources...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<RAS_Node, Collection<ExecutorDetails>> nodeToExecs = new HashMap<>();
    +        Map<ExecutorDetails, WorkerSlot> mergedExecToWorker = new HashMap<>();
    +        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        //merge with existing assignments
    +        if (cluster.getAssignmentById(topo.getId()) != null
    +                && cluster.getAssignmentById(topo.getId()).getExecutorToSlot() != null) {
    +            mergedExecToWorker.putAll(cluster.getAssignmentById(topo.getId()).getExecutorToSlot());
    +        }
    +        mergedExecToWorker.putAll(result);
    +
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : mergedExecToWorker.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            WorkerSlot worker = entry.getValue();
    +            RAS_Node node = nodes.get(worker.getNodeId());
    +
    +            if (node.getAvailableMemoryResources() < 0.0 || node.getAvailableCpuResources() < 0.0) {
    +                LOG.error("Incorrect Scheduling: found node with negative available resources");
    +                return false;
    +            }
    +            if (!nodeToExecs.containsKey(node)) {
    +                nodeToExecs.put(node, new LinkedList<>());
    +            }
    +            nodeToExecs.get(node).add(exec);
    +        }
    +
    +        for (Map.Entry<RAS_Node, Collection<ExecutorDetails>> entry : nodeToExecs.entrySet()) {
    +            RAS_Node node = entry.getKey();
    +            Collection<ExecutorDetails> execs = entry.getValue();
    +            double cpuUsed = 0.0;
    +            double memoryUsed = 0.0;
    +            for (ExecutorDetails exec : execs) {
    +                cpuUsed += topo.getTotalCpuReqTask(exec);
    +                memoryUsed += topo.getTotalMemReqTask(exec);
    +            }
    +            if (node.getAvailableCpuResources() != (node.getTotalCpuResources() - cpuUsed)) {
    +                LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of cpu. Expected: {}"
    +                        + " Actual: {} Executors scheduled on node: {}",
    +                        node.getId(), (node.getTotalCpuResources() - cpuUsed), node.getAvailableCpuResources(), execs);
    +                return false;
    +            }
    +            if (node.getAvailableMemoryResources() != (node.getTotalMemoryResources() - memoryUsed)) {
    +                LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of memory. Expected: {}"
    +                        + " Actual: {} Executors scheduled on node: {}",
    +                        node.getId(), (node.getTotalMemoryResources() - memoryUsed), node.getAvailableMemoryResources(), execs);
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    private Map<String, Set<ExecutorDetails>> getCompToExecs(Map<ExecutorDetails, String> executorToComp) {
    +        Map<String, Set<ExecutorDetails>> retMap = new HashMap<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComp.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            String comp = entry.getValue();
    +            if (!retMap.containsKey(comp)) {
    +                retMap.put(comp, new HashSet<>());
    +            }
    +            retMap.get(comp).add(exec);
    +        }
    +        return retMap;
    +    }
    +
    +    private ArrayList<ExecutorDetails> getSortedExecs(HashSet<String> spreadComps, Map<String, Map<String, Integer>> constraintMatrix,
    +                                                      Map<String, Set<ExecutorDetails>> compToExecs) {
    +        ArrayList<ExecutorDetails> retList = new ArrayList<>();
    +        //find number of constraints per component
    +        //Key->Comp Value-># of constraints
    +        Map<String, Integer> compConstraintCountMap = new HashMap<>();
    +        for (Map.Entry<String, Map<String, Integer>> constraintEntry1 : constraintMatrix.entrySet()) {
    +            int count = 0;
    +            String comp = constraintEntry1.getKey();
    +            for (Map.Entry<String, Integer> constraintEntry2 : constraintEntry1.getValue().entrySet()) {
    +                if (constraintEntry2.getValue() == 1) {
    +                    count++;
    +                }
    +            }
    +            //check component is declared for spreading
    +            if (spreadComps.contains(constraintEntry1.getKey())) {
    +                count++;
    +            }
    +            compConstraintCountMap.put(comp, count);
    +        }
    +        //Sort comps by number of constraints
    +        NavigableMap<String, Integer> sortedCompConstraintCountMap = sortByValues(compConstraintCountMap);
    +        //sort executors based on component constraints
    +        for (String comp : sortedCompConstraintCountMap.keySet()) {
    +            retList.addAll(compToExecs.get(comp));
    +        }
    +        return retList;
    +    }
    +
    +    private static HashSet<String> getSpreadComps(TopologyDetails topo) {
    +        HashSet<String> retSet = new HashSet<>();
    +        List<String> spread = (List<String>) topo.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
    +        if (spread != null) {
    +            Set<String> comps = topo.getComponents().keySet();
    +            for (String comp : spread) {
    +                if (comps.contains(comp)) {
    +                    retSet.add(comp);
    +                } else {
    +                    LOG.warn("Comp {} declared for spread not valid", comp);
    --- End diff --
    
    Should this be fatal?
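One way to make it fatal is to collect every unknown name and reject the topology up front, instead of logging and skipping each one. This is a minimal sketch under assumptions. `SpreadConfigCheck` and `validateSpreadComps` are hypothetical names, and the topology's component IDs and the `TOPOLOGY_SPREAD_COMPONENTS` list are passed in as plain collections rather than read from `TopologyDetails`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SpreadConfigCheck {
    /**
     * Hypothetical strict variant of getSpreadComps: returns the configured spread
     * components, or throws if any of them is not a component of the topology.
     */
    static Set<String> validateSpreadComps(Collection<String> topologyComps, List<String> spread) {
        Set<String> result = new HashSet<>();
        if (spread == null) {
            return result; // nothing configured for spreading
        }
        List<String> unknown = new ArrayList<>();
        for (String comp : spread) {
            if (topologyComps.contains(comp)) {
                result.add(comp);
            } else {
                unknown.add(comp);
            }
        }
        if (!unknown.isEmpty()) {
            // Fail fast: report every bad name at once instead of silently dropping them.
            throw new IllegalArgumentException("Components declared for spread are not valid: " + unknown);
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> comps = Set.of("spout", "bolt");
        System.out.println(validateSpreadComps(comps, List.of("bolt"))); // [bolt]
        try {
            validateSpreadComps(comps, List.of("bolt", "blot"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Throwing is only one option. `schedule()` in the diff already reports infeasible spreads with `SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, ...)`, and a misconfigured spread list could be surfaced the same way.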


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r157680068
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java ---
    @@ -129,7 +129,15 @@ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
         protected TreeSet<ObjectResources> sortObjectResources(
                 final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
                 final ExistingScheduleFunc existingScheduleFunc) {
    +        return sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc);
    +    }
     
    +    /**
    +     * Implementation of the sortObjectResources method so other strategies can reuse it.
    +     */
    +    public static TreeSet<ObjectResources> sortObjectResourcesImpl(
    --- End diff --
    
    I would suggest putting this in some sort of utility class.  It's kind of awkward for the ConstraintSolverStrategy to call a static method in the GenericResourceAwareStrategy.
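The suggested refactoring moves the shared sorting logic into a stateless helper that both strategies call, so neither strategy depends on the other. This is a minimal sketch under assumptions. `ResourceSortUtils`, `sortByAvailableCpu`, and the two strategy stand-ins are hypothetical, and each node is reduced to an id mapped to its available CPU instead of Storm's `AllResources`/`ObjectResources` types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class SharedSortSketch {
    /** Hypothetical shared helper: stateless, not instantiable, owned by neither strategy. */
    static final class ResourceSortUtils {
        private ResourceSortUtils() {
            // utility class; no instances
        }

        /** Returns node ids ordered by available CPU, highest first; ties broken by id. */
        static List<String> sortByAvailableCpu(Map<String, Double> availableCpu) {
            List<String> ids = new ArrayList<>(availableCpu.keySet());
            ids.sort((a, b) -> {
                int byCpu = Double.compare(availableCpu.get(b), availableCpu.get(a));
                return byCpu != 0 ? byCpu : a.compareTo(b);
            });
            return ids;
        }
    }

    /** Stand-ins for the two strategies: each delegates to the utility, not to the other strategy. */
    static class GenericStrategy {
        List<String> sortNodes(Map<String, Double> availableCpu) {
            return ResourceSortUtils.sortByAvailableCpu(availableCpu);
        }
    }

    static class ConstraintSolver {
        List<String> sortNodes(Map<String, Double> availableCpu) {
            return ResourceSortUtils.sortByAvailableCpu(availableCpu);
        }
    }

    public static void main(String[] args) {
        Map<String, Double> cpu = Map.of("n1", 50.0, "n2", 200.0, "n3", 50.0);
        System.out.println(new GenericStrategy().sortNodes(cpu));  // [n2, n1, n3]
        System.out.println(new ConstraintSolver().sortNodes(cpu)); // [n2, n1, n3]
    }
}
```

The private constructor and absence of state make the helper's role explicit. Changing the sort order in one place affects both strategies, which is the coupling the comment wants to be deliberate rather than incidental.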


---

[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the issue:

    https://github.com/apache/storm/pull/2442
  
    Cool, this got open-sourced!


---

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r154194603
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Set;
    +import java.util.Stack;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.RAS_Node;
    +import org.apache.storm.scheduler.resource.RAS_Nodes;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
    +    private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
    +
    +    protected static class SolverResult {
    +        private final int statesSearched;
    +        private final boolean success;
    +        private final long timeTakenMillis;
    +        private final int backtracked;
    +
    +        public SolverResult(SearcherState state, boolean success) {
    +            this.statesSearched = state.getStatesSearched();
    +            this.success = success;
    +            timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
    +            backtracked = state.numBacktrack;
    +        }
    +
    +        public SchedulingResult asSchedulingResult() {
    +            if (success) {
    +                return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +            }
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
    +                "Cannot find scheduling that satisfies all constraints (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +        }
    +    }
    +
    +    protected static class SearcherState {
    +        // Metrics
    +        // How many states searched so far.
    +        private int statesSearched = 0;
    +        // Number of times we had to backtrack.
    +        private int numBacktrack = 0;
    +        final long startTimeMillis;
    +        private final long maxEndTimeMs;
    +
    +        // Current state
    +        // The current executor we are trying to schedule
    +        private int execIndex = 0;
    +        // A map of the worker to the components in the worker to be able to enforce constraints.
    +        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
    +        private final boolean[] okToRemoveFromWorker;
    +        // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
    +        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
    +        private final boolean[] okToRemoveFromNode;
    +
    +        // Static State
    +        // The list of all executors (preferably sorted to make assignments simpler).
    +        private final List<ExecutorDetails> execs;
    +        //The maximum number of states to search before stopping.
    +        private final int maxStatesSearched;
    +        //The topology we are scheduling
    +        private final TopologyDetails td;
    +
    +        private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
    +                              int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
    +            assert execs != null;
    +            assert !execs.isEmpty();
    +
    +            this.workerCompAssignment = workerCompAssignment;
    +            this.nodeCompAssignment = nodeCompAssignment;
    +            this.maxStatesSearched = maxStatesSearched;
    +            this.execs = execs;
    +            okToRemoveFromWorker = new boolean[execs.size()];
    +            okToRemoveFromNode = new boolean[execs.size()];
    +            this.td = td;
    +            startTimeMillis = Time.currentTimeMillis();
    +            if (maxTimeMs <= 0) {
    +                maxEndTimeMs = Long.MAX_VALUE;
    +            } else {
    +                maxEndTimeMs = startTimeMillis + maxTimeMs;
    +            }
    +        }
    +
    +        public void incStatesSearched() {
    +            statesSearched++;
    +            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
    +                LOG.debug("States Searched: {}", statesSearched);
    +                LOG.debug("backtrack: {}", numBacktrack);
    +            }
    +        }
    +
    +        public int getStatesSearched() {
    +            return statesSearched;
    +        }
    +
    +        public boolean areSearchLimitsExceeded() {
    +            return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
    +        }
    +
    +        public SearcherState nextExecutor() {
    +            execIndex++;
    +            if (execIndex >= execs.size()) {
    +                throw new IllegalStateException("Exceeded the exec limit " + execIndex + " >= " + execs.size());
    +            }
    +            return this;
    +        }
    +
    +        public boolean areAllExecsScheduled() {
    +            return execIndex == execs.size() - 1;
    +        }
    +
    +        public ExecutorDetails currentExec() {
    +            return execs.get(execIndex);
    +        }
    +
    +        public void tryToSchedule(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot);
    +            //It is possible that this component is already scheduled on this node or worker.  If so when we backtrack we cannot remove it
    +            okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp);
    +            okToRemoveFromNode[execIndex] = nodeCompAssignment.get(node).add(comp);
    +            node.assignSingleExecutor(workerSlot, exec, td);
    +        }
    +
    +        public void backtrack(String comp, RAS_Node node, WorkerSlot workerSlot) {
    --- End diff --
    
    Here too, instead of passing in the comp, we may want to pass in the mapping and look the component up ourselves, just so we can be sure that the state stays consistent with the executor being undone.
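In this reading of the suggestion, the searcher holds the executor-to-component mapping and derives the component from the current executor in both `tryToSchedule` and `backtrack`. A caller therefore cannot undo a component that does not belong to the executor being backtracked. This is a simplified sketch under assumptions. `MiniSearcherState` is a hypothetical name, executors and workers are plain strings, and node tracking and resource accounting are omitted. The per-index `okToRemoveFromWorker` flag and the decrement-then-undo order mirror the diff: a component that was already on the worker stays there on backtrack.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class MiniSearcherState {
    private final List<String> execs;               // executors in scheduling order
    private final Map<String, String> execToComp;   // executor -> component, owned by the state
    private final Map<String, Set<String>> workerCompAssignment = new HashMap<>();
    private final boolean[] okToRemoveFromWorker;
    private int execIndex = 0;

    MiniSearcherState(List<String> execs, Map<String, String> execToComp) {
        this.execs = execs;
        this.execToComp = execToComp;
        this.okToRemoveFromWorker = new boolean[execs.size()];
    }

    String currentExec() {
        return execs.get(execIndex);
    }

    /** Assigns the current executor to a worker; the component is looked up, not passed in. */
    void tryToSchedule(String worker) {
        String comp = execToComp.get(currentExec());
        // add() returns false if the component was already on this worker,
        // in which case backtracking must leave it in place.
        okToRemoveFromWorker[execIndex] =
            workerCompAssignment.computeIfAbsent(worker, k -> new HashSet<>()).add(comp);
    }

    void nextExecutor() {
        execIndex++;
        if (execIndex >= execs.size()) {
            throw new IllegalStateException("Exceeded the exec limit " + execIndex + " >= " + execs.size());
        }
    }

    /** Steps back one executor and undoes its assignment, deriving the component itself. */
    void backtrack(String worker) {
        execIndex--;
        if (execIndex < 0) {
            throw new IllegalStateException("exec index became negative");
        }
        String comp = execToComp.get(currentExec());
        if (okToRemoveFromWorker[execIndex]) {
            workerCompAssignment.get(worker).remove(comp);
        }
    }

    Set<String> compsOn(String worker) {
        return workerCompAssignment.getOrDefault(worker, Set.of());
    }

    public static void main(String[] args) {
        MiniSearcherState state = new MiniSearcherState(
            List.of("e1", "e2", "e3"), Map.of("e1", "A", "e2", "A", "e3", "B"));
        state.tryToSchedule("w1");  // e1 (A) -> w1
        state.nextExecutor();
        state.tryToSchedule("w1");  // e2 (A) -> w1; A was already there
        state.nextExecutor();
        state.backtrack("w1");      // undo e2: A stays because e1 still uses it
        System.out.println(state.compsOn("w1")); // [A]
        state.backtrack("w1");      // undo e1: A is removed
        System.out.println(state.compsOn("w1")); // []
    }
}
```

With this shape the recursive search only passes the worker (and, in the real code, the node) to `backtrack`. The state stays the single source of truth for which component is being removed.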


---