You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/04/16 13:57:21 UTC
[storm] branch master updated: [STORM-3588] add
GenericResourceAwareSchedulingPriorityStrategy to accommodate generic
resource in grading topologies (#3213)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 973fda8 [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies (#3213)
973fda8 is described below
commit 973fda807a407b4fd764fcca14e30f5891ec30f2
Author: Rui Li <ru...@verizonmedia.com>
AuthorDate: Thu Apr 16 08:57:11 2020 -0500
[STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies (#3213)
---
.../java/org/apache/storm/scheduler/Cluster.java | 70 +++++--
.../apache/storm/scheduler/ISchedulingState.java | 7 +
.../scheduler/resource/ResourceAwareScheduler.java | 43 +++-
.../org/apache/storm/scheduler/resource/User.java | 28 ++-
.../DefaultSchedulingPriorityStrategy.java | 4 +-
...ricResourceAwareSchedulingPriorityStrategy.java | 177 ++++++++++++++++
.../scheduling/GenericResourceAwareStrategy.java | 4 +-
.../TestUtilsForResourceAwareScheduler.java | 26 ++-
...ricResourceAwareSchedulingPriorityStrategy.java | 224 +++++++++++++++++++++
9 files changed, 551 insertions(+), 32 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 06b922c..cb7291c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -81,6 +81,13 @@ public class Cluster implements ISchedulingState {
private final Map<String, Map<String, Double>> nodeToScheduledOffHeapNodeMemoryCache; // node -> topologyId -> double
private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
+ /**
+ * Snapshot of cluster total resources (cpu, memory, generic).
+ */
+ private final double totalCpuResource;
+ private final double totalMemoryResource;
+ private final Map<String, Double> totalGenericResources;
+
private final ResourceMetrics resourceMetrics;
private SchedulerAssignmentImpl assignment;
private Set<String> blackListedHosts = new HashSet<>();
@@ -105,7 +112,8 @@ public class Cluster implements ISchedulingState {
Map<String, ? extends SchedulerAssignment> map,
Topologies topologies,
Map<String, Object> conf) {
- this(nimbus, resourceMetrics, supervisors, map, topologies, conf, null, null, null, null);
+ this(nimbus, resourceMetrics, supervisors, map, topologies, conf, null, null, null, null,
+ Double.NaN, Double.NaN, null);
}
/**
@@ -122,7 +130,10 @@ public class Cluster implements ISchedulingState {
src.status,
src.blackListedHosts,
src.greyListedSupervisors,
- src.networkTopography);
+ src.networkTopography,
+ src.totalCpuResource,
+ src.totalMemoryResource,
+ src.totalGenericResources);
}
/**
@@ -143,7 +154,10 @@ public class Cluster implements ISchedulingState {
src.status,
src.blackListedHosts,
src.greyListedSupervisors,
- src.networkTopography);
+ src.networkTopography,
+ src.totalCpuResource,
+ src.totalMemoryResource,
+ src.totalGenericResources);
}
private Cluster(
@@ -156,7 +170,10 @@ public class Cluster implements ISchedulingState {
Map<String, String> status,
Set<String> blackListedHosts,
List<String> greyListedSupervisors,
- Map<String, List<String>> networkTopography) {
+ Map<String, List<String>> networkTopography,
+ double totalCpuResource,
+ double totalMemoryResource,
+ Map<String, Double> totalGenericResources) {
this.inimbus = nimbus;
this.resourceMetrics = resourceMetrics;
this.supervisors.putAll(supervisors);
@@ -175,6 +192,13 @@ public class Cluster implements ISchedulingState {
this.topologies = topologies;
this.minWorkerCpu = ObjectReader.getDouble(conf.get(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT), 0.0);
+ this.totalCpuResource = Double.isNaN(totalCpuResource) ? computeClusterCpuResource() :
+ totalCpuResource;
+ this.totalMemoryResource = Double.isNaN(totalMemoryResource) ? computeClusterMemoryResource() :
+ totalMemoryResource;
+ this.totalGenericResources = totalGenericResources == null ? computeClusterGenericResources() :
+ totalGenericResources;
+
ArrayList<String> supervisorHostNames = new ArrayList<>();
for (SupervisorDetails s : supervisors.values()) {
supervisorHostNames.add(s.getHost());
@@ -916,20 +940,38 @@ public class Cluster implements ISchedulingState {
@Override
public double getClusterTotalCpuResource() {
- double sum = 0.0;
- for (SupervisorDetails sup : supervisors.values()) {
- sum += sup.getTotalCpu();
- }
- return sum;
+ return this.totalCpuResource;
+ }
+
+ private double computeClusterCpuResource() {
+ return supervisors.values().stream()
+ .mapToDouble(SupervisorDetails::getTotalCpu)
+ .sum();
+
}
@Override
public double getClusterTotalMemoryResource() {
- double sum = 0.0;
- for (SupervisorDetails sup : supervisors.values()) {
- sum += sup.getTotalMemory();
- }
- return sum;
+ return this.totalMemoryResource;
+ }
+
+
+ private double computeClusterMemoryResource() {
+ return supervisors.values().stream()
+ .mapToDouble(SupervisorDetails::getTotalMemory)
+ .sum();
+ }
+
+ @Override
+ public Map<String, Double> getClusterTotalGenericResources() {
+ return this.totalGenericResources;
+ }
+
+ private Map<String, Double> computeClusterGenericResources() {
+ return supervisors.values().stream()
+ .map(sup -> sup.getTotalGenericResources().entrySet())
+ .flatMap(Set::stream)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Double::sum));
}
@Override
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index 41a00b0..8e8232d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -19,9 +19,11 @@
package org.apache.storm.scheduler;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
@@ -272,6 +274,11 @@ public interface ISchedulingState {
double getClusterTotalMemoryResource();
/**
+ * Get the total amount of generic resources (excluding CPU and memory) in cluster.
+ */
+ Map<String, Double> getClusterTotalGenericResources();
+
+ /**
* Get the network topography (rackId -> nodes in the rack).
*/
Map<String, List<String>> getNetworkTopography();
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index a26246a..788767c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -17,8 +17,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -57,6 +59,7 @@ public class ResourceAwareScheduler implements IScheduler {
private int maxSchedulingAttempts;
private int schedulingTimeoutSeconds;
private ExecutorService backgroundScheduling;
+ private Map<String, Set<String>> evictedTopologiesMap; // topoId : toposEvicted
private Meter schedulingTimeoutMeter;
private Meter internalErrorMeter;
@@ -88,6 +91,7 @@ public class ResourceAwareScheduler implements IScheduler {
schedulingTimeoutSeconds = ObjectReader.getInt(
conf.get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY), 60);
backgroundScheduling = Executors.newFixedThreadPool(1);
+ evictedTopologiesMap = new HashMap<>();
}
@Override
@@ -108,19 +112,23 @@ public class ResourceAwareScheduler implements IScheduler {
if (LOG.isDebugEnabled()) {
LOG.debug("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));
}
+ // clear tmpEvictedTopologiesMap at the beginning of each round of scheduling
+ // move it to evictedTopologiesMap at the end of this round of scheduling
+ Map<String, Set<String>> tmpEvictedTopologiesMap = new HashMap<>();
for (TopologyDetails td : orderedTopologies) {
if (!cluster.needsSchedulingRas(td)) {
//cluster forgets about its previous status, so if it is scheduled just leave it.
cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
} else {
User submitter = userMap.get(td.getTopologySubmitter());
- scheduleTopology(td, cluster, submitter, orderedTopologies);
+ scheduleTopology(td, cluster, submitter, orderedTopologies, tmpEvictedTopologiesMap);
}
}
+ evictedTopologiesMap = tmpEvictedTopologiesMap;
}
private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
- List<TopologyDetails> orderedTopologies) {
+ List<TopologyDetails> orderedTopologies, Map<String, Set<String>> tmpEvictedTopologiesMap) {
//A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds
Cluster workingState = new Cluster(cluster);
RasNodes nodes = new RasNodes(workingState);
@@ -198,21 +206,18 @@ public class ResourceAwareScheduler implements IScheduler {
} else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
LOG.debug("Not enough resources to schedule {}", td.getName());
List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
- boolean evictedSomething = false;
- LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
+ LOG.debug("Attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
int tdIndex = reversedList.indexOf(td);
topologySchedulingResources.setRemainingRequiredResources(toSchedule, td);
+ Set<String> tmpEvictedTopos = new HashSet<>();
for (int index = 0; index < tdIndex; index++) {
TopologyDetails topologyEvict = reversedList.get(index);
SchedulerAssignment evictAssignemnt = workingState.getAssignmentById(topologyEvict.getId());
if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
- Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
topologySchedulingResources.adjustResourcesForEvictedTopology(toSchedule, topologyEvict);
-
- LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
- topologyEvict.getTopologySubmitter());
- evictedSomething = true;
+ tmpEvictedTopos.add(topologyEvict.getId());
+ Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
nodes.freeSlots(workersToEvict);
if (topologySchedulingResources.canSchedule()) {
//We evicted enough topologies to have a hope of scheduling, so try it now, and don't evict more
@@ -221,8 +226,10 @@ public class ResourceAwareScheduler implements IScheduler {
}
}
}
-
- if (!evictedSomething) {
+ if (!tmpEvictedTopos.isEmpty()) {
+ LOG.warn("Evicted Topologies {} when scheduling topology: {}", tmpEvictedTopos, td.getId());
+ tmpEvictedTopologiesMap.computeIfAbsent(td.getId(), k -> new HashSet<>()).addAll(tmpEvictedTopos);
+ } else {
StringBuilder message = new StringBuilder();
message.append("Not enough resources to schedule after evicting lower priority topologies. ");
message.append(topologySchedulingResources.getRemainingRequiredResourcesMessage());
@@ -251,6 +258,20 @@ public class ResourceAwareScheduler implements IScheduler {
+ topologySchedulingResources.getRemainingRequiredResourcesMessage());
}
+ /**
+ * Returns the eviction information as a map of {scheduled topology id : ids of the topologies evicted for it}.
+ * NOTE: the returned map always describes a completed scheduling round.
+ * While a scheduling round is in progress, this method returns the map of the previous round.
+ * <p>
+ * TODO: This method is only used for testing. It is subject to change if we plan to use this info elsewhere.
+ * </p>
+ * @return an unmodifiable view of the (scheduled topology : evicted topologies) map of the most recent completed round
+ */
+ public Map<String, Set<String>> getEvictedTopologiesMap() {
+ return Collections.unmodifiableMap(evictedTopologiesMap);
+ }
+
+
/*
* Class for tracking resources for scheduling a topology.
*
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
index fce65b8..3bda922 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
@@ -18,11 +18,14 @@
package org.apache.storm.scheduler.resource;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+
import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ISchedulingState;
@@ -34,23 +37,26 @@ public class User {
private final Set<TopologyDetails> unsuccess = new HashSet<>();
private final double cpuGuarantee;
private final double memoryGuarantee;
+ private final Map<String, Double> genericGuarantee;
private String userId;
public User(String userId) {
- this(userId, 0, 0);
+ this(userId, 0, 0, Collections.emptyMap());
}
public User(String userId, Map<String, Double> resourcePool) {
this(
userId,
resourcePool == null ? 0.0 : resourcePool.getOrDefault("cpu", 0.0),
- resourcePool == null ? 0.0 : resourcePool.getOrDefault("memory", 0.0));
+ resourcePool == null ? 0.0 : resourcePool.getOrDefault("memory", 0.0),
+ resourcePool == null ? Collections.emptyMap() : extractGenericResourceEntries(resourcePool));
}
- private User(String userId, double cpuGuarantee, double memoryGuarantee) {
+ private User(String userId, double cpuGuarantee, double memoryGuarantee, Map<String, Double> genericGuarantee) {
this.userId = userId;
this.cpuGuarantee = cpuGuarantee;
this.memoryGuarantee = memoryGuarantee;
+ this.genericGuarantee = genericGuarantee;
}
public String getId() {
@@ -167,6 +173,10 @@ public class User {
return cpuGuarantee;
}
+ public Map<String, Double> getGenericGuaranteed() {
+ return genericGuarantee;
+ }
+
public TopologyDetails getNextTopologyToSchedule(ISchedulingState cluster) {
for (TopologyDetails topo : getPendingTopologies(cluster)) {
return topo;
@@ -186,6 +196,18 @@ public class User {
return queue.last();
}
+ private static Map<String, Double> extractGenericResourceEntries(Map<String, Double> resourcePool) {
+ Map<String, Double> ret = new HashMap<>();
+ for (Map.Entry<String, Double> entry : resourcePool.entrySet()) {
+ String key = entry.getKey();
+ Double value = entry.getValue();
+ if (key != null && !key.equals("cpu") && !key.equals("memory")) {
+ ret.put(key, value);
+ }
+ }
+ return ret;
+ }
+
@Override
public int hashCode() {
return this.userId.hashCode();
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
index 4cc0a85..b0efc2c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -63,8 +63,8 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStr
public final double guaranteedCpu;
public final double guaranteedMemory;
protected final LinkedList<TopologyDetails> tds = new LinkedList<>();
- private double assignedCpu = 0.0;
- private double assignedMemory = 0.0;
+ protected double assignedCpu = 0.0;
+ protected double assignedMemory = 0.0;
public SimulatedUser(User other, ISchedulingState cluster) {
tds.addAll(cluster.getTopologies().getTopologiesOwnedBy(other.getId()));
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
new file mode 100644
index 0000000..c2a5eda
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareSchedulingPriorityStrategy.class);
+
+ @Override
+ protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+ return new GrasSimulatedUser(u, cluster);
+ }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        // Simulate on a private copy: the amounts are decremented below and the cluster's own totals must stay intact.
+        Map<String, Double> genericAvail = new HashMap<>(cluster.getClusterTotalGenericResources());
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+        // Wrap each user so that the resources handed out during the simulation can be tracked per user.
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+        // Repeatedly take the user with the lowest score and simulate scheduling that user's next topology.
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+ protected static class GrasSimulatedUser extends SimulatedUser {
+
+ // Extend support for Generic Resources in addition to CPU and Memory
+ public final Map<String, Double> guaranteedGenericResources; // resource name : guaranteed amount
+ private Map<String, Double> assignedGenericResources = new HashMap<>(); // resource name : assigned amount
+
+ public GrasSimulatedUser(User other, ISchedulingState cluster) {
+ super(other, cluster);
+
+ Map<String, Double> guaranteedGenericResources = new HashMap<>();
+ // generic resource types that are offered
+ Set<String> availGenericResourceTypes = cluster.getClusterTotalGenericResources().keySet();
+ for (String resourceType : availGenericResourceTypes) {
+ Double guaranteedAmount = other.getGenericGuaranteed().getOrDefault(resourceType, 0.0);
+ guaranteedGenericResources.put(resourceType, guaranteedAmount);
+ }
+ this.guaranteedGenericResources = guaranteedGenericResources;
+ }
+
+ @Override
+ public TopologyDetails simScheduleNextHighest() {
+ TopologyDetails td = super.simScheduleNextHighest();
+ Map<String, Double> tdRequestedGenericResource = td.getTotalRequestedGenericResources();
+ for (Map.Entry<String, Double> entry : tdRequestedGenericResource.entrySet()) {
+ String resource = entry.getKey();
+ Double requestedAmount = entry.getValue();
+ assignedGenericResources.put(resource, assignedGenericResources.getOrDefault(resource, 0.0) + requestedAmount);
+ }
+ return td;
+ }
+
+ /**
+ * Get a score for the simulated user. This is used to sort the users, by their highest priority topology.
+ * Only give user guarantees that will not exceed cluster capacity.
+ * Score of each resource type is calculated as: (Requested + Assigned - Guaranteed)/clusterAvailable
+ * The final score is a max over all resource types.
+ * Topology score will fall into the following intervals if:
+ * User is under quota (guarantee): [(-guarantee)/available : 0]
+ * User is over quota: (0, infinity)
+ * Unfortunately, a score below 0 does not guarantee that the topology will be scheduled, due to resource fragmentation.
+ * @param availableCpu available CPU on the cluster.
+ * @param availableMemory available memory on the cluster.
+ * @param availableGenericResources available generic resources (other than cpu and memory) in cluster
+ * @param td the topology we are looking at.
+ * @return the score.
+ */
+ protected double getScore(double availableCpu, double availableMemory,
+ Map<String, Double> availableGenericResources, TopologyDetails td) {
+ // calculate scores for cpu and memory first
+ double ret = super.getScore(availableCpu, availableMemory, td);
+ if (ret == Double.MAX_VALUE) {
+ return ret;
+ }
+ Map<String, Double> tdTotalRequestedGeneric = td.getTotalRequestedGenericResources();
+ if (tdTotalRequestedGeneric == null) {
+ tdTotalRequestedGeneric = Collections.emptyMap();
+ }
+ for (Map.Entry<String, Double> entry : availableGenericResources.entrySet()) {
+ String resource = entry.getKey();
+ Double available = entry.getValue();
+ if (available <= 0) {
+ return Double.MAX_VALUE;
+ }
+ Double wouldBeResource = assignedGenericResources.getOrDefault(resource, 0.0)
+ + tdTotalRequestedGeneric.getOrDefault(resource, 0.0);
+ double thisScore = (wouldBeResource - guaranteedGenericResources.getOrDefault(resource, 0.0)) / available;
+ ret = Math.max(ret, thisScore);
+ }
+
+ return ret;
+ }
+
+ protected double getScore(double availableCpu, double availableMemory, Map<String, Double> availableGenericResources) {
+ return getScore(availableCpu, availableMemory, availableGenericResources, getNextHighest());
+ }
+ }
+
+ private static class GrasSimulatedUserComparator implements Comparator<GrasSimulatedUser> {
+ private final double cpuAvail;
+ private final double memAvail;
+ private final Map<String, Double> genericAvail;
+
+ private GrasSimulatedUserComparator(double cpuAvail, double memAvail, Map<String, Double> genericAvail) {
+ this.cpuAvail = cpuAvail;
+ this.memAvail = memAvail;
+ this.genericAvail = genericAvail;
+ }
+
+ @Override
+ public int compare(GrasSimulatedUser o1, GrasSimulatedUser o2) {
+ return Double.compare(o1.getScore(cpuAvail, memAvail, genericAvail), o2.getScore(cpuAvail, memAvail, genericAvail));
+ }
+ }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index 790ada2..23735e2 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -86,7 +86,9 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl
}
Collection<ExecutorDetails> unassignedExecutors =
new HashSet<>(this.cluster.getUnassignedExecutors(td));
- LOG.debug("{} Num ExecutorsNeedScheduling: {}", td.getId(), unassignedExecutors.size());
+ LOG.debug("Topology: {} has {} executors which need scheduling.",
+ td.getId(), unassignedExecutors.size());
+
Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
List<Component> spouts = this.getSpouts(td);
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 276cc46..11de468 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -82,7 +82,7 @@ public class TestUtilsForResourceAwareScheduler {
}
}
}
-
+
public static TestUserResources userRes(String name, double cpu, double mem) {
return new TestUserResources(name, cpu, mem);
}
@@ -451,6 +451,30 @@ public class TestUtilsForResourceAwareScheduler {
}
}
+ public static void assertTopologiesBeenEvicted(Cluster cluster, Set<String> evictedTopologies, String... topoNames) {
+ Topologies topologies = cluster.getTopologies();
+ LOG.info("Evicted topos: {}", evictedTopologies);
+ assert (evictedTopologies != null);
+ for (String topoName : topoNames) {
+ TopologyDetails td = topologies.getByName(topoName);
+ assert (td != null) : topoName;
+ String topoId = td.getId();
+ assert (evictedTopologies.contains(topoId)) : topoName;
+ }
+ }
+
+ public static void assertTopologiesNotBeenEvicted(Cluster cluster, Set<String> evictedTopologies, String... topoNames) {
+ Topologies topologies = cluster.getTopologies();
+ LOG.info("Evicted topos: {}", evictedTopologies);
+ assert (evictedTopologies != null);
+ for (String topoName : topoNames) {
+ TopologyDetails td = topologies.getByName(topoName);
+ assert (td != null) : topoName;
+ String topoId = td.getId();
+ assert (!evictedTopologies.contains(topoId)) : topoName;
+ }
+ }
+
public static void assertStatusSuccess(Cluster cluster, String topoId) {
assert (isStatusSuccess(cluster.getStatus(topoId))) :
"topology status " + topoId + " is not successful " + cluster.getStatus(topoId);
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java
new file mode 100644
index 0000000..a61195e
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.utils.Time;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.addTopologies;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.createGrasClusterConfig;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genSupervisors;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genTopology;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userRes;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userResourcePool;
+
+public class TestGenericResourceAwareSchedulingPriorityStrategy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestGenericResourceAwareSchedulingPriorityStrategy.class);
+ private static int currentTime = Time.currentTimeSecs();
+ private static IScheduler scheduler = null;
+
+ @After
+ public void cleanup() {
+ if (scheduler != null) {
+ scheduler.cleanup();
+ scheduler = null;
+ }
+ }
+
+ /*
+ * DefaultSchedulingPriorityStrategy will not evict topologies as long as the resource requests can be met.
+ *
+ * Ethan asks for heavy cpu and memory, while Rui asks for little cpu and memory but heavy generic resources.
+ * Since all of Rui's resource requests (of every type) can be met, no eviction will happen.
+ */
+ @Test
+ public void testDefaultSchedulingPriorityStrategyNotEvicting() {
+ Map<String, Double> requestedgenericResourcesMap = new HashMap<>();
+ requestedgenericResourcesMap.put("generic.resource.1", 40.0);
+ // Use full memory and cpu of the cluster capacity
+ Config ruiConf = createGrasClusterConfig(20, 50, 50, null, requestedgenericResourcesMap);
+ Config ethanConf = createGrasClusterConfig(80, 400, 500, null, Collections.emptyMap());
+ Topologies topologies = new Topologies(
+ genTopology("ethan-topo-1", ethanConf, 1, 0, 1, 0, currentTime - 2, 10, "ethan"),
+ genTopology("ethan-topo-2", ethanConf, 1, 0, 1, 0, currentTime - 2, 20, "ethan"),
+ genTopology("ethan-topo-3", ethanConf, 1, 0, 1, 0, currentTime - 2, 28, "ethan"),
+ genTopology("ethan-topo-4", ethanConf, 1, 0, 1, 0, currentTime - 2, 29, "ethan"));
+
+ Topologies withNewTopo = addTopologies(topologies,
+ genTopology("rui-topo-1", ruiConf, 1, 0, 4, 0, currentTime - 2, 10, "rui"));
+
+ Config config = mkClusterConfig(DefaultSchedulingPriorityStrategy.class.getName());
+ Cluster cluster = mkTestCluster(topologies, config);
+ scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config, new StormMetricsRegistry());
+
+ scheduler.schedule(topologies, cluster);
+
+ assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+
+ cluster = new Cluster(cluster, withNewTopo);
+ scheduler.schedule(withNewTopo, cluster);
+ Map<String, Set<String>> evictedTopos = ((ResourceAwareScheduler) scheduler).getEvictedTopologiesMap();
+
+ assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+ assertTopologiesNotBeenEvicted(cluster, collectMapValues(evictedTopos), "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+ assertTopologiesFullyScheduled(cluster, "rui-topo-1");
+ }
+
+ /*
+ * DefaultSchedulingPriorityStrategy does not take generic resources into account when calculating the score.
+ * So even if a user is requesting a lot of generic resources other than CPU and memory, the scheduler will still score it very low and kick out other topologies.
+ *
+ * Ethan asks for medium cpu and memory, while Rui asks for little cpu and memory but heavy generic resources.
+ * However, Rui's generic resource request cannot be met, and the default scoring system does not take generic resources into account,
+ * so the score of Rui's new topology will be much lower than those of all Ethan's topologies.
+ * All of Ethan's topologies will then be evicted in an attempt to make room for Rui.
+ */
+ @Test
+ public void testDefaultSchedulingPriorityStrategyEvicting() {
+ Map<String, Double> requestedgenericResourcesMap = new HashMap<>();
+ requestedgenericResourcesMap.put("generic.resource.1", 40.0);
+ Config ruiConf = createGrasClusterConfig(10, 10, 10, null, requestedgenericResourcesMap);
+ Config ethanConf = createGrasClusterConfig(60, 200, 300, null, Collections.emptyMap());
+ Topologies topologies = new Topologies(
+ genTopology("ethan-topo-1", ethanConf, 1, 0, 1, 0, currentTime - 2, 10, "ethan"),
+ genTopology("ethan-topo-2", ethanConf, 1, 0, 1, 0, currentTime - 2, 20, "ethan"),
+ genTopology("ethan-topo-3", ethanConf, 1, 0, 1, 0, currentTime - 2, 28, "ethan"),
+ genTopology("ethan-topo-4", ethanConf, 1, 0, 1, 0, currentTime - 2, 29, "ethan"));
+
+ Topologies withNewTopo = addTopologies(topologies,
+ genTopology("rui-topo-1", ruiConf, 1, 0, 5, 0, currentTime - 2, 10, "rui"));
+
+ Config config = mkClusterConfig(DefaultSchedulingPriorityStrategy.class.getName());
+ Cluster cluster = mkTestCluster(topologies, config);
+ scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config, new StormMetricsRegistry());
+ scheduler.schedule(topologies, cluster);
+
+ assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+
+ cluster = new Cluster(cluster, withNewTopo);
+ scheduler.schedule(withNewTopo, cluster);
+ Map<String, Set<String>> evictedTopos = ((ResourceAwareScheduler) scheduler).getEvictedTopologiesMap();
+
+ assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+ assertTopologiesBeenEvicted(cluster, collectMapValues(evictedTopos), "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+ assertTopologiesNotScheduled(cluster, "rui-topo-1");
+ }
+
+ /*
+ * GenericResourceAwareSchedulingPriorityStrategy extends the scoring formula to accommodate generic resources.
+ *
+ * Same setup as testDefaultSchedulingPriorityStrategyEvicting, but this time the new scoring system takes generic resources into account,
+ * so the score of Rui's new topology will be higher than those of all Ethan's topologies due to its very large generic resource request.
+ * In the end, none of Ethan's topologies will be evicted, as expected.
+ */
+ @Test
+ public void testGenericSchedulingPriorityStrategyEvicting() {
+ Map<String, Double> requestedgenericResourcesMap = new HashMap<>();
+ requestedgenericResourcesMap.put("generic.resource.1", 40.0);
+ Config ruiConf = createGrasClusterConfig(10, 10, 10, null, requestedgenericResourcesMap);
+ Config ethanConf = createGrasClusterConfig(60, 200, 300, null, Collections.emptyMap());
+ Topologies topologies = new Topologies(
+ genTopology("ethan-topo-1", ethanConf, 1, 0, 1, 0, currentTime - 2, 10, "ethan"),
+ genTopology("ethan-topo-2", ethanConf, 1, 0, 1, 0, currentTime - 2, 20, "ethan"),
+ genTopology("ethan-topo-3", ethanConf, 1, 0, 1, 0, currentTime - 2, 28, "ethan"),
+ genTopology("ethan-topo-4", ethanConf, 1, 0, 1, 0, currentTime - 2, 29, "ethan"));
+
+ Topologies withNewTopo = addTopologies(topologies,
+ genTopology("rui-topo-1", ruiConf, 1, 0, 5, 0, currentTime - 2, 10, "rui"));
+
+
+ Config config = mkClusterConfig(GenericResourceAwareSchedulingPriorityStrategy.class.getName());
+ Cluster cluster = mkTestCluster(topologies, config);
+ scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config, new StormMetricsRegistry());
+ scheduler.schedule(topologies, cluster);
+
+ assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+
+ cluster = new Cluster(cluster, withNewTopo);
+ scheduler.schedule(withNewTopo, cluster);
+ Map<String, Set<String>> evictedTopos = ((ResourceAwareScheduler) scheduler).getEvictedTopologiesMap();
+
+ assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+ assertTopologiesNotBeenEvicted(cluster, collectMapValues(evictedTopos),"ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+ assertTopologiesNotScheduled(cluster, "rui-topo-1");
+ }
+
+
+ private Config mkClusterConfig(String SchedulingPriorityStrategy) {
+ Map<String, Map<String, Number>> resourceUserPool = userResourcePool(
+ userRes("rui", 200, 2000),
+ userRes("ethan", 200, 2000));
+
+ Map<String, Double> genericResourcesOfferedMap = new HashMap<>();
+ genericResourcesOfferedMap.put("generic.resource.1", 50.0);
+
+ Config config = createGrasClusterConfig(100, 500, 500, resourceUserPool, genericResourcesOfferedMap);
+ config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, SchedulingPriorityStrategy);
+ config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS, 2); // allow 1 round of evictions
+
+ return config;
+ }
+
+ private Cluster mkTestCluster(Topologies topologies, Config config) {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+
+ Map<String, Double> genericResourcesOfferedMap = (Map<String, Double>) config.get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
+ if (genericResourcesOfferedMap == null || genericResourcesOfferedMap.isEmpty()) {
+ throw new IllegalArgumentException("Generic resources map must contain something in this test: "
+ + TestGenericResourceAwareSchedulingPriorityStrategy.class.getName());
+ }
+ Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 100, 1000, genericResourcesOfferedMap);
+
+ return new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+ }
+
+ private Set<String> collectMapValues(Map<String, Set<String>> map) {
+ Set<String> set = new HashSet<>();
+ map.values().forEach((s) -> set.addAll(s));
+ return set;
+ }
+}