You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/04/16 13:57:21 UTC

[storm] branch master updated: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies (#3213)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 973fda8  [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies (#3213)
973fda8 is described below

commit 973fda807a407b4fd764fcca14e30f5891ec30f2
Author: Rui Li <ru...@verizonmedia.com>
AuthorDate: Thu Apr 16 08:57:11 2020 -0500

    [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies (#3213)
---
 .../java/org/apache/storm/scheduler/Cluster.java   |  70 +++++--
 .../apache/storm/scheduler/ISchedulingState.java   |   7 +
 .../scheduler/resource/ResourceAwareScheduler.java |  43 +++-
 .../org/apache/storm/scheduler/resource/User.java  |  28 ++-
 .../DefaultSchedulingPriorityStrategy.java         |   4 +-
 ...ricResourceAwareSchedulingPriorityStrategy.java | 177 ++++++++++++++++
 .../scheduling/GenericResourceAwareStrategy.java   |   4 +-
 .../TestUtilsForResourceAwareScheduler.java        |  26 ++-
 ...ricResourceAwareSchedulingPriorityStrategy.java | 224 +++++++++++++++++++++
 9 files changed, 551 insertions(+), 32 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 06b922c..cb7291c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -81,6 +81,13 @@ public class Cluster implements ISchedulingState {
     private final Map<String, Map<String, Double>> nodeToScheduledOffHeapNodeMemoryCache;   // node -> topologyId -> double
     private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
     private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
+    /**
+     * Snapshot of cluster total resources (cpu, memory, generic).
+     */
+    private final double totalCpuResource;
+    private final double totalMemoryResource;
+    private final Map<String, Double> totalGenericResources;
+
     private final ResourceMetrics resourceMetrics;
     private SchedulerAssignmentImpl assignment;
     private Set<String> blackListedHosts = new HashSet<>();
@@ -105,7 +112,8 @@ public class Cluster implements ISchedulingState {
         Map<String, ? extends SchedulerAssignment> map,
         Topologies topologies,
         Map<String, Object> conf) {
-        this(nimbus, resourceMetrics, supervisors, map, topologies, conf, null, null, null, null);
+        this(nimbus, resourceMetrics, supervisors, map, topologies, conf, null, null, null, null,
+            Double.NaN, Double.NaN, null);
     }
 
     /**
@@ -122,7 +130,10 @@ public class Cluster implements ISchedulingState {
             src.status,
             src.blackListedHosts,
             src.greyListedSupervisors,
-            src.networkTopography);
+            src.networkTopography,
+            src.totalCpuResource,
+            src.totalMemoryResource,
+            src.totalGenericResources);
     }
 
     /**
@@ -143,7 +154,10 @@ public class Cluster implements ISchedulingState {
             src.status,
             src.blackListedHosts,
             src.greyListedSupervisors,
-            src.networkTopography);
+            src.networkTopography,
+            src.totalCpuResource,
+            src.totalMemoryResource,
+            src.totalGenericResources);
     }
 
     private Cluster(
@@ -156,7 +170,10 @@ public class Cluster implements ISchedulingState {
         Map<String, String> status,
         Set<String> blackListedHosts,
         List<String> greyListedSupervisors,
-        Map<String, List<String>> networkTopography) {
+        Map<String, List<String>> networkTopography,
+        double totalCpuResource,
+        double totalMemoryResource,
+        Map<String, Double> totalGenericResources) {
         this.inimbus = nimbus;
         this.resourceMetrics = resourceMetrics;
         this.supervisors.putAll(supervisors);
@@ -175,6 +192,13 @@ public class Cluster implements ISchedulingState {
         this.topologies = topologies;
         this.minWorkerCpu = ObjectReader.getDouble(conf.get(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT), 0.0);
 
+        this.totalCpuResource = Double.isNaN(totalCpuResource) ? computeClusterCpuResource() :
+                                                           totalCpuResource;
+        this.totalMemoryResource = Double.isNaN(totalMemoryResource) ? computeClusterMemoryResource() :
+                                                                 totalMemoryResource;
+        this.totalGenericResources = totalGenericResources == null ? computeClusterGenericResources() :
+                                                                     totalGenericResources;
+
         ArrayList<String> supervisorHostNames = new ArrayList<>();
         for (SupervisorDetails s : supervisors.values()) {
             supervisorHostNames.add(s.getHost());
@@ -916,20 +940,38 @@ public class Cluster implements ISchedulingState {
 
     @Override
     public double getClusterTotalCpuResource() {
-        double sum = 0.0;
-        for (SupervisorDetails sup : supervisors.values()) {
-            sum += sup.getTotalCpu();
-        }
-        return sum;
+        return this.totalCpuResource;
+    }
+
+    private double computeClusterCpuResource() {
+        return supervisors.values().stream()
+            .mapToDouble(SupervisorDetails::getTotalCpu)
+            .sum();
+
     }
 
     @Override
     public double getClusterTotalMemoryResource() {
-        double sum = 0.0;
-        for (SupervisorDetails sup : supervisors.values()) {
-            sum += sup.getTotalMemory();
-        }
-        return sum;
+        return this.totalMemoryResource;
+    }
+
+
+    private double computeClusterMemoryResource() {
+        return supervisors.values().stream()
+            .mapToDouble(SupervisorDetails::getTotalMemory)
+            .sum();
+    }
+
+    @Override
+    public Map<String, Double> getClusterTotalGenericResources() {
+        return this.totalGenericResources;
+    }
+
+    private Map<String, Double> computeClusterGenericResources() {
+        return supervisors.values().stream()
+            .map(sup -> sup.getTotalGenericResources().entrySet())
+            .flatMap(Set::stream)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Double::sum));
     }
 
     @Override
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index 41a00b0..8e8232d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -19,9 +19,11 @@
 package org.apache.storm.scheduler;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.storm.daemon.nimbus.TopologyResources;
 import org.apache.storm.generated.WorkerResources;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
@@ -272,6 +274,11 @@ public interface ISchedulingState {
     double getClusterTotalMemoryResource();
 
     /**
+     * Get the total amount of generic resources (excluding CPU and memory) in cluster.
+     */
+    Map<String, Double> getClusterTotalGenericResources();
+
+    /**
      * Get the network topography (rackId -> nodes in the rack).
      */
     Map<String, List<String>> getNetworkTopography();
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index a26246a..788767c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -17,8 +17,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -57,6 +59,7 @@ public class ResourceAwareScheduler implements IScheduler {
     private int maxSchedulingAttempts;
     private int schedulingTimeoutSeconds;
     private ExecutorService backgroundScheduling;
+    private Map<String, Set<String>> evictedTopologiesMap;   // topoId : toposEvicted
     private Meter schedulingTimeoutMeter;
     private Meter internalErrorMeter;
 
@@ -88,6 +91,7 @@ public class ResourceAwareScheduler implements IScheduler {
         schedulingTimeoutSeconds = ObjectReader.getInt(
                 conf.get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY), 60);
         backgroundScheduling = Executors.newFixedThreadPool(1);
+        evictedTopologiesMap = new HashMap<>();
     }
 
     @Override
@@ -108,19 +112,23 @@ public class ResourceAwareScheduler implements IScheduler {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));
         }
+        // clear tmpEvictedTopologiesMap at the beginning of each round of scheduling
+        // move it to evictedTopologiesMap at the end of this round of scheduling
+        Map<String, Set<String>> tmpEvictedTopologiesMap = new HashMap<>();
         for (TopologyDetails td : orderedTopologies) {
             if (!cluster.needsSchedulingRas(td)) {
                 //cluster forgets about its previous status, so if it is scheduled just leave it.
                 cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
             } else {
                 User submitter = userMap.get(td.getTopologySubmitter());
-                scheduleTopology(td, cluster, submitter, orderedTopologies);
+                scheduleTopology(td, cluster, submitter, orderedTopologies, tmpEvictedTopologiesMap);
             }
         }
+        evictedTopologiesMap = tmpEvictedTopologiesMap;
     }
 
     private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
-                                  List<TopologyDetails> orderedTopologies) {
+                                  List<TopologyDetails> orderedTopologies, Map<String, Set<String>> tmpEvictedTopologiesMap) {
         //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds
         Cluster workingState = new Cluster(cluster);
         RasNodes nodes = new RasNodes(workingState);
@@ -198,21 +206,18 @@ public class ResourceAwareScheduler implements IScheduler {
                     } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
                         LOG.debug("Not enough resources to schedule {}", td.getName());
                         List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
-                        boolean evictedSomething = false;
-                        LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
+                        LOG.debug("Attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
                         int tdIndex = reversedList.indexOf(td);
                         topologySchedulingResources.setRemainingRequiredResources(toSchedule, td);
 
+                        Set<String> tmpEvictedTopos = new HashSet<>();
                         for (int index = 0; index < tdIndex; index++) {
                             TopologyDetails topologyEvict = reversedList.get(index);
                             SchedulerAssignment evictAssignemnt = workingState.getAssignmentById(topologyEvict.getId());
                             if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
-                                Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
                                 topologySchedulingResources.adjustResourcesForEvictedTopology(toSchedule, topologyEvict);
-
-                                LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
-                                    topologyEvict.getTopologySubmitter());
-                                evictedSomething = true;
+                                tmpEvictedTopos.add(topologyEvict.getId());
+                                Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
                                 nodes.freeSlots(workersToEvict);
                                 if (topologySchedulingResources.canSchedule()) {
                                     //We evicted enough topologies to have a hope of scheduling, so try it now, and don't evict more
@@ -221,8 +226,10 @@ public class ResourceAwareScheduler implements IScheduler {
                                 }
                             }
                         }
-
-                        if (!evictedSomething) {
+                        if (!tmpEvictedTopos.isEmpty()) {
+                            LOG.warn("Evicted Topologies {} when scheduling topology: {}", tmpEvictedTopos, td.getId());
+                            tmpEvictedTopologiesMap.computeIfAbsent(td.getId(), k -> new HashSet<>()).addAll(tmpEvictedTopos);
+                        } else {
                             StringBuilder message = new StringBuilder();
                             message.append("Not enough resources to schedule after evicting lower priority topologies. ");
                             message.append(topologySchedulingResources.getRemainingRequiredResourcesMessage());
@@ -251,6 +258,20 @@ public class ResourceAwareScheduler implements IScheduler {
                     + topologySchedulingResources.getRemainingRequiredResourcesMessage());
     }
 
+    /**
+     * Return eviction information as map {scheduled topo : evicted topos}
+     * NOTE this method returns the map of a completed scheduling round.
+     * If scheduling is going on, this method will return a map of last scheduling round
+     * <p>
+     * TODO: This method is only used for testing . It's subject to change if we plan to use this info elsewhere.
+     * </p>
+     * @return a MAP of scheduled (topo : evicted) topos of most recent completed scheduling round
+     */
+    public Map<String, Set<String>> getEvictedTopologiesMap() {
+        return Collections.unmodifiableMap(evictedTopologiesMap);
+    }
+
+
     /*
      * Class for tracking resources for scheduling a topology.
      *
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
index fce65b8..3bda922 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
@@ -18,11 +18,14 @@
 
 package org.apache.storm.scheduler.resource;
 
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+
 import org.apache.storm.daemon.nimbus.TopologyResources;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ISchedulingState;
@@ -34,23 +37,26 @@ public class User {
     private final Set<TopologyDetails> unsuccess = new HashSet<>();
     private final double cpuGuarantee;
     private final double memoryGuarantee;
+    private final Map<String, Double> genericGuarantee;
     private String userId;
 
     public User(String userId) {
-        this(userId, 0, 0);
+        this(userId, 0, 0, Collections.emptyMap());
     }
 
     public User(String userId, Map<String, Double> resourcePool) {
         this(
             userId,
             resourcePool == null ? 0.0 : resourcePool.getOrDefault("cpu", 0.0),
-            resourcePool == null ? 0.0 : resourcePool.getOrDefault("memory", 0.0));
+            resourcePool == null ? 0.0 : resourcePool.getOrDefault("memory", 0.0),
+            resourcePool == null ? Collections.emptyMap() : extractGenericResourceEntries(resourcePool));
     }
 
-    private User(String userId, double cpuGuarantee, double memoryGuarantee) {
+    private User(String userId, double cpuGuarantee, double memoryGuarantee, Map<String, Double> genericGuarantee) {
         this.userId = userId;
         this.cpuGuarantee = cpuGuarantee;
         this.memoryGuarantee = memoryGuarantee;
+        this.genericGuarantee = genericGuarantee;
     }
 
     public String getId() {
@@ -167,6 +173,10 @@ public class User {
         return cpuGuarantee;
     }
 
+    public Map<String, Double> getGenericGuaranteed() {
+        return genericGuarantee;
+    }
+
     public TopologyDetails getNextTopologyToSchedule(ISchedulingState cluster) {
         for (TopologyDetails topo : getPendingTopologies(cluster)) {
             return topo;
@@ -186,6 +196,18 @@ public class User {
         return queue.last();
     }
 
+    private static Map<String, Double> extractGenericResourceEntries(Map<String, Double> resourcePool) {
+        Map<String, Double> ret = new HashMap<>();
+        for (Map.Entry<String, Double> entry : resourcePool.entrySet()) {
+            String key = entry.getKey();
+            Double value = entry.getValue();
+            if (key != null && !key.equals("cpu") && !key.equals("memory")) {
+                ret.put(key, value);
+            }
+        }
+        return ret;
+    }
+
     @Override
     public int hashCode() {
         return this.userId.hashCode();
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
index 4cc0a85..b0efc2c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -63,8 +63,8 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStr
         public final double guaranteedCpu;
         public final double guaranteedMemory;
         protected final LinkedList<TopologyDetails> tds = new LinkedList<>();
-        private double assignedCpu = 0.0;
-        private double assignedMemory = 0.0;
+        protected double assignedCpu = 0.0;
+        protected double assignedMemory = 0.0;
 
         public SimulatedUser(User other, ISchedulingState cluster) {
             tds.addAll(cluster.getTopologies().getTopologiesOwnedBy(other.getId()));
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
new file mode 100644
index 0000000..c2a5eda
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResources();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            // generic resource types that are offered
+            Set<String> availGenericResourceTypes = cluster.getClusterTotalGenericResources().keySet();
+            for (String resourceType : availGenericResourceTypes) {
+                Double guaranteedAmount = other.getGenericGuaranteed().getOrDefault(resourceType, 0.0);
+                guaranteedGenericResources.put(resourceType, guaranteedAmount);
+            }
+            this.guaranteedGenericResources = guaranteedGenericResources;
+        }
+
+        @Override
+        public TopologyDetails simScheduleNextHighest() {
+            TopologyDetails td = super.simScheduleNextHighest();
+            Map<String, Double> tdRequestedGenericResource = td.getTotalRequestedGenericResources();
+            for (Map.Entry<String, Double> entry : tdRequestedGenericResource.entrySet()) {
+                String resource = entry.getKey();
+                Double requestedAmount = entry.getValue();
+                assignedGenericResources.put(resource, assignedGenericResources.getOrDefault(resource, 0.0) + requestedAmount);
+            }
+            return td;
+        }
+
+        /**
+         * Get a score for the simulated user.  This is used to sort the users, by their highest priority topology.
+         * Only give user guarantees that will not exceed cluster capacity.
+         * Score of each resource type is calculated as: (Requested + Assigned - Guaranteed)/clusterAvailable
+         * The final score is a max over all resource types.
+         * Topology score will fall into the following intervals if:
+         *      User is under quota (guarantee):                    [(-guarantee)/available : 0]
+         *      User is over quota:                                 (0, infinity)
+         * Unfortunately, score below 0 does not guarantee that the topology will be scheduled due to resources fragmentation.
+         * @param availableCpu available CPU on the cluster.
+         * @param availableMemory available memory on the cluster.
+         * @param availableGenericResources available generic resources (other that cpu and memory) in cluster
+         * @param td the topology we are looking at.
+         * @return the score.
+         */
+        protected double getScore(double availableCpu, double availableMemory,
+                                  Map<String, Double> availableGenericResources, TopologyDetails td) {
+            // calculate scores for cpu and memory first
+            double ret = super.getScore(availableCpu, availableMemory, td);
+            if (ret == Double.MAX_VALUE) {
+                return ret;
+            }
+            Map<String, Double> tdTotalRequestedGeneric = td.getTotalRequestedGenericResources();
+            if (tdTotalRequestedGeneric == null) {
+                tdTotalRequestedGeneric = Collections.emptyMap();
+            }
+            for (Map.Entry<String, Double> entry : availableGenericResources.entrySet()) {
+                String resource = entry.getKey();
+                Double available = entry.getValue();
+                if (available <= 0) {
+                    return Double.MAX_VALUE;
+                }
+                Double wouldBeResource = assignedGenericResources.getOrDefault(resource, 0.0)
+                    + tdTotalRequestedGeneric.getOrDefault(resource, 0.0);
+                double thisScore = (wouldBeResource -  guaranteedGenericResources.getOrDefault(resource, 0.0)) / available;
+                ret = Math.max(ret, thisScore);
+            }
+
+            return ret;
+        }
+
+        protected double getScore(double availableCpu, double availableMemory, Map<String, Double> availableGenericResources) {
+            return getScore(availableCpu, availableMemory, availableGenericResources, getNextHighest());
+        }
+    }
+
+    private static class GrasSimulatedUserComparator implements Comparator<GrasSimulatedUser> {
+        private final double cpuAvail;
+        private final double memAvail;
+        private final Map<String, Double> genericAvail;
+
+        private GrasSimulatedUserComparator(double cpuAvail, double memAvail, Map<String, Double> genericAvail) {
+            this.cpuAvail = cpuAvail;
+            this.memAvail = memAvail;
+            this.genericAvail = genericAvail;
+        }
+
+        @Override
+        public int compare(GrasSimulatedUser o1, GrasSimulatedUser o2) {
+            return Double.compare(o1.getScore(cpuAvail, memAvail, genericAvail), o2.getScore(cpuAvail, memAvail, genericAvail));
+        }
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index 790ada2..23735e2 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -86,7 +86,9 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl
         }
         Collection<ExecutorDetails> unassignedExecutors =
             new HashSet<>(this.cluster.getUnassignedExecutors(td));
-        LOG.debug("{} Num ExecutorsNeedScheduling: {}", td.getId(), unassignedExecutors.size());
+        LOG.debug("Topology: {} has {} executors which need scheduling.",
+                    td.getId(), unassignedExecutors.size());
+
         Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
         List<Component> spouts = this.getSpouts(td);
 
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 276cc46..11de468 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -82,7 +82,7 @@ public class TestUtilsForResourceAwareScheduler {
             }
         }
     }
-
+    
     public static TestUserResources userRes(String name, double cpu, double mem) {
         return new TestUserResources(name, cpu, mem);
     }
@@ -451,6 +451,30 @@ public class TestUtilsForResourceAwareScheduler {
         }
     }
 
+    public static void assertTopologiesBeenEvicted(Cluster cluster, Set<String> evictedTopologies, String... topoNames) {
+        Topologies topologies = cluster.getTopologies();
+        LOG.info("Evicted topos: {}", evictedTopologies);
+        assert (evictedTopologies != null);
+        for (String topoName : topoNames) {
+            TopologyDetails td = topologies.getByName(topoName);
+            assert (td != null) : topoName;
+            String topoId = td.getId();
+            assert (evictedTopologies.contains(topoId)) : topoName;
+        }
+    }
+
+    public static void assertTopologiesNotBeenEvicted(Cluster cluster, Set<String> evictedTopologies, String... topoNames) {
+        Topologies topologies = cluster.getTopologies();
+        LOG.info("Evicted topos: {}", evictedTopologies);
+        assert (evictedTopologies != null);
+        for (String topoName : topoNames) {
+            TopologyDetails td = topologies.getByName(topoName);
+            assert (td != null) : topoName;
+            String topoId = td.getId();
+            assert (!evictedTopologies.contains(topoId)) : topoName;
+        }
+    }
+
     public static void assertStatusSuccess(Cluster cluster, String topoId) {
         assert (isStatusSuccess(cluster.getStatus(topoId))) :
             "topology status " + topoId + " is not successful " + cluster.getStatus(topoId);
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java
new file mode 100644
index 0000000..a61195e
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.utils.Time;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.addTopologies;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.createGrasClusterConfig;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genSupervisors;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genTopology;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userRes;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userResourcePool;
+
+public class TestGenericResourceAwareSchedulingPriorityStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestGenericResourceAwareSchedulingPriorityStrategy.class);
+    private static int currentTime = Time.currentTimeSecs();
+    private static IScheduler scheduler = null;
+
+    @After
+    public void cleanup() {
+        if (scheduler != null) {
+            scheduler.cleanup();
+            scheduler = null;
+        }
+    }
+
+    /*
+     * DefaultSchedulingPriorityStrategy will not evict topo as long as the resources request can be met
+     *
+     *  Ethan asks for heavy cpu and memory while Rui asks for little cpu and memory but heavy generic resource
+     *  Since Rui's all types of resources request can be met, no eviction will happen.
+    */
+    @Test
+    public void testDefaultSchedulingPriorityStrategyNotEvicting() {
+        Map<String, Double> requestedgenericResourcesMap = new HashMap<>();
+        requestedgenericResourcesMap.put("generic.resource.1", 40.0);
+        // Use full memory and cpu of the cluster capacity
+        Config ruiConf = createGrasClusterConfig(20, 50, 50, null, requestedgenericResourcesMap);
+        Config ethanConf = createGrasClusterConfig(80, 400, 500, null, Collections.emptyMap());
+        Topologies topologies = new Topologies(
+            genTopology("ethan-topo-1", ethanConf, 1, 0, 1, 0, currentTime - 2, 10, "ethan"),
+            genTopology("ethan-topo-2", ethanConf, 1, 0, 1, 0, currentTime - 2, 20, "ethan"),
+            genTopology("ethan-topo-3", ethanConf, 1, 0, 1, 0, currentTime - 2, 28, "ethan"),
+            genTopology("ethan-topo-4", ethanConf, 1, 0, 1, 0, currentTime - 2, 29, "ethan"));
+
+        Topologies withNewTopo = addTopologies(topologies,
+            genTopology("rui-topo-1", ruiConf, 1, 0, 4, 0, currentTime - 2, 10, "rui"));
+
+        Config config = mkClusterConfig(DefaultSchedulingPriorityStrategy.class.getName());
+        Cluster cluster = mkTestCluster(topologies, config);
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+
+        scheduler.schedule(topologies, cluster);
+
+        assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+
+        cluster = new Cluster(cluster, withNewTopo);
+        scheduler.schedule(withNewTopo, cluster);
+        Map<String, Set<String>> evictedTopos = ((ResourceAwareScheduler) scheduler).getEvictedTopologiesMap();
+
+        assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+        assertTopologiesNotBeenEvicted(cluster, collectMapValues(evictedTopos), "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+        assertTopologiesFullyScheduled(cluster, "rui-topo-1");
+    }
+
+    /*
+     * DefaultSchedulingPriorityStrategy does not take generic resources into account when calculating score
+     * So even if a user is requesting a lot of generic resources other than CPU and memory, scheduler will still score it very low and kick out other topologies
+     *
+     *  Ethan asks for medium cpu and memory while Rui asks for little cpu and memory but heavy generic resource
+     *  However, Rui's generic request can not be met and default scoring system is not taking generic resources into account,
+     *  so the score of Rui's new topo will be much lower than all Ethan's topos'.
+     *  Then all Ethan's topo will be evicted in trying to make rooms for Rui.
+     */
+    @Test
+    public void testDefaultSchedulingPriorityStrategyEvicting() {
+        Map<String, Double> requestedgenericResourcesMap = new HashMap<>();
+        requestedgenericResourcesMap.put("generic.resource.1", 40.0);
+        Config ruiConf = createGrasClusterConfig(10, 10, 10, null, requestedgenericResourcesMap);
+        Config ethanConf = createGrasClusterConfig(60, 200, 300, null, Collections.emptyMap());
+        Topologies topologies = new Topologies(
+            genTopology("ethan-topo-1", ethanConf, 1, 0, 1, 0, currentTime - 2, 10, "ethan"),
+            genTopology("ethan-topo-2", ethanConf, 1, 0, 1, 0, currentTime - 2, 20, "ethan"),
+            genTopology("ethan-topo-3", ethanConf, 1, 0, 1, 0, currentTime - 2, 28, "ethan"),
+            genTopology("ethan-topo-4", ethanConf, 1, 0, 1, 0, currentTime - 2, 29, "ethan"));
+
+        Topologies withNewTopo = addTopologies(topologies,
+            genTopology("rui-topo-1", ruiConf, 1, 0, 5, 0, currentTime - 2, 10, "rui"));
+
+        Config config = mkClusterConfig(DefaultSchedulingPriorityStrategy.class.getName());
+        Cluster cluster = mkTestCluster(topologies, config);
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+        scheduler.schedule(topologies, cluster);
+
+        assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+
+        cluster = new Cluster(cluster, withNewTopo);
+        scheduler.schedule(withNewTopo, cluster);
+        Map<String, Set<String>> evictedTopos = ((ResourceAwareScheduler) scheduler).getEvictedTopologiesMap();
+
+        assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+        assertTopologiesBeenEvicted(cluster, collectMapValues(evictedTopos), "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+        assertTopologiesNotScheduled(cluster, "rui-topo-1");
+    }
+
+    /*
+     * GenericResourceAwareSchedulingPriorityStrategy extend scoring formula to accommodate generic resources
+     *
+     *   Same setting as testDefaultSchedulingPriorityStrategyEvicting, but this time, new scoring system is taking generic resources into account,
+     *   the score of rui's new topo will be higher than all Ethan's topos' due to its crazy generic request.
+     *   At the end, all Ethan's topo will not be evicted as expected.
+     */
+    @Test
+    public void testGenericSchedulingPriorityStrategyEvicting() {
+        Map<String, Double> requestedgenericResourcesMap = new HashMap<>();
+        requestedgenericResourcesMap.put("generic.resource.1", 40.0);
+        Config ruiConf = createGrasClusterConfig(10, 10, 10, null, requestedgenericResourcesMap);
+        Config ethanConf = createGrasClusterConfig(60, 200, 300, null, Collections.emptyMap());
+        Topologies topologies = new Topologies(
+            genTopology("ethan-topo-1", ethanConf, 1, 0, 1, 0, currentTime - 2, 10, "ethan"),
+            genTopology("ethan-topo-2", ethanConf, 1, 0, 1, 0, currentTime - 2, 20, "ethan"),
+            genTopology("ethan-topo-3", ethanConf, 1, 0, 1, 0, currentTime - 2, 28, "ethan"),
+            genTopology("ethan-topo-4", ethanConf, 1, 0, 1, 0, currentTime - 2, 29, "ethan"));
+
+        Topologies withNewTopo = addTopologies(topologies,
+            genTopology("rui-topo-1", ruiConf, 1, 0, 5, 0, currentTime - 2, 10, "rui"));
+
+
+        Config config = mkClusterConfig(GenericResourceAwareSchedulingPriorityStrategy.class.getName());
+        Cluster cluster = mkTestCluster(topologies, config);
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+        scheduler.schedule(topologies, cluster);
+
+        assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+
+        cluster = new Cluster(cluster, withNewTopo);
+        scheduler.schedule(withNewTopo, cluster);
+        Map<String, Set<String>> evictedTopos = ((ResourceAwareScheduler) scheduler).getEvictedTopologiesMap();
+
+        assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+        assertTopologiesNotBeenEvicted(cluster, collectMapValues(evictedTopos),"ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+        assertTopologiesNotScheduled(cluster, "rui-topo-1");
+    }
+
+
+    private Config mkClusterConfig(String SchedulingPriorityStrategy) {
+        Map<String, Map<String, Number>> resourceUserPool = userResourcePool(
+            userRes("rui", 200, 2000),
+            userRes("ethan", 200, 2000));
+
+        Map<String, Double> genericResourcesOfferedMap = new HashMap<>();
+        genericResourcesOfferedMap.put("generic.resource.1", 50.0);
+
+        Config config = createGrasClusterConfig(100, 500, 500, resourceUserPool, genericResourcesOfferedMap);
+        config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, SchedulingPriorityStrategy);
+        config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS, 2);    // allow 1 round of evictions
+
+        return config;
+    }
+
+    private Cluster mkTestCluster(Topologies topologies, Config config) {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+
+        Map<String, Double> genericResourcesOfferedMap = (Map<String, Double>) config.get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
+        if (genericResourcesOfferedMap == null || genericResourcesOfferedMap.isEmpty()) {
+            throw new IllegalArgumentException("Generic resources map must contain something in this test: "
+                    + TestGenericResourceAwareSchedulingPriorityStrategy.class.getName());
+        }
+        Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 100, 1000, genericResourcesOfferedMap);
+
+        return new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+    }
+
+    private Set<String> collectMapValues(Map<String, Set<String>> map) {
+        Set<String> set = new HashSet<>();
+        map.values().forEach((s) -> set.addAll(s));
+        return set;
+    }
+}