Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2017/11/03 17:05:57 UTC

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/2400

     STORM-2792: Remove RAS EvictionPolicy and cleanup 

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm STORM-2792

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2400.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2400
    
----
commit 5d95e7139493270643e35ffb1ba59628c8402e57
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-11-01T14:22:37Z

    STORM-2795: Fix race

commit 5f0abb21c83c129af5b4acce2d0bb3bbb0f41887
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-10-30T21:09:45Z

    STORM-2792: Remove RAS EvictionPolicy and cleanup

----


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148849861
  
    --- Diff: storm-client/pom.xml ---
    @@ -193,6 +193,10 @@
                 <groupId>org.hamcrest</groupId>
                 <artifactId>java-hamcrest</artifactId>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.curator</groupId>
    +            <artifactId>curator-client</artifactId>
    +        </dependency>
    --- End diff --
    
    Need to remove this.  It is a duplicate.  Not sure why my IDE wanted me to do this...


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148923251
  
    --- Diff: docs/Resource_Aware_Scheduler_overview.md ---
    @@ -303,28 +326,33 @@ To get an idea of how much memory/CPU your topology is actually using you can ad
         workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
         conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
     ```
    +
     The CPU metrics will require you to add
    +
     ``` 
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-metrics</artifactId>
             <version>1.0.0</version>
         </dependency>
     ```
    +
     as a topology dependency (1.0.0 or higher).
      
     You can then go to your topology on the UI, turn on the system metrics, and find the log that the LoggingMetricsConsumer is writing to.  It will output results in the log like.
    +
     ```
         1454526100 node1.nodes.com:6707 -1:__system CPU {user-ms=74480, sys-ms=10780}
         1454526100 node1.nodes.com:6707 -1:__system memory/nonHeap     {unusedBytes=2077536, virtualFreeBytes=-64621729, initBytes=2555904, committedBytes=66699264, maxBytes=-1, usedBytes=64621728}
         1454526100 node1.nodes.com:6707 -1:__system memory/heap  {unusedBytes=573861408, virtualFreeBytes=694644256, initBytes=805306368, committedBytes=657719296, maxBytes=778502144, usedBytes=83857888}
     ```
    +
     The metrics with -1:__system are generally metrics for the entire worker.  In the example above that worker is running on node1.nodes.com:6707.  These metrics are collected every 60 seconds.  For the CPU you can see that over the 60 seconds this worker used  74480 + 10780 = 85260 ms of CPU time.  This is equivalent to 85260/60000 or about 1.5 cores.
      
     The Memory usage is similar but look at the usedBytes.  offHeap is 64621728 or about 62MB, and onHeap is 83857888 or about 80MB, but you should know what you set your heap to in each of your workers already.  How do you divide this up per bolt/spout?  That is a bit harder and may require some trial and error from your end.
     
     <div id='Enhancements-on-original-DefaultResourceAwareStrategy'/>
    -## * Enhancements on original DefaultResourceAwareStrategy *
    +## *Enhancements on original DefaultResourceAwareStrategy*
    --- End diff --
    
    The markdown does seem to be right for this header


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148852931
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -62,123 +70,144 @@ public void prepare(Map<String, Object> conf) {
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
    -        //initialize data structures
    -        for (TopologyDetails td : cluster.getTopologies()) {
    +        Map<String, User> userMap = getUsers(cluster);
    +        List<TopologyDetails> orderedTopologies = new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap));
    +        LOG.info("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));
    +        for (TopologyDetails td : orderedTopologies) {
                 if (!cluster.needsSchedulingRas(td)) {
                     //cluster forgets about its previous status, so if it is scheduled just leave it.
                     cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    -            }
    -        }
    -        Map<String, User> userMap = getUsers(cluster);
    -
    -        while (true) {
    -            TopologyDetails td;
    -            try {
    -                //Call scheduling priority strategy
    -                td = schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled!",
    -                        schedulingPrioritystrategy.getClass().getName(), ex);
    -                break;
    -            }
    -            if (td == null) {
    -                break;
    -            }
    -            User submitter = userMap.get(td.getTopologySubmitter());
    -            if (cluster.needsSchedulingRas(td)) {
    -                scheduleTopology(td, cluster, submitter, userMap);
                 } else {
    -                LOG.warn("Topology {} is already fully scheduled!", td.getName());
    -                cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    +                User submitter = userMap.get(td.getTopologySubmitter());
    +                scheduleTopology(td, cluster, submitter, orderedTopologies);
                 }
             }
         }
     
    +    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) {
    +        markFailedTopology(u, c, td, message, null);
    +    }
     
    -    public void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
    -                                 Map<String, User> userMap) {
    +    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message, Throwable t) {
    +        c.setStatus(td, message);
    +        String realMessage = td.getId() + " " + message;
    +        if (t != null) {
    +            LOG.error(realMessage, t);
    +        } else {
    +            LOG.error(realMessage);
    +        }
    +        u.markTopoUnsuccess(td);
    +    }
    +
    +    private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
    +                                  List<TopologyDetails> orderedTopologies) {
             //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds
             Cluster workingState = new Cluster(cluster);
    +        RAS_Nodes nodes = new RAS_Nodes(workingState);
             IStrategy rasStrategy = null;
             String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
             try {
    -            rasStrategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
    +            rasStrategy = ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
                 rasStrategy.prepare(conf);
             } catch (DisallowedStrategyException e) {
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + e.getAttemptedClass()
    -                    + " is not an allowed strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY
    -                    + " config is one of the allowed strategies: " + e.getAllowedStrategies().toString());
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - " + e.getAttemptedClass()
    +                    + " is not an allowed strategy. Please make sure your "
    +                    + Config.TOPOLOGY_SCHEDULER_STRATEGY
    +                    + " config is one of the allowed strategies: "
    +                    + e.getAllowedStrategies(), e);
                 return;
             } catch (RuntimeException e) {
    -            LOG.error("failed to create instance of IStrategy: {} Topology {} will not be scheduled.",
    -                    strategyConf, td.getName(), e);
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
    -                    + strategyConf + ". Please check logs for details");
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - failed to create instance of topology strategy "
    +                    + strategyConf
    +                    + ". Please check logs for details", e);
                 return;
             }
     
    -        while (true) {
    -            // A copy of the cluster that restricts the strategy to only modify a single topology
    +        for (int i = 0; i < maxSchedulingAttempts; i++) {
                 SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
    -            SchedulingResult result = null;
                 try {
    -                result = rasStrategy.schedule(toSchedule, td);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running strategy {} to schedule topology {}."
    -                        + " Topology will not be scheduled!", rasStrategy.getClass().getName(), td.getName(), ex);
    -                topologySubmitter.markTopoUnsuccess(td);
    -                cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
    -                        + rasStrategy.getClass().getName() + ". Please check logs for details");
    -            }
    -            LOG.debug("scheduling result: {}", result);
    -            if (result != null) {
    -                if (result.isSuccess()) {
    -                    try {
    +                SchedulingResult result = rasStrategy.schedule(toSchedule, td);
    +                LOG.debug("scheduling result: {}", result);
    +                if (result != null) {
    +                    if (result.isSuccess()) {
                             cluster.updateFrom(toSchedule);
                             cluster.setStatus(td.getId(), "Running - " + result.getMessage());
    -                    } catch (Exception ex) {
    -                        LOG.error("Unsuccessful attempting to assign executors to nodes.", ex);
    -                        topologySubmitter.markTopoUnsuccess(td);
    -                        cluster.setStatus(td.getId(), "Unsuccessful in scheduling - "
    -                                + "IllegalStateException thrown when attempting to assign executors to nodes. Please check"
    -                                + " log for details.");
    -                    }
    -                    return;
    -                } else {
    -                    if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    -                        boolean madeSpace = false;
    +                        //DONE
    +                        return;
    +                    } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    +                        LOG.info("Not enough resources to schedule {}", td.getName());
    +                        List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
                             try {
    -                            //need to re prepare since scheduling state might have been restored
    -                            madeSpace = evictionStrategy.makeSpaceForTopo(td, workingState, userMap);
    +                            boolean evictedSomething = false;
    +                            LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
    +                            int tdIndex = reversedList.indexOf(td);
    +                            double cpuNeeded = td.getTotalRequestedCpu();
    +                            double memoryNeeded = td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap();
    +                            SchedulerAssignment assignment = cluster.getAssignmentById(td.getId());
    +                            if (assignment != null) {
    +                                cpuNeeded -= getCpuUsed(assignment);
    +                                memoryNeeded -= getMemoryUsed(assignment);
    +                            }
    +                            cluster.getTopologyResourcesMap();
    +                            for (int index = 0; index < tdIndex; index++) {
    +                                TopologyDetails topologyEvict = reversedList.get(index);
    +                                SchedulerAssignment evictAssignemnt = workingState.getAssignmentById(topologyEvict.getId());
    +                                if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
    +                                    Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
    +
    +                                    LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
    +                                        topologyEvict.getTopologySubmitter());
    +                                    cpuNeeded -= getCpuUsed(evictAssignemnt);
    +                                    memoryNeeded -= getMemoryUsed(evictAssignemnt);
    +                                    evictedSomething = true;
    +                                    nodes.freeSlots(workersToEvict);
    +                                    if (cpuNeeded <= 0 && memoryNeeded <= 0) {
    +                                        //We evicted enough topologies to have a hope of scheduling, so try it now, and don't evict more
    +                                        // than is needed
    +                                        break;
    +                                    }
    +                                }
    +                            }
    +
    +                            if (!evictedSomething) {
    +                                markFailedTopology(topologySubmitter, cluster, td,
    +                                    "Not enough resources to schedule - " + result.getErrorMessage());
    +                                return;
    +                            }
                             } catch (Exception ex) {
    -                            LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}."
    -                                            + " No evictions will be done!", evictionStrategy.getClass().getName(),
    -                                    td.getName(), ex);
    -                            topologySubmitter.markTopoUnsuccess(td);
    -                            return;
    -                        }
    -                        if (!madeSpace) {
    -                            LOG.debug("Could not make space for topo {} will move to attempted", td);
    -                            topologySubmitter.markTopoUnsuccess(td);
    -                            cluster.setStatus(td.getId(), "Not enough resources to schedule - "
    -                                    + result.getErrorMessage());
    -                            return;
    +                            LOG.error("Exception thrown when running eviction to schedule topology {}."
    +                                    + " No evictions will be done! Error: {}",
    +                                td.getName(), ex.getClass().getName(), ex);
                             }
    +                        //Only place we fall though to do the loop over again...
                             continue;
                         } else {
    +                        //The assumption is that the strategy set the status...
                             topologySubmitter.markTopoUnsuccess(td, cluster);
                             return;
                         }
    +                } else {
    --- End diff --
    
    Should we switch the order so the null result is more obvious?
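    
    For illustration, one possible shape of that reordering (just a sketch of the idea with a hypothetical status message, not the actual patch):
    
        SchedulingResult result = rasStrategy.schedule(toSchedule, td);
        LOG.debug("scheduling result: {}", result);
        if (result == null) {
            // handle the unexpected null up front so the remaining branches only deal with real results
            markFailedTopology(topologySubmitter, cluster, td,
                "Unsuccessful in scheduling - strategy returned a null result. Please check logs for details");
            return;
        }
        if (result.isSuccess()) {
            cluster.updateFrom(toSchedule);
            cluster.setStatus(td.getId(), "Running - " + result.getMessage());
            return;
        }
        // ... FAIL_NOT_ENOUGH_RESOURCES and the other failure cases follow as before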


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148882086
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -62,123 +70,144 @@ public void prepare(Map<String, Object> conf) {
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
    -        //initialize data structures
    -        for (TopologyDetails td : cluster.getTopologies()) {
    +        Map<String, User> userMap = getUsers(cluster);
    +        List<TopologyDetails> orderedTopologies = new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap));
    +        LOG.info("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));
    --- End diff --
    
    I would leave this at debug.


---

[GitHub] storm issue #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the issue:

    https://github.com/apache/storm/pull/2400
  
    @revans2 can you elaborate on why EvictionPolicies are no longer needed? 


---

[GitHub] storm issue #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the issue:

    https://github.com/apache/storm/pull/2400
  
    @revans2 yup you are right about it being more about the size of the component.  Great work!  Thanks for sharing the detailed information as it's a good learning experience for me!  Keep me in the loop on how it goes.  It would be great if Yahoo could release info about topology structure so people like me can run a simulation of scheduling over a large number of topologies.


---

[GitHub] storm issue #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the issue:

    https://github.com/apache/storm/pull/2400
  
    @revans2 thanks for the explanation! Sorry my math was wrong!  I understand the context a lot better now! Though my intuition tells me that if larger topologies usually get scheduled first, this will likely result in more fragmentation and fewer topologies being scheduled overall if there is a lot of resource contention.


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148923188
  
    --- Diff: docs/Resource_Aware_Scheduler_overview.md ---
    @@ -243,58 +243,81 @@ http://dl.acm.org/citation.cfm?id=2814808
     <div id='Specifying-Topology-Prioritization-Strategy'/>
     ### Specifying Topology Prioritization Strategy
     
    -The order of scheduling is a pluggable interface in which a user could define a strategy that prioritizes topologies.  For a user to define his or her own prioritization strategy, he or she needs to implement the ISchedulingPriorityStrategy interface.  A user can set the scheduling priority strategy by setting the *Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY* to point to the class that implements the strategy. For instance:
    +The order of scheduling and eviction is determined by a pluggable interface in which the cluster owner can define how topologies should be scheduled.  For the owner to define his or her own prioritization strategy, she or he needs to implement the ISchedulingPriorityStrategy interface.  A user can set the scheduling priority strategy by setting the `DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY` to point to the class that implements the strategy. For instance:
     ```
         resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
     ```
    -A default strategy will be provided.  The following explains how the default scheduling priority strategy works.
    +
    +Topologies are scheduled starting at the beginning of the list returned by this plugin.  If there are not enough resources to schedule the topology others are evicted starting at the end of the list.  Eviction stops when there are no lower priority topologies left to evict.
     
     **DefaultSchedulingPriorityStrategy**
     
    -The order of scheduling should be based on the distance between a user’s current resource allocation and his or her guaranteed allocation.  We should prioritize the users who are the furthest away from their resource guarantee. The difficulty of this problem is that a user may have multiple resource guarantees, and another user can have another set of resource guarantees, so how can we compare them in a fair manner?  Let's use the average percentage of resource guarantees satisfied as a method of comparison.
    +In the past the order of scheduling was based on the distance between a user’s current resource allocation and his or her guaranteed allocation.
    +
    +We currently use a slightly different approach. We simulate scheduling the highest priority topology for each user and score the topology for each of the resources using the formula
    +
    +```
    +(Requested + Assigned - Guaranteed)/Available
    +```
    +
    +Where
    +
    + * `Requested` is the resource requested by this topology (or a approximation of it for complex requests like shared memory)
    + * `Assigned` is the resources already assigned by the simulation.
    + * `Guaranteed` is the resource guarantee for this user
    + * `Available` is the amount of that resource currently available in the cluster.
     
    -For example:
    +This gives a score that is negative for guaranteed requests and a score that is positive for requests that are not within the guarantee.
     
    -|User|Resource Guarantee|Resource Allocated|
    -|----|------------------|------------------|
    -|A|<10 CPU, 50GB>|<2 CPU, 40 GB>|
    -|B|< 20 CPU, 25GB>|<15 CPU, 10 GB>|
    +To combine different resources the maximum of all the indavidual resource scores is used.  This guarantees that if a user would go over a guarantee for a single resource it would not be offset by being under guarantee on any other resources.
     
    -User A’s average percentage satisfied of resource guarantee: 
    +For Example:
     
    -(2/10+40/50)/2  = 0.5
    +Assume we have to schedule the following topologies.
     
    -User B’s average percentage satisfied of resource guarantee: 
    +|ID|User|CPU|Memory|Priority|
    +|---|----|---|------|-------|
    +|A-1|A|100|1,000|1|
    +|A-2|A|100|1,000|10|
    +|B-1|B|100|1,000|1|
    +|B-2|B|100|1,000|10|
     
    -(15/20+10/25)/2  = 0.575
    +The cluster as a whole has 300 CPU and 4,000 Memory.
     
    -Thus, in this example User A has a smaller average percentage of his or her resource guarantee satisfied than User B.  Thus, User A should get priority to be allocated more resource, i.e., schedule a topology submitted by User A.
    +User A is guaranteed 100 CPU and 1,000 Memory.  User B is guaranteed 200 CPU and 1,500 Memory.  The scores for the most important, lowest priority number, topologies for each user would be.
    --- End diff --
    
    "The scores for the most important, lowest priority number, topologies for each user would be." 
    
    This sentence is a little confusing.  
    
    "The scores for the most important topology, i.e. topology with the lowest score,  for each user would be." 


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148850301
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -62,123 +70,144 @@ public void prepare(Map<String, Object> conf) {
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
    -        //initialize data structures
    -        for (TopologyDetails td : cluster.getTopologies()) {
    +        Map<String, User> userMap = getUsers(cluster);
    +        List<TopologyDetails> orderedTopologies = new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap));
    +        LOG.info("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));
    --- End diff --
    
    This should change to a debug...
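    
    Something along these lines (the same statement, just demoted to debug):
    
        LOG.debug("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));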


---

[GitHub] storm issue #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2400
  
    This is very long and I don't expect everyone to read the whole thing.
    
    @jerrypeng your numbers are off for big vs small topologies.  The scores will be negative for your example.
    
    -0.1 for Topology 1 vs -0.01 for Topology 2.  The smaller of these scores (-0.1) belongs to the larger topology (Topology 1), so it would be prioritized first.
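    
    Spelling that out with the documented formula (Requested + Assigned - Guaranteed) / Available:
    
        Topology 1 score = (100 + 0 - 200) / 1000 = -0.1
        Topology 2 score = (10  + 0 - 20)  / 1000 = -0.01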
    
    The algorithm is kind of odd.  It was designed with #2385 (Generic RAS) in mind, specifically GPU support.  There were two big goals here.  First, we wanted something that we could apply in a simple way across all types of resources (no matter how many resources there were), and second, we wanted to prioritize requests that needed a rare resource over requests that needed common resources.  We had a lesser goal of trying to be fair between users over their guarantees.
    
    Why do we have these goals? We found a rather large problem with resource fragmentation in our clusters.  Up to 20% of the total resources end up not being usable because a single resource was effectively exhausted on a node before the other one could be.  GPUs are expensive and having one of them sitting idle because we filled the node up with other things is really hard to explain to a customer that needs the GPU.  If you look at #2385 you see that @govind-menon  has adjusted how nodes are selected for an executor.  Each time an executor is scheduled the nodes are sorted again.  The sorting of the nodes has also changed, even though that might be hard to see because of some of the refactoring.  It now tries to avoid nodes that have a "rare" resource on them unless the request needs that resource.
    
    This algorithm tries to help by handing the other algorithm topologies that use rare resources first.  Rare in this case means it uses a higher percentage of what is in the cluster as a whole.  It is not perfect in this, because it has to take into account the user-given priority of the topology first along with guaranteed resources.  So it ends up prioritizing larger topologies that fit within a user's guarantee.  But once a user goes over their guarantee, it treats all users equally and will prioritize users that have fewer resources requested (hence smaller topologies likely show up ahead of bigger ones).  We may adjust this over time as we get more experience running with these kinds of topologies, and may end up evicting parts of a topology to make room for another topology that needs a fragmented resource.  But I would really like to avoid this if at all possible.
    
    Now to answer your second question about why I removed the eviction strategy.  On our staging clusters we have been running with a FIFO eviction strategy for the same reasons described in the FIFO prioritization strategy in this patch.  A few times we saw nimbus get stuck in what looked like an infinite loop trying to schedule topologies.  We were never able to come up with a reproducible use case, but the APIs were written with each strategy returning a single item (the next one to schedule or evict), so you can imagine how they could feed off of each other and cause a loop.  We "fixed" the issue internally by limiting the total number of times a topology will be scheduled before we give up on trying to schedule it.  But it is ugly and inefficient.  I kept it for this patch, but I am leaning towards removing it.  Instead, if we have a single source of truth for a total ordering of topologies there is no way for it to be inconsistent.  We just try to schedule starting at the most important and evict starting at the least important.  This means there are no unbounded loops in the code, so there is no way to be stuck in one of those loops.  With that change we only needed one plugin to sort all of the topologies into a total order, so that is the new prioritization strategy API.  Hence there is no need for an eviction strategy.
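    
    In rough pseudo-code the flow looks something like this (a simplified sketch of the idea, not the exact code in the patch; evictSomethingBelow is a made-up helper standing in for the eviction loop):
    
        List<TopologyDetails> ordered = schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap);
        for (TopologyDetails td : ordered) {                     // schedule starting at the most important
            for (int attempt = 0; attempt < maxSchedulingAttempts; attempt++) {
                SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
                SchedulingResult result = rasStrategy.schedule(toSchedule, td);
                if (result != null && result.isSuccess()) {
                    break;                                       // this topology is done
                }
                // not enough resources: walk the same ordered list from the back (least important first)
                // and free slots of topologies below td, then try to schedule td again
                if (!evictSomethingBelow(td, ordered)) {
                    break;                                       // nothing lower priority left to evict, give up on td
                }
            }
        }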


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148923218
  
    --- Diff: docs/Resource_Aware_Scheduler_overview.md ---
    @@ -243,58 +243,81 @@ http://dl.acm.org/citation.cfm?id=2814808
     <div id='Specifying-Topology-Prioritization-Strategy'/>
     ### Specifying Topology Prioritization Strategy
     
    -The order of scheduling is a pluggable interface in which a user could define a strategy that prioritizes topologies.  For a user to define his or her own prioritization strategy, he or she needs to implement the ISchedulingPriorityStrategy interface.  A user can set the scheduling priority strategy by setting the *Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY* to point to the class that implements the strategy. For instance:
    +The order of scheduling and eviction is determined by a pluggable interface in which the cluster owner can define how topologies should be scheduled.  For the owner to define his or her own prioritization strategy, she or he needs to implement the ISchedulingPriorityStrategy interface.  A user can set the scheduling priority strategy by setting the `DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY` to point to the class that implements the strategy. For instance:
     ```
         resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
     ```
    -A default strategy will be provided.  The following explains how the default scheduling priority strategy works.
    +
    +Topologies are scheduled starting at the beginning of the list returned by this plugin.  If there are not enough resources to schedule the topology others are evicted starting at the end of the list.  Eviction stops when there are no lower priority topologies left to evict.
     
     **DefaultSchedulingPriorityStrategy**
     
    -The order of scheduling should be based on the distance between a user’s current resource allocation and his or her guaranteed allocation.  We should prioritize the users who are the furthest away from their resource guarantee. The difficulty of this problem is that a user may have multiple resource guarantees, and another user can have another set of resource guarantees, so how can we compare them in a fair manner?  Let's use the average percentage of resource guarantees satisfied as a method of comparison.
    +In the past the order of scheduling was based on the distance between a user’s current resource allocation and his or her guaranteed allocation.
    +
    +We currently use a slightly different approach. We simulate scheduling the highest priority topology for each user and score the topology for each of the resources using the formula
    +
    +```
    +(Requested + Assigned - Guaranteed)/Available
    +```
    +
    +Where
    +
    + * `Requested` is the resource requested by this topology (or a approximation of it for complex requests like shared memory)
    + * `Assigned` is the resources already assigned by the simulation.
    + * `Guaranteed` is the resource guarantee for this user
    + * `Available` is the amount of that resource currently available in the cluster.
     
    -For example:
    +This gives a score that is negative for guaranteed requests and a score that is positive for requests that are not within the guarantee.
     
    -|User|Resource Guarantee|Resource Allocated|
    -|----|------------------|------------------|
    -|A|<10 CPU, 50GB>|<2 CPU, 40 GB>|
    -|B|< 20 CPU, 25GB>|<15 CPU, 10 GB>|
    +To combine different resources the maximum of all the indavidual resource scores is used.  This guarantees that if a user would go over a guarantee for a single resource it would not be offset by being under guarantee on any other resources.
     
    -User A’s average percentage satisfied of resource guarantee: 
    +For Example:
     
    -(2/10+40/50)/2  = 0.5
    +Assume we have to schedule the following topologies.
     
    -User B’s average percentage satisfied of resource guarantee: 
    +|ID|User|CPU|Memory|Priority|
    +|---|----|---|------|-------|
    +|A-1|A|100|1,000|1|
    +|A-2|A|100|1,000|10|
    +|B-1|B|100|1,000|1|
    +|B-2|B|100|1,000|10|
     
    -(15/20+10/25)/2  = 0.575
    +The cluster as a whole has 300 CPU and 4,000 Memory.
     
    -Thus, in this example User A has a smaller average percentage of his or her resource guarantee satisfied than User B.  Thus, User A should get priority to be allocated more resource, i.e., schedule a topology submitted by User A.
    +User A is guaranteed 100 CPU and 1,000 Memory.  User B is guaranteed 200 CPU and 1,500 Memory.  The scores for the most important, lowest priority number, topologies for each user would be.
    +
    +```
    +A-1 Score = max(CPU: (100 + 0 - 100)/300, MEM: (1,000 + 0 - 1,000)/4,000) = 0
    +B-1 Score = max(CPU: (100 + 0 - 200)/300, MEM: (1,000 + 0 - 1,500)/4,000) = -0.125
    +``` 
     
    -When scheduling, RAS sorts users by the average percentage satisfied of resource guarantee and schedule topologies from users based on that ordering starting from the users with the lowest average percentage satisfied of resource guarantee.  When a user’s resource guarantee is completely satisfied, the user’s average percentage satisfied of resource guarantee will be greater than or equal to 1.
    +`B-1` has the lowest score so it would be the highest priority topology to schedule. In the next round the scores would be.
     
    -<div id='Specifying-Eviction-Strategy'/>
    -### Specifying Eviction Strategy
    -The eviction strategy is used when there are not enough free resources in the cluster to schedule new topologies. If the cluster is full, we need a mechanism to evict topologies so that user resource guarantees can be met and additional resource can be shared fairly among users. The strategy for evicting topologies is also a pluggable interface in which the user can implement his or her own topology eviction strategy.  For a user to implement his or her own eviction strategy, he or she needs to implement the IEvictionStrategy Interface and set *Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY* to point to the implemented strategy class. For instance:
     ```
    -    resource.aware.scheduler.eviction.strategy: "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy"
    +A-1 Score = max(CPU: (100 + 0 - 100)/200, MEM: (1,000 + 0 - 1,000)/3,000) = 0
    +B-2 Score = max(CPU: (100 + 100 - 200)/200, MEM: (1,000 + 1,000 - 1,500)/3,000) = 0.167
     ```
    -A default eviction strategy is provided.  The following explains how the default topology eviction strategy works
     
    -**DefaultEvictionStrategy**
    +`A-1` has the lowest score now so it would be the next higest priority topology to schedule.
    --- End diff --
    
    misspelling "higest"


---

[GitHub] storm issue #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the issue:

    https://github.com/apache/storm/pull/2400
  
    @revans2 interesting work!  Though the formula used for the DefaultSchedulingPriorityStrategy seems to have a bias towards smaller topologies.
    
    For example:
    
    Total amount of Resource R in cluster= 1000
    
    Topology 1 Resource R: Request = 100 Guarantee = 200
    
    Topology 2 Resource R: Request = 10 Guarantee = 20
    
    Topology 1 score = (100 - 0 - 200) / 1000 = 0.1
    
    Topology 2 score = (10 - 0 - 20) / 1000 = 0.01
    
    Topology 2 will be prioritized over Topology 1, correct? Even though the ratio of their respective request and guarantee is the same.  This may cause larger topologies to be starved.
    



---

[GitHub] storm issue #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2400
  
    @jerrypeng yes I need to go follow up on my previous ask and see where the lawyers are at with it.


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2400


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148852167
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -62,123 +70,144 @@ public void prepare(Map<String, Object> conf) {
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
    -        //initialize data structures
    -        for (TopologyDetails td : cluster.getTopologies()) {
    +        Map<String, User> userMap = getUsers(cluster);
    +        List<TopologyDetails> orderedTopologies = new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap));
    +        LOG.info("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));
    +        for (TopologyDetails td : orderedTopologies) {
                 if (!cluster.needsSchedulingRas(td)) {
                     //cluster forgets about its previous status, so if it is scheduled just leave it.
                     cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    -            }
    -        }
    -        Map<String, User> userMap = getUsers(cluster);
    -
    -        while (true) {
    -            TopologyDetails td;
    -            try {
    -                //Call scheduling priority strategy
    -                td = schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled!",
    -                        schedulingPrioritystrategy.getClass().getName(), ex);
    -                break;
    -            }
    -            if (td == null) {
    -                break;
    -            }
    -            User submitter = userMap.get(td.getTopologySubmitter());
    -            if (cluster.needsSchedulingRas(td)) {
    -                scheduleTopology(td, cluster, submitter, userMap);
                 } else {
    -                LOG.warn("Topology {} is already fully scheduled!", td.getName());
    -                cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    +                User submitter = userMap.get(td.getTopologySubmitter());
    +                scheduleTopology(td, cluster, submitter, orderedTopologies);
                 }
             }
         }
     
    +    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) {
    +        markFailedTopology(u, c, td, message, null);
    +    }
     
    -    public void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
    -                                 Map<String, User> userMap) {
    +    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message, Throwable t) {
    +        c.setStatus(td, message);
    +        String realMessage = td.getId() + " " + message;
    +        if (t != null) {
    +            LOG.error(realMessage, t);
    +        } else {
    +            LOG.error(realMessage);
    +        }
    +        u.markTopoUnsuccess(td);
    +    }
    +
    +    private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
    +                                  List<TopologyDetails> orderedTopologies) {
             //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds
             Cluster workingState = new Cluster(cluster);
    +        RAS_Nodes nodes = new RAS_Nodes(workingState);
             IStrategy rasStrategy = null;
             String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
             try {
    -            rasStrategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
    +            rasStrategy = ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
                 rasStrategy.prepare(conf);
             } catch (DisallowedStrategyException e) {
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + e.getAttemptedClass()
    -                    + " is not an allowed strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY
    -                    + " config is one of the allowed strategies: " + e.getAllowedStrategies().toString());
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - " + e.getAttemptedClass()
    +                    + " is not an allowed strategy. Please make sure your "
    +                    + Config.TOPOLOGY_SCHEDULER_STRATEGY
    +                    + " config is one of the allowed strategies: "
    +                    + e.getAllowedStrategies(), e);
                 return;
             } catch (RuntimeException e) {
    -            LOG.error("failed to create instance of IStrategy: {} Topology {} will not be scheduled.",
    -                    strategyConf, td.getName(), e);
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
    -                    + strategyConf + ". Please check logs for details");
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - failed to create instance of topology strategy "
    +                    + strategyConf
    +                    + ". Please check logs for details", e);
                 return;
             }
     
    -        while (true) {
    -            // A copy of the cluster that restricts the strategy to only modify a single topology
    +        for (int i = 0; i < maxSchedulingAttempts; i++) {
                 SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
    -            SchedulingResult result = null;
                 try {
    -                result = rasStrategy.schedule(toSchedule, td);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running strategy {} to schedule topology {}."
    -                        + " Topology will not be scheduled!", rasStrategy.getClass().getName(), td.getName(), ex);
    -                topologySubmitter.markTopoUnsuccess(td);
    -                cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
    -                        + rasStrategy.getClass().getName() + ". Please check logs for details");
    -            }
    -            LOG.debug("scheduling result: {}", result);
    -            if (result != null) {
    -                if (result.isSuccess()) {
    -                    try {
    +                SchedulingResult result = rasStrategy.schedule(toSchedule, td);
    +                LOG.debug("scheduling result: {}", result);
    +                if (result != null) {
    +                    if (result.isSuccess()) {
                             cluster.updateFrom(toSchedule);
                             cluster.setStatus(td.getId(), "Running - " + result.getMessage());
    -                    } catch (Exception ex) {
    -                        LOG.error("Unsuccessful attempting to assign executors to nodes.", ex);
    -                        topologySubmitter.markTopoUnsuccess(td);
    -                        cluster.setStatus(td.getId(), "Unsuccessful in scheduling - "
    -                                + "IllegalStateException thrown when attempting to assign executors to nodes. Please check"
    -                                + " log for details.");
    -                    }
    -                    return;
    -                } else {
    -                    if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    -                        boolean madeSpace = false;
    +                        //DONE
    +                        return;
    +                    } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    +                        LOG.info("Not enough resources to schedule {}", td.getName());
    +                        List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
                             try {
    -                            //need to re prepare since scheduling state might have been restored
    -                            madeSpace = evictionStrategy.makeSpaceForTopo(td, workingState, userMap);
    +                            boolean evictedSomething = false;
    +                            LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
    +                            int tdIndex = reversedList.indexOf(td);
    +                            double cpuNeeded = td.getTotalRequestedCpu();
    +                            double memoryNeeded = td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap();
    +                            SchedulerAssignment assignment = cluster.getAssignmentById(td.getId());
    +                            if (assignment != null) {
    +                                cpuNeeded -= getCpuUsed(assignment);
    +                                memoryNeeded -= getMemoryUsed(assignment);
    +                            }
    +                            cluster.getTopologyResourcesMap();
    +                            for (int index = 0; index < tdIndex; index++) {
    +                                TopologyDetails topologyEvict = reversedList.get(index);
    +                                SchedulerAssignment evictAssignemnt = workingState.getAssignmentById(topologyEvict.getId());
    +                                if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
    +                                    Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
    +
    +                                    LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
    +                                        topologyEvict.getTopologySubmitter());
    +                                    cpuNeeded -= getCpuUsed(evictAssignemnt);
    +                                    memoryNeeded -= getMemoryUsed(evictAssignemnt);
    +                                    evictedSomething = true;
    +                                    nodes.freeSlots(workersToEvict);
    +                                    if (cpuNeeded <= 0 && memoryNeeded <= 0) {
    +                                        //We evicted enough topologies to have a hope of scheduling, so try it now, and don't evict more
    +                                        // than is needed
    +                                        break;
    +                                    }
    +                                }
    +                            }
    +
    +                            if (!evictedSomething) {
    +                                markFailedTopology(topologySubmitter, cluster, td,
    +                                    "Not enough resources to schedule - " + result.getErrorMessage());
    --- End diff --
    
    Should we change this to give them an idea of what resource was missing?  CPU/Memory etc.
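    
    E.g. something like this (hypothetical wording, reusing the cpuNeeded/memoryNeeded values computed just above in the eviction loop):
    
        markFailedTopology(topologySubmitter, cluster, td,
            "Not enough resources to schedule - " + result.getErrorMessage()
                + " (still short roughly " + cpuNeeded + " CPU and " + memoryNeeded + " memory)");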


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148851660
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -62,123 +70,144 @@ public void prepare(Map<String, Object> conf) {
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
    -        //initialize data structures
    -        for (TopologyDetails td : cluster.getTopologies()) {
    +        Map<String, User> userMap = getUsers(cluster);
    +        List<TopologyDetails> orderedTopologies = new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap));
    +        LOG.info("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));
    +        for (TopologyDetails td : orderedTopologies) {
                 if (!cluster.needsSchedulingRas(td)) {
                     //cluster forgets about its previous status, so if it is scheduled just leave it.
                     cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    -            }
    -        }
    -        Map<String, User> userMap = getUsers(cluster);
    -
    -        while (true) {
    -            TopologyDetails td;
    -            try {
    -                //Call scheduling priority strategy
    -                td = schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled!",
    -                        schedulingPrioritystrategy.getClass().getName(), ex);
    -                break;
    -            }
    -            if (td == null) {
    -                break;
    -            }
    -            User submitter = userMap.get(td.getTopologySubmitter());
    -            if (cluster.needsSchedulingRas(td)) {
    -                scheduleTopology(td, cluster, submitter, userMap);
                 } else {
    -                LOG.warn("Topology {} is already fully scheduled!", td.getName());
    -                cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    +                User submitter = userMap.get(td.getTopologySubmitter());
    +                scheduleTopology(td, cluster, submitter, orderedTopologies);
                 }
             }
         }
     
    +    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) {
    +        markFailedTopology(u, c, td, message, null);
    +    }
     
    -    public void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
    -                                 Map<String, User> userMap) {
    +    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message, Throwable t) {
    +        c.setStatus(td, message);
    +        String realMessage = td.getId() + " " + message;
    +        if (t != null) {
    +            LOG.error(realMessage, t);
    +        } else {
    +            LOG.error(realMessage);
    +        }
    +        u.markTopoUnsuccess(td);
    +    }
    +
    +    private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
    +                                  List<TopologyDetails> orderedTopologies) {
             //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds
             Cluster workingState = new Cluster(cluster);
    +        RAS_Nodes nodes = new RAS_Nodes(workingState);
             IStrategy rasStrategy = null;
             String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
             try {
    -            rasStrategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
    +            rasStrategy = ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
                 rasStrategy.prepare(conf);
             } catch (DisallowedStrategyException e) {
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + e.getAttemptedClass()
    -                    + " is not an allowed strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY
    -                    + " config is one of the allowed strategies: " + e.getAllowedStrategies().toString());
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - " + e.getAttemptedClass()
    +                    + " is not an allowed strategy. Please make sure your "
    +                    + Config.TOPOLOGY_SCHEDULER_STRATEGY
    +                    + " config is one of the allowed strategies: "
    +                    + e.getAllowedStrategies(), e);
                 return;
             } catch (RuntimeException e) {
    -            LOG.error("failed to create instance of IStrategy: {} Topology {} will not be scheduled.",
    -                    strategyConf, td.getName(), e);
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
    -                    + strategyConf + ". Please check logs for details");
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - failed to create instance of topology strategy "
    +                    + strategyConf
    +                    + ". Please check logs for details", e);
                 return;
             }
     
    -        while (true) {
    -            // A copy of the cluster that restricts the strategy to only modify a single topology
    +        for (int i = 0; i < maxSchedulingAttempts; i++) {
                 SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
    -            SchedulingResult result = null;
                 try {
    -                result = rasStrategy.schedule(toSchedule, td);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running strategy {} to schedule topology {}."
    -                        + " Topology will not be scheduled!", rasStrategy.getClass().getName(), td.getName(), ex);
    -                topologySubmitter.markTopoUnsuccess(td);
    -                cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
    -                        + rasStrategy.getClass().getName() + ". Please check logs for details");
    -            }
    -            LOG.debug("scheduling result: {}", result);
    -            if (result != null) {
    -                if (result.isSuccess()) {
    -                    try {
    +                SchedulingResult result = rasStrategy.schedule(toSchedule, td);
    +                LOG.debug("scheduling result: {}", result);
    +                if (result != null) {
    +                    if (result.isSuccess()) {
                             cluster.updateFrom(toSchedule);
                             cluster.setStatus(td.getId(), "Running - " + result.getMessage());
    -                    } catch (Exception ex) {
    -                        LOG.error("Unsuccessful attempting to assign executors to nodes.", ex);
    -                        topologySubmitter.markTopoUnsuccess(td);
    -                        cluster.setStatus(td.getId(), "Unsuccessful in scheduling - "
    -                                + "IllegalStateException thrown when attempting to assign executors to nodes. Please check"
    -                                + " log for details.");
    -                    }
    -                    return;
    -                } else {
    -                    if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    -                        boolean madeSpace = false;
    +                        //DONE
    +                        return;
    +                    } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    +                        LOG.info("Not enough resources to schedule {}", td.getName());
    +                        List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
                             try {
    -                            //need to re prepare since scheduling state might have been restored
    -                            madeSpace = evictionStrategy.makeSpaceForTopo(td, workingState, userMap);
    +                            boolean evictedSomething = false;
    +                            LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
    +                            int tdIndex = reversedList.indexOf(td);
    +                            double cpuNeeded = td.getTotalRequestedCpu();
    +                            double memoryNeeded = td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap();
    +                            SchedulerAssignment assignment = cluster.getAssignmentById(td.getId());
    +                            if (assignment != null) {
    +                                cpuNeeded -= getCpuUsed(assignment);
    +                                memoryNeeded -= getMemoryUsed(assignment);
    +                            }
    +                            cluster.getTopologyResourcesMap();
    --- End diff --
    
    Delete this line; it is not needed.


---

[GitHub] storm issue #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2400
  
    @jerrypeng,
    
    I am not sure your intuition is right, but this is all still theoretical until we roll it out and see what happens in real life.  We have run some simulations showing that, at least for our simulated load, it does not appear to be any worse, and in cases where there are GPU-like resources it is a lot better.
    
    Solving fragmentation should mostly be about matching the ratio of resources in a request to the resources left on a node, which is a lot of what your initial RAS paper was about.  The problem is that when sorting the nodes we prioritize proximity to other parts of the same topology over fixing fragmentation, so fragmentation really only matters when scheduling the first executor on a node.  Because of this I don't think it is the size of the topology that matters as much as the size of the individual components.  To really solve this we need a way to balance the desire to co-locate components against how well a given request fills what is left on the node.  I am hopeful that when we finish https://issues.apache.org/jira/browse/STORM-2684 the scheduler will group the parts of a topology that give it the biggest win into a single "super component", and then, if we need to, we can look at adding a config that controls when to switch from sorting for co-location to sorting to reduce fragmentation, i.e. when to move on to the next node in the rack, even if this one is not full, because we are starting to run low on resource X.
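    
    To make that trade-off a bit more concrete, here is a rough sketch of a node score that mixes co-location with how well a request fills what is left on a node.  None of this is code from the PR; the names, the resource set, and the weighting are all made up for illustration.
    
    ```
    // Hypothetical sketch only: balancing co-location against fragmentation
    // when scoring a candidate node.  Lower scores are better.
    public class NodeScoreSketch {
        static double score(double cpuRequested, double memRequested,
                            double cpuFreeOnNode, double memFreeOnNode,
                            int executorsOfSameTopoOnNode, double alpha) {
            if (cpuRequested > cpuFreeOnNode || memRequested > memFreeOnNode) {
                return Double.POSITIVE_INFINITY; // the request does not fit at all
            }
            // How well the request fills what is left: 0 is a perfect fit,
            // larger values mean more of the node would be left stranded.
            double cpuWaste = (cpuFreeOnNode - cpuRequested) / cpuFreeOnNode;
            double memWaste = (memFreeOnNode - memRequested) / memFreeOnNode;
            double fragmentation = Math.max(cpuWaste, memWaste);
            // Co-location: prefer nodes that already run part of this topology.
            double coLocation = executorsOfSameTopoOnNode > 0 ? 0.0 : 1.0;
            // alpha controls when we stop favoring co-location and start
            // favoring nodes whose leftover resources best match the request.
            return alpha * coLocation + (1.0 - alpha) * fragmentation;
        }
    
        public static void main(String[] args) {
            // A nearly full node that already hosts the topology vs. an empty node.
            System.out.println(score(100, 1024, 120, 2048, 3, 0.7));
            System.out.println(score(100, 1024, 400, 8192, 0, 0.7));
        }
    }
    ```
    
    With a high alpha this behaves like the current co-location-first sort; lowering alpha as a resource starts to run low is roughly the kind of switch a config like the one mentioned above could control.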
    
    If you have suggestions or want to collaborate on it, that would be great, but you know how hard it is for us to get legal approval to share much more than just code. So for now we want to get this feature rolled out and then monitor it to see how it goes and whether we need to adjust anything.
    
    @govind-menon,
    
    Do you have some of the simulation results in a human-readable format we can share?  Also, if you have the code we used to run the simulation, putting an Apache license on it and posting it would be great so that others can reproduce what we have done.


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148852532
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -62,123 +70,144 @@ public void prepare(Map<String, Object> conf) {
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
    -        //initialize data structures
    -        for (TopologyDetails td : cluster.getTopologies()) {
    +        Map<String, User> userMap = getUsers(cluster);
    +        List<TopologyDetails> orderedTopologies = new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap));
    +        LOG.info("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));
    +        for (TopologyDetails td : orderedTopologies) {
                 if (!cluster.needsSchedulingRas(td)) {
                     //cluster forgets about its previous status, so if it is scheduled just leave it.
                     cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    -            }
    -        }
    -        Map<String, User> userMap = getUsers(cluster);
    -
    -        while (true) {
    -            TopologyDetails td;
    -            try {
    -                //Call scheduling priority strategy
    -                td = schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled!",
    -                        schedulingPrioritystrategy.getClass().getName(), ex);
    -                break;
    -            }
    -            if (td == null) {
    -                break;
    -            }
    -            User submitter = userMap.get(td.getTopologySubmitter());
    -            if (cluster.needsSchedulingRas(td)) {
    -                scheduleTopology(td, cluster, submitter, userMap);
                 } else {
    -                LOG.warn("Topology {} is already fully scheduled!", td.getName());
    -                cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    +                User submitter = userMap.get(td.getTopologySubmitter());
    +                scheduleTopology(td, cluster, submitter, orderedTopologies);
                 }
             }
         }
     
    +    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) {
    +        markFailedTopology(u, c, td, message, null);
    +    }
     
    -    public void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
    -                                 Map<String, User> userMap) {
    +    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message, Throwable t) {
    +        c.setStatus(td, message);
    +        String realMessage = td.getId() + " " + message;
    +        if (t != null) {
    +            LOG.error(realMessage, t);
    +        } else {
    +            LOG.error(realMessage);
    +        }
    +        u.markTopoUnsuccess(td);
    +    }
    +
    +    private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
    +                                  List<TopologyDetails> orderedTopologies) {
             //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds
             Cluster workingState = new Cluster(cluster);
    +        RAS_Nodes nodes = new RAS_Nodes(workingState);
             IStrategy rasStrategy = null;
             String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
             try {
    -            rasStrategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
    +            rasStrategy = ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
                 rasStrategy.prepare(conf);
             } catch (DisallowedStrategyException e) {
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + e.getAttemptedClass()
    -                    + " is not an allowed strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY
    -                    + " config is one of the allowed strategies: " + e.getAllowedStrategies().toString());
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - " + e.getAttemptedClass()
    +                    + " is not an allowed strategy. Please make sure your "
    +                    + Config.TOPOLOGY_SCHEDULER_STRATEGY
    +                    + " config is one of the allowed strategies: "
    +                    + e.getAllowedStrategies(), e);
                 return;
             } catch (RuntimeException e) {
    -            LOG.error("failed to create instance of IStrategy: {} Topology {} will not be scheduled.",
    -                    strategyConf, td.getName(), e);
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
    -                    + strategyConf + ". Please check logs for details");
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - failed to create instance of topology strategy "
    +                    + strategyConf
    +                    + ". Please check logs for details", e);
                 return;
             }
     
    -        while (true) {
    -            // A copy of the cluster that restricts the strategy to only modify a single topology
    +        for (int i = 0; i < maxSchedulingAttempts; i++) {
                 SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
    -            SchedulingResult result = null;
                 try {
    -                result = rasStrategy.schedule(toSchedule, td);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running strategy {} to schedule topology {}."
    -                        + " Topology will not be scheduled!", rasStrategy.getClass().getName(), td.getName(), ex);
    -                topologySubmitter.markTopoUnsuccess(td);
    -                cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
    -                        + rasStrategy.getClass().getName() + ". Please check logs for details");
    -            }
    -            LOG.debug("scheduling result: {}", result);
    -            if (result != null) {
    -                if (result.isSuccess()) {
    -                    try {
    +                SchedulingResult result = rasStrategy.schedule(toSchedule, td);
    +                LOG.debug("scheduling result: {}", result);
    +                if (result != null) {
    +                    if (result.isSuccess()) {
                             cluster.updateFrom(toSchedule);
                             cluster.setStatus(td.getId(), "Running - " + result.getMessage());
    -                    } catch (Exception ex) {
    -                        LOG.error("Unsuccessful attempting to assign executors to nodes.", ex);
    -                        topologySubmitter.markTopoUnsuccess(td);
    -                        cluster.setStatus(td.getId(), "Unsuccessful in scheduling - "
    -                                + "IllegalStateException thrown when attempting to assign executors to nodes. Please check"
    -                                + " log for details.");
    -                    }
    -                    return;
    -                } else {
    -                    if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    -                        boolean madeSpace = false;
    +                        //DONE
    +                        return;
    +                    } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    +                        LOG.info("Not enough resources to schedule {}", td.getName());
    +                        List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
                             try {
    -                            //need to re prepare since scheduling state might have been restored
    -                            madeSpace = evictionStrategy.makeSpaceForTopo(td, workingState, userMap);
    +                            boolean evictedSomething = false;
    +                            LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
    +                            int tdIndex = reversedList.indexOf(td);
    +                            double cpuNeeded = td.getTotalRequestedCpu();
    +                            double memoryNeeded = td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap();
    +                            SchedulerAssignment assignment = cluster.getAssignmentById(td.getId());
    +                            if (assignment != null) {
    +                                cpuNeeded -= getCpuUsed(assignment);
    +                                memoryNeeded -= getMemoryUsed(assignment);
    +                            }
    +                            cluster.getTopologyResourcesMap();
    +                            for (int index = 0; index < tdIndex; index++) {
    +                                TopologyDetails topologyEvict = reversedList.get(index);
    +                                SchedulerAssignment evictAssignemnt = workingState.getAssignmentById(topologyEvict.getId());
    +                                if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
    +                                    Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
    +
    +                                    LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
    +                                        topologyEvict.getTopologySubmitter());
    +                                    cpuNeeded -= getCpuUsed(evictAssignemnt);
    +                                    memoryNeeded -= getMemoryUsed(evictAssignemnt);
    +                                    evictedSomething = true;
    +                                    nodes.freeSlots(workersToEvict);
    +                                    if (cpuNeeded <= 0 && memoryNeeded <= 0) {
    +                                        //We evicted enough topologies to have a hope of scheduling, so try it now, and don't evict more
    +                                        // than is needed
    +                                        break;
    +                                    }
    +                                }
    +                            }
    +
    +                            if (!evictedSomething) {
    +                                markFailedTopology(topologySubmitter, cluster, td,
    +                                    "Not enough resources to schedule - " + result.getErrorMessage());
    +                                return;
    +                            }
                             } catch (Exception ex) {
    -                            LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}."
    -                                            + " No evictions will be done!", evictionStrategy.getClass().getName(),
    -                                    td.getName(), ex);
    -                            topologySubmitter.markTopoUnsuccess(td);
    -                            return;
    -                        }
    -                        if (!madeSpace) {
    -                            LOG.debug("Could not make space for topo {} will move to attempted", td);
    -                            topologySubmitter.markTopoUnsuccess(td);
    -                            cluster.setStatus(td.getId(), "Not enough resources to schedule - "
    -                                    + result.getErrorMessage());
    -                            return;
    +                            LOG.error("Exception thrown when running eviction to schedule topology {}."
    +                                    + " No evictions will be done! Error: {}",
    +                                td.getName(), ex.getClass().getName(), ex);
    --- End diff --
    
    Why do we need the exception class name???


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148850943
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -62,123 +70,144 @@ public void prepare(Map<String, Object> conf) {
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
    -        //initialize data structures
    -        for (TopologyDetails td : cluster.getTopologies()) {
    +        Map<String, User> userMap = getUsers(cluster);
    +        List<TopologyDetails> orderedTopologies = new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap));
    +        LOG.info("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));
    +        for (TopologyDetails td : orderedTopologies) {
                 if (!cluster.needsSchedulingRas(td)) {
                     //cluster forgets about its previous status, so if it is scheduled just leave it.
                     cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    -            }
    -        }
    -        Map<String, User> userMap = getUsers(cluster);
    -
    -        while (true) {
    -            TopologyDetails td;
    -            try {
    -                //Call scheduling priority strategy
    -                td = schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled!",
    -                        schedulingPrioritystrategy.getClass().getName(), ex);
    -                break;
    -            }
    -            if (td == null) {
    -                break;
    -            }
    -            User submitter = userMap.get(td.getTopologySubmitter());
    -            if (cluster.needsSchedulingRas(td)) {
    -                scheduleTopology(td, cluster, submitter, userMap);
                 } else {
    -                LOG.warn("Topology {} is already fully scheduled!", td.getName());
    -                cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    +                User submitter = userMap.get(td.getTopologySubmitter());
    +                scheduleTopology(td, cluster, submitter, orderedTopologies);
                 }
             }
         }
     
    +    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) {
    +        markFailedTopology(u, c, td, message, null);
    +    }
     
    -    public void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
    -                                 Map<String, User> userMap) {
    +    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message, Throwable t) {
    +        c.setStatus(td, message);
    +        String realMessage = td.getId() + " " + message;
    +        if (t != null) {
    +            LOG.error(realMessage, t);
    +        } else {
    +            LOG.error(realMessage);
    +        }
    +        u.markTopoUnsuccess(td);
    +    }
    +
    +    private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
    +                                  List<TopologyDetails> orderedTopologies) {
             //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds
             Cluster workingState = new Cluster(cluster);
    +        RAS_Nodes nodes = new RAS_Nodes(workingState);
             IStrategy rasStrategy = null;
             String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
             try {
    -            rasStrategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
    +            rasStrategy = ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
                 rasStrategy.prepare(conf);
             } catch (DisallowedStrategyException e) {
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + e.getAttemptedClass()
    -                    + " is not an allowed strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY
    -                    + " config is one of the allowed strategies: " + e.getAllowedStrategies().toString());
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - " + e.getAttemptedClass()
    +                    + " is not an allowed strategy. Please make sure your "
    +                    + Config.TOPOLOGY_SCHEDULER_STRATEGY
    +                    + " config is one of the allowed strategies: "
    +                    + e.getAllowedStrategies(), e);
                 return;
             } catch (RuntimeException e) {
    -            LOG.error("failed to create instance of IStrategy: {} Topology {} will not be scheduled.",
    -                    strategyConf, td.getName(), e);
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
    -                    + strategyConf + ". Please check logs for details");
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - failed to create instance of topology strategy "
    +                    + strategyConf
    +                    + ". Please check logs for details", e);
                 return;
             }
     
    -        while (true) {
    -            // A copy of the cluster that restricts the strategy to only modify a single topology
    +        for (int i = 0; i < maxSchedulingAttempts; i++) {
                 SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
    -            SchedulingResult result = null;
                 try {
    -                result = rasStrategy.schedule(toSchedule, td);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running strategy {} to schedule topology {}."
    -                        + " Topology will not be scheduled!", rasStrategy.getClass().getName(), td.getName(), ex);
    -                topologySubmitter.markTopoUnsuccess(td);
    -                cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
    -                        + rasStrategy.getClass().getName() + ". Please check logs for details");
    -            }
    -            LOG.debug("scheduling result: {}", result);
    -            if (result != null) {
    -                if (result.isSuccess()) {
    -                    try {
    +                SchedulingResult result = rasStrategy.schedule(toSchedule, td);
    +                LOG.debug("scheduling result: {}", result);
    +                if (result != null) {
    +                    if (result.isSuccess()) {
                             cluster.updateFrom(toSchedule);
                             cluster.setStatus(td.getId(), "Running - " + result.getMessage());
    -                    } catch (Exception ex) {
    -                        LOG.error("Unsuccessful attempting to assign executors to nodes.", ex);
    -                        topologySubmitter.markTopoUnsuccess(td);
    -                        cluster.setStatus(td.getId(), "Unsuccessful in scheduling - "
    -                                + "IllegalStateException thrown when attempting to assign executors to nodes. Please check"
    -                                + " log for details.");
    -                    }
    -                    return;
    -                } else {
    -                    if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    -                        boolean madeSpace = false;
    +                        //DONE
    +                        return;
    +                    } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    +                        LOG.info("Not enough resources to schedule {}", td.getName());
    --- End diff --
    
    Should this be debug?


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148852799
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -62,123 +70,144 @@ public void prepare(Map<String, Object> conf) {
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
    -        //initialize data structures
    -        for (TopologyDetails td : cluster.getTopologies()) {
    +        Map<String, User> userMap = getUsers(cluster);
    +        List<TopologyDetails> orderedTopologies = new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap));
    +        LOG.info("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));
    +        for (TopologyDetails td : orderedTopologies) {
                 if (!cluster.needsSchedulingRas(td)) {
                     //cluster forgets about its previous status, so if it is scheduled just leave it.
                     cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    -            }
    -        }
    -        Map<String, User> userMap = getUsers(cluster);
    -
    -        while (true) {
    -            TopologyDetails td;
    -            try {
    -                //Call scheduling priority strategy
    -                td = schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled!",
    -                        schedulingPrioritystrategy.getClass().getName(), ex);
    -                break;
    -            }
    -            if (td == null) {
    -                break;
    -            }
    -            User submitter = userMap.get(td.getTopologySubmitter());
    -            if (cluster.needsSchedulingRas(td)) {
    -                scheduleTopology(td, cluster, submitter, userMap);
                 } else {
    -                LOG.warn("Topology {} is already fully scheduled!", td.getName());
    -                cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    +                User submitter = userMap.get(td.getTopologySubmitter());
    +                scheduleTopology(td, cluster, submitter, orderedTopologies);
                 }
             }
         }
     
    +    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) {
    +        markFailedTopology(u, c, td, message, null);
    +    }
     
    -    public void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
    -                                 Map<String, User> userMap) {
    +    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message, Throwable t) {
    +        c.setStatus(td, message);
    +        String realMessage = td.getId() + " " + message;
    +        if (t != null) {
    +            LOG.error(realMessage, t);
    +        } else {
    +            LOG.error(realMessage);
    +        }
    +        u.markTopoUnsuccess(td);
    +    }
    +
    +    private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
    +                                  List<TopologyDetails> orderedTopologies) {
             //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds
             Cluster workingState = new Cluster(cluster);
    +        RAS_Nodes nodes = new RAS_Nodes(workingState);
             IStrategy rasStrategy = null;
             String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
             try {
    -            rasStrategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
    +            rasStrategy = ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
                 rasStrategy.prepare(conf);
             } catch (DisallowedStrategyException e) {
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + e.getAttemptedClass()
    -                    + " is not an allowed strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY
    -                    + " config is one of the allowed strategies: " + e.getAllowedStrategies().toString());
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - " + e.getAttemptedClass()
    +                    + " is not an allowed strategy. Please make sure your "
    +                    + Config.TOPOLOGY_SCHEDULER_STRATEGY
    +                    + " config is one of the allowed strategies: "
    +                    + e.getAllowedStrategies(), e);
                 return;
             } catch (RuntimeException e) {
    -            LOG.error("failed to create instance of IStrategy: {} Topology {} will not be scheduled.",
    -                    strategyConf, td.getName(), e);
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
    -                    + strategyConf + ". Please check logs for details");
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - failed to create instance of topology strategy "
    +                    + strategyConf
    +                    + ". Please check logs for details", e);
                 return;
             }
     
    -        while (true) {
    -            // A copy of the cluster that restricts the strategy to only modify a single topology
    +        for (int i = 0; i < maxSchedulingAttempts; i++) {
                 SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
    -            SchedulingResult result = null;
                 try {
    -                result = rasStrategy.schedule(toSchedule, td);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running strategy {} to schedule topology {}."
    -                        + " Topology will not be scheduled!", rasStrategy.getClass().getName(), td.getName(), ex);
    -                topologySubmitter.markTopoUnsuccess(td);
    -                cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
    -                        + rasStrategy.getClass().getName() + ". Please check logs for details");
    -            }
    -            LOG.debug("scheduling result: {}", result);
    -            if (result != null) {
    -                if (result.isSuccess()) {
    -                    try {
    +                SchedulingResult result = rasStrategy.schedule(toSchedule, td);
    +                LOG.debug("scheduling result: {}", result);
    +                if (result != null) {
    +                    if (result.isSuccess()) {
                             cluster.updateFrom(toSchedule);
                             cluster.setStatus(td.getId(), "Running - " + result.getMessage());
    -                    } catch (Exception ex) {
    -                        LOG.error("Unsuccessful attempting to assign executors to nodes.", ex);
    -                        topologySubmitter.markTopoUnsuccess(td);
    -                        cluster.setStatus(td.getId(), "Unsuccessful in scheduling - "
    -                                + "IllegalStateException thrown when attempting to assign executors to nodes. Please check"
    -                                + " log for details.");
    -                    }
    -                    return;
    -                } else {
    -                    if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    -                        boolean madeSpace = false;
    +                        //DONE
    +                        return;
    +                    } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    +                        LOG.info("Not enough resources to schedule {}", td.getName());
    +                        List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
                             try {
    -                            //need to re prepare since scheduling state might have been restored
    -                            madeSpace = evictionStrategy.makeSpaceForTopo(td, workingState, userMap);
    +                            boolean evictedSomething = false;
    +                            LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
    +                            int tdIndex = reversedList.indexOf(td);
    +                            double cpuNeeded = td.getTotalRequestedCpu();
    +                            double memoryNeeded = td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap();
    +                            SchedulerAssignment assignment = cluster.getAssignmentById(td.getId());
    +                            if (assignment != null) {
    +                                cpuNeeded -= getCpuUsed(assignment);
    +                                memoryNeeded -= getMemoryUsed(assignment);
    +                            }
    +                            cluster.getTopologyResourcesMap();
    +                            for (int index = 0; index < tdIndex; index++) {
    +                                TopologyDetails topologyEvict = reversedList.get(index);
    +                                SchedulerAssignment evictAssignemnt = workingState.getAssignmentById(topologyEvict.getId());
    +                                if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
    +                                    Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
    +
    +                                    LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
    +                                        topologyEvict.getTopologySubmitter());
    +                                    cpuNeeded -= getCpuUsed(evictAssignemnt);
    +                                    memoryNeeded -= getMemoryUsed(evictAssignemnt);
    +                                    evictedSomething = true;
    +                                    nodes.freeSlots(workersToEvict);
    +                                    if (cpuNeeded <= 0 && memoryNeeded <= 0) {
    +                                        //We evicted enough topologies to have a hope of scheduling, so try it now, and don't evict more
    +                                        // than is needed
    +                                        break;
    +                                    }
    +                                }
    +                            }
    +
    +                            if (!evictedSomething) {
    +                                markFailedTopology(topologySubmitter, cluster, td,
    +                                    "Not enough resources to schedule - " + result.getErrorMessage());
    +                                return;
    +                            }
                             } catch (Exception ex) {
    -                            LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}."
    -                                            + " No evictions will be done!", evictionStrategy.getClass().getName(),
    -                                    td.getName(), ex);
    -                            topologySubmitter.markTopoUnsuccess(td);
    -                            return;
    -                        }
    -                        if (!madeSpace) {
    -                            LOG.debug("Could not make space for topo {} will move to attempted", td);
    -                            topologySubmitter.markTopoUnsuccess(td);
    -                            cluster.setStatus(td.getId(), "Not enough resources to schedule - "
    -                                    + result.getErrorMessage());
    -                            return;
    +                            LOG.error("Exception thrown when running eviction to schedule topology {}."
    +                                    + " No evictions will be done! Error: {}",
    +                                td.getName(), ex.getClass().getName(), ex);
                             }
    +                        //Only place we fall though to do the loop over again...
                             continue;
                         } else {
    --- End diff --
    
    Should add a comment that this is for all other failed statuses.


---

[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148923121
  
    --- Diff: docs/Resource_Aware_Scheduler_overview.md ---
    @@ -243,58 +243,81 @@ http://dl.acm.org/citation.cfm?id=2814808
     <div id='Specifying-Topology-Prioritization-Strategy'/>
     ### Specifying Topology Prioritization Strategy
     
    -The order of scheduling is a pluggable interface in which a user could define a strategy that prioritizes topologies.  For a user to define his or her own prioritization strategy, he or she needs to implement the ISchedulingPriorityStrategy interface.  A user can set the scheduling priority strategy by setting the *Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY* to point to the class that implements the strategy. For instance:
    +The order of scheduling and eviction is determined by a pluggable interface in which the cluster owner can define how topologies should be scheduled.  For the owner to define his or her own prioritization strategy, she or he needs to implement the ISchedulingPriorityStrategy interface.  A user can set the scheduling priority strategy by setting the `DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY` to point to the class that implements the strategy. For instance:
     ```
         resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
     ```
    -A default strategy will be provided.  The following explains how the default scheduling priority strategy works.
    +
    +Topologies are scheduled starting at the beginning of the list returned by this plugin.  If there are not enough resources to schedule the topology others are evicted starting at the end of the list.  Eviction stops when there are no lower priority topologies left to evict.
     
     **DefaultSchedulingPriorityStrategy**
     
    -The order of scheduling should be based on the distance between a user’s current resource allocation and his or her guaranteed allocation.  We should prioritize the users who are the furthest away from their resource guarantee. The difficulty of this problem is that a user may have multiple resource guarantees, and another user can have another set of resource guarantees, so how can we compare them in a fair manner?  Let's use the average percentage of resource guarantees satisfied as a method of comparison.
    +In the past the order of scheduling was based on the distance between a user’s current resource allocation and his or her guaranteed allocation.
    +
    +We currently use a slightly different approach. We simulate scheduling the highest priority topology for each user and score the topology for each of the resources using the formula
    +
    +```
    +(Requested + Assigned - Guaranteed)/Available
    +```
    +
    +Where
    +
    + * `Requested` is the resource requested by this topology (or a approximation of it for complex requests like shared memory)
    + * `Assigned` is the resources already assigned by the simulation.
    + * `Guaranteed` is the resource guarantee for this user
    + * `Available` is the amount of that resource currently available in the cluster.
     
    -For example:
    +This gives a score that is negative for guaranteed requests and a score that is positive for requests that are not within the guarantee.
     
    -|User|Resource Guarantee|Resource Allocated|
    -|----|------------------|------------------|
    -|A|<10 CPU, 50GB>|<2 CPU, 40 GB>|
    -|B|< 20 CPU, 25GB>|<15 CPU, 10 GB>|
    +To combine different resources the maximum of all the indavidual resource scores is used.  This guarantees that if a user would go over a guarantee for a single resource it would not be offset by being under guarantee on any other resources.
    --- End diff --
    
    "indavidual" is misspelled


---