You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:36 UTC
[08/23] storm git commit: made scheduling, eviction, and priority strategies pluggable
made scheduling, eviction, and priority strategies pluggable
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4cd5efa3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4cd5efa3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4cd5efa3
Branch: refs/heads/master
Commit: 4cd5efa3c5114413cf82ced7d3041e258175a5c0
Parents: 9d3c864
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Tue Nov 24 16:36:36 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 4 13:07:10 2015 -0600
----------------------------------------------------------------------
conf/defaults.yaml | 3 +
storm-core/src/jvm/backtype/storm/Config.java | 37 +-
.../resource/ResourceAwareScheduler.java | 191 ++++----
.../backtype/storm/scheduler/resource/User.java | 19 -
.../resource/strategies/IStrategy.java | 38 --
.../strategies/ResourceAwareStrategy.java | 486 ------------------
.../eviction/DefaultEvictionStrategy.java | 115 +++++
.../strategies/eviction/IEvictionStrategy.java | 47 ++
.../DefaultSchedulingPriorityStrategy.java | 92 ++++
.../priority/ISchedulingPriorityStrategy.java | 41 ++
.../DefaultResourceAwareStrategy.java | 487 +++++++++++++++++++
.../strategies/scheduling/IStrategy.java | 43 ++
.../storm/validation/ConfigValidation.java | 25 +
.../validation/ConfigValidationAnnotations.java | 9 +
.../jvm/backtype/storm/TestConfigValidate.java | 29 ++
15 files changed, 1009 insertions(+), 653 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 2a99ba6..cef09d3 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -256,6 +256,9 @@ topology.component.resources.onheap.memory.mb: 128.0
topology.component.resources.offheap.memory.mb: 0.0
topology.component.cpu.pcore.percent: 10.0
topology.worker.max.heap.size.mb: 768.0
+topology.scheduler.strategy: "backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy"
+resource.aware.scheduler.eviction.strategy: "backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy"
+resource.aware.scheduler.priority.strategy: "backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index d9c8815..54af6fa 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -17,6 +17,9 @@
*/
package backtype.storm;
+import backtype.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
+import backtype.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
+import backtype.storm.scheduler.resource.strategies.scheduling.IStrategy;
import backtype.storm.serialization.IKryoDecorator;
import backtype.storm.serialization.IKryoFactory;
import backtype.storm.validation.ConfigValidationAnnotations.*;
@@ -194,7 +197,8 @@ public class Config extends HashMap<String, Object> {
* rack names that correspond to the supervisors. This information is stored in Cluster.java, and
* is used in the resource aware scheduler.
*/
- @isString
+ @NotNull
+ @isImplementationOfClass(implementsClass = backtype.storm.networktopography.DNSToSwitchMapping.class)
public static final String STORM_NETWORK_TOPOGRAPHY_PLUGIN = "storm.network.topography.plugin";
/**
@@ -1482,6 +1486,13 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB = "topology.worker.max.heap.size.mb";
/**
+ * The strategy to use when scheduling a topology with Resource Aware Scheduler
+ */
+ @NotNull
+ @isImplementationOfClass(implementsClass = IStrategy.class)
+ public static final String TOPOLOGY_SCHEDULER_STRATEGY = "topology.scheduler.strategy";
+
+ /**
* How many executors to spawn for ackers.
*
* <p>By not setting this variable or setting it as null, Storm will set the number of acker executors
@@ -1943,6 +1954,20 @@ public class Config extends HashMap<String, Object> {
public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools";
/**
+ * The class that specifies the eviction strategy to use in ResourceAwareScheduler
+ */
+ @NotNull
+ @isImplementationOfClass(implementsClass = IEvictionStrategy.class)
+ public static final String RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY = "resource.aware.scheduler.eviction.strategy";
+
+ /**
+ * the class that specifies the scheduling priority strategy to use in ResourceAwareScheduler
+ */
+ @NotNull
+ @isImplementationOfClass(implementsClass = ISchedulingPriorityStrategy.class)
+ public static final String RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY = "resource.aware.scheduler.priority.strategy";
+
+ /**
* The number of machines that should be used by this topology to isolate it from all others. Set storm.scheduler
* to backtype.storm.scheduler.multitenant.MultitenantScheduler
*/
@@ -2201,4 +2226,14 @@ public class Config extends HashMap<String, Object> {
public void setTopologyPriority(int priority) {
this.put(Config.TOPOLOGY_PRIORITY, priority);
}
+
+ /**
+ * Sets the scheduling strategy (an {@link IStrategy} class) the Resource Aware Scheduler uses for this topology.
+ * @param clazz strategy class; its name is stored under TOPOLOGY_SCHEDULER_STRATEGY. A null value is ignored.
+ */
+ public void setTopologyStrategy(Class<? extends IStrategy> clazz) {
+ if(clazz != null) {
+ this.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, clazz.getName());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index 934858e..f5c8354 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -19,7 +19,9 @@
package backtype.storm.scheduler.resource;
import backtype.storm.Config;
-import backtype.storm.scheduler.SchedulerAssignment;
+import backtype.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
+import backtype.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
+import backtype.storm.scheduler.resource.strategies.scheduling.IStrategy;
import backtype.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +32,7 @@ import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.scheduler.resource.strategies.ResourceAwareStrategy;
+import backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import java.util.Collection;
import java.util.HashMap;
@@ -98,87 +100,36 @@ public class ResourceAwareScheduler implements IScheduler {
LOG.info("Nodes:\n{}", this.nodes);
- LOG.info("getNextUser: {}", this.getNextUser());
+ //LOG.info("getNextUser: {}", this.getNextUser());
+ ISchedulingPriorityStrategy schedulingPrioritystrategy = null;
while (true) {
LOG.info("/*********** next scheduling iteration **************/");
- User nextUser = this.getNextUser();
- if (nextUser == null) {
+ if(schedulingPrioritystrategy == null) {
+ try {
+ schedulingPrioritystrategy = (ISchedulingPriorityStrategy) Utils.newInstance((String) this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
+ } catch (RuntimeException e) {
+ LOG.error("failed to create instance of priority strategy: {} with error: {}! No topology eviction will be done.",
+ this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), e.getMessage());
+ break;
+ }
+ }
+ //need to re prepare since scheduling state might have been restored
+ schedulingPrioritystrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+ //Call scheduling priority strategy
+ TopologyDetails td = schedulingPrioritystrategy.getNextTopologyToSchedule();
+ if(td == null) {
break;
}
- TopologyDetails td = nextUser.getNextTopologyToSchedule();
scheduleTopology(td);
}
+
//since scheduling state might have been restored thus need to set the cluster and topologies.
cluster = this.cluster;
topologies = this.topologies;
}
- private boolean makeSpaceForTopo(TopologyDetails td) {
- LOG.info("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
- User submitter = this.userMap.get(td.getTopologySubmitter());
- if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null) {
- return false;
- }
-
- double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
- double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
-
- //user has enough resource under his or her resource guarantee to schedule topology
- if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
- User evictUser = this.findUserWithMostResourcesAboveGuarantee();
- if (evictUser == null) {
- LOG.info("Cannot make space for topology {} from user {}", td.getName(), submitter.getId());
- submitter.moveTopoFromPendingToAttempted(td, this.cluster);
-
- return false;
- }
- TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
- LOG.info("topology to evict: {}", topologyEvict);
- evictTopology(topologyEvict);
-
- LOG.info("Resources After eviction:\n{}", this.nodes);
-
- return true;
- } else {
-
- if ((1.0 - submitter.getCPUResourcePoolUtilization()) < cpuNeeded) {
-
- }
-
- if ((1.0 - submitter.getMemoryResourcePoolUtilization()) < memoryNeeded) {
-
- }
- return false;
-
- }
- }
-
- private void evictTopology(TopologyDetails topologyEvict) {
- Collection<WorkerSlot> workersToEvict = this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId());
- User submitter = this.userMap.get(topologyEvict.getTopologySubmitter());
-
- LOG.info("Evicting Topology {} with workers: {}", topologyEvict.getName(), workersToEvict);
- LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
- this.nodes.freeSlots(workersToEvict);
- submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
- LOG.info("check if topology unassigned: {}", this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId()));
- }
-
- private User findUserWithMostResourcesAboveGuarantee() {
- double most = 0.0;
- User mostOverUser = null;
- for (User user : this.userMap.values()) {
- double over = user.getResourcePoolAverageUtilization() - 1.0;
- if ((over > most) && (!user.getTopologiesRunning().isEmpty())) {
- most = over;
- mostOverUser = user;
- }
- }
- return mostOverUser;
- }
-
public void scheduleTopology(TopologyDetails td) {
User topologySubmitter = this.userMap.get(td.getTopologySubmitter());
if (cluster.getUnassignedExecutors(td).size() > 0) {
@@ -190,9 +141,19 @@ public class ResourceAwareScheduler implements IScheduler {
LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
SchedulingState schedulingState = this.checkpointSchedulingState();
+ IStrategy RAStrategy = null;
+ try {
+ RAStrategy = (IStrategy) Utils.newInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
+ } catch (RuntimeException e) {
+ LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.",
+ td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
+ topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
+ return;
+ }
+ IEvictionStrategy evictionStrategy = null;
while (true) {
- //Need to reinitialize ResourceAwareStrategy with cluster and topologies in case scheduling state was restored
- ResourceAwareStrategy RAStrategy = new ResourceAwareStrategy(this.cluster, this.topologies);
+ //Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
+ RAStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
SchedulingResult result = RAStrategy.schedule(td);
LOG.info("scheduling result: {}", result);
if (result.isValid()) {
@@ -201,8 +162,8 @@ public class ResourceAwareScheduler implements IScheduler {
if (mkAssignment(td, result.getSchedulingResultMap())) {
topologySubmitter.moveTopoFromPendingToRunning(td, this.cluster);
} else {
- //resetAssignments(assignmentCheckpoint);
this.restoreCheckpointSchedulingState(schedulingState);
+ //since state is restored need the update User topologySubmitter to the new User object in userMap
topologySubmitter = this.userMap.get(td.getTopologySubmitter());
topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
}
@@ -218,7 +179,19 @@ public class ResourceAwareScheduler implements IScheduler {
break;
} else {
if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
- if (!this.makeSpaceForTopo(td)) {
+ if(evictionStrategy == null) {
+ try {
+ evictionStrategy = (IEvictionStrategy) Utils.newInstance((String) this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
+ } catch (RuntimeException e) {
+ LOG.error("failed to create instance of eviction strategy: {} with error: {}! No topology eviction will be done.",
+ this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY), e.getMessage());
+ topologySubmitter.moveTopoFromPendingToAttempted(td);
+ break;
+ }
+ }
+ //need to re prepare since scheduling state might have been restored
+ evictionStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+ if (!evictionStrategy.makeSpaceForTopo(td)) {
this.cluster.setStatus(td.getId(), result.getErrorMessage());
this.restoreCheckpointSchedulingState(schedulingState);
//since state is restored need the update User topologySubmitter to the new User object in userMap
@@ -324,41 +297,41 @@ public class ResourceAwareScheduler implements IScheduler {
return this.userMap;
}
- public User getNextUser() {
- Double least = Double.POSITIVE_INFINITY;
- User ret = null;
- for (User user : this.userMap.values()) {
- LOG.info("getNextUser {}", user.getDetailedInfo());
- LOG.info("hasTopologyNeedSchedule: {}", user.hasTopologyNeedSchedule());
- if (user.hasTopologyNeedSchedule()) {
- Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
- if(ret!=null) {
- LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), least, user.getId(), userResourcePoolAverageUtilization);
- LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, least == userResourcePoolAverageUtilization);
- LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001));
-
- }
- if (least > userResourcePoolAverageUtilization) {
- ret = user;
- least = userResourcePoolAverageUtilization;
- } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
- double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
- double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
- double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
-
- double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
- double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
- double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
- LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), currentAvgPercentage, user.getId(), userAvgPercentage);
- if (userAvgPercentage > currentAvgPercentage) {
- ret = user;
- least = userResourcePoolAverageUtilization;
- }
- }
- }
- }
- return ret;
- }
+// public User getNextUser() {
+// Double least = Double.POSITIVE_INFINITY;
+// User ret = null;
+// for (User user : this.userMap.values()) {
+// LOG.info("getNextUser {}", user.getDetailedInfo());
+// LOG.info("hasTopologyNeedSchedule: {}", user.hasTopologyNeedSchedule());
+// if (user.hasTopologyNeedSchedule()) {
+// Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
+// if(ret!=null) {
+// LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), least, user.getId(), userResourcePoolAverageUtilization);
+// LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, least == userResourcePoolAverageUtilization);
+// LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001));
+//
+// }
+// if (least > userResourcePoolAverageUtilization) {
+// ret = user;
+// least = userResourcePoolAverageUtilization;
+// } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
+// double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
+// double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
+// double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
+//
+// double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
+// double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
+// double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
+// LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), currentAvgPercentage, user.getId(), userAvgPercentage);
+// if (userAvgPercentage > currentAvgPercentage) {
+// ret = user;
+// least = userResourcePoolAverageUtilization;
+// }
+// }
+// }
+// }
+// return ret;
+// }
/**
* Intialize scheduling and running queues
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index 77a1dff..2d7c79f 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -314,25 +314,6 @@ public class User {
return ret;
}
- public static int cTo(TopologyDetails topo1, TopologyDetails topo2) {
- if (topo1.getId().compareTo(topo2.getId()) == 0) {
- return 0;
- }
- if (topo1.getTopologyPriority() > topo2.getTopologyPriority()) {
- return 1;
- } else if (topo1.getTopologyPriority() < topo2.getTopologyPriority()) {
- return -1;
- } else {
- if (topo1.getUpTime() > topo2.getUpTime()) {
- return -1;
- } else if (topo1.getUpTime() < topo2.getUpTime()) {
- return 1;
- } else {
- return topo1.getId().compareTo(topo2.getId());
- }
- }
- }
-
/**
* Comparator that sorts topologies by priority and then by submission time
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java
deleted file mode 100644
index 722eddb..0000000
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.scheduler.resource.strategies;
-
-import java.util.Collection;
-import java.util.Map;
-
-import backtype.storm.scheduler.Topologies;
-import backtype.storm.scheduler.ExecutorDetails;
-import backtype.storm.scheduler.TopologyDetails;
-import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.scheduler.resource.RAS_Node;
-import backtype.storm.scheduler.resource.SchedulingResult;
-
-/**
- * An interface to for implementing different scheduling strategies for the resource aware scheduling
- * In the future stategies will be pluggable
- */
-public interface IStrategy {
-
- public SchedulingResult schedule(TopologyDetails td);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
deleted file mode 100644
index 812bf5d..0000000
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.scheduler.resource.strategies;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.TreeMap;
-import java.util.HashSet;
-import java.util.Iterator;
-
-import backtype.storm.scheduler.resource.RAS_Nodes;
-import backtype.storm.scheduler.resource.SchedulingResult;
-import backtype.storm.scheduler.resource.SchedulingStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.scheduler.Cluster;
-import backtype.storm.scheduler.ExecutorDetails;
-import backtype.storm.scheduler.Topologies;
-import backtype.storm.scheduler.TopologyDetails;
-import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.scheduler.resource.Component;
-import backtype.storm.scheduler.resource.RAS_Node;
-
-public class ResourceAwareStrategy implements IStrategy {
- private static final Logger LOG = LoggerFactory.getLogger(ResourceAwareStrategy.class);
- private Topologies _topologies;
- private Cluster _cluster;
- //Map key is the supervisor id and the value is the corresponding RAS_Node Object
- private Map<String, RAS_Node> _availNodes;
- private RAS_Node refNode = null;
- /**
- * supervisor id -> Node
- */
- private Map<String, RAS_Node> _nodes;
- private Map<String, List<String>> _clusterInfo;
-
- private final double CPU_WEIGHT = 1.0;
- private final double MEM_WEIGHT = 1.0;
- private final double NETWORK_WEIGHT = 1.0;
-
- public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
- _topologies = topologies;
- _cluster = cluster;
- _nodes = RAS_Nodes.getAllNodesFrom(cluster, _topologies);
- _availNodes = this.getAvailNodes();
- _clusterInfo = cluster.getNetworkTopography();
- LOG.debug(this.getClusterInfo());
- }
-
- //the returned TreeMap keeps the Components sorted
- private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
- Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
- TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<>();
- Integer rank = 0;
- for (Component ras_comp : ordered__Component_list) {
- retMap.put(rank, new ArrayList<ExecutorDetails>());
- for(ExecutorDetails exec : ras_comp.execs) {
- if(unassignedExecutors.contains(exec)) {
- retMap.get(rank).add(exec);
- }
- }
- rank++;
- }
- return retMap;
- }
-
- public SchedulingResult schedule(TopologyDetails td) {
- if (_availNodes.size() <= 0) {
- LOG.warn("No available nodes to schedule tasks on!");
- return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
- }
- Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
- Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
- LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
- Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
- List<Component> spouts = this.getSpouts(_topologies, td);
-
- if (spouts.size() == 0) {
- LOG.error("Cannot find a Spout!");
- return SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
- }
-
- Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
-
- Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
- Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
- Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
- //Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth.
- //Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
- for (int i = 0; i < longestPriorityListSize; i++) {
- for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
- Iterator<ExecutorDetails> it = entry.getValue().iterator();
- if (it.hasNext()) {
- ExecutorDetails exec = it.next();
- LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
- new Object[] { exec, td.getExecutorToComponent().get(exec),
- td.getTaskResourceReqList(exec), entry.getKey() });
- WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
- if (targetSlot != null) {
- RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
- if(!schedulerAssignmentMap.containsKey(targetSlot)) {
- schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
- }
-
- schedulerAssignmentMap.get(targetSlot).add(exec);
- targetNode.consumeResourcesforTask(exec, td);
- scheduledTasks.add(exec);
- LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
- targetNode, targetNode.getAvailableMemoryResources(),
- targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
- targetNode.getTotalCpuResources(), targetSlot);
- } else {
- LOG.error("Not Enough Resources to schedule Task {}", exec);
- }
- it.remove();
- }
- }
- }
-
- executorsNotScheduled.removeAll(scheduledTasks);
- LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
- // schedule left over system tasks
- for (ExecutorDetails exec : executorsNotScheduled) {
- WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
- if (targetSlot != null) {
- RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
- if(!schedulerAssignmentMap.containsKey(targetSlot)) {
- schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
- }
-
- schedulerAssignmentMap.get(targetSlot).add(exec);
- targetNode.consumeResourcesforTask(exec, td);
- scheduledTasks.add(exec);
- LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
- targetNode, targetNode.getAvailableMemoryResources(),
- targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
- targetNode.getTotalCpuResources(), targetSlot);
- } else {
- LOG.error("Not Enough Resources to schedule Task {}", exec);
- }
- }
-
- SchedulingResult result;
- executorsNotScheduled.removeAll(scheduledTasks);
- if (executorsNotScheduled.size() > 0) {
- LOG.error("Not all executors successfully scheduled: {}",
- executorsNotScheduled);
- schedulerAssignmentMap = null;
- result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "Not all executors successfully scheduled: " + executorsNotScheduled);
- } else {
- LOG.debug("All resources successfully scheduled!");
- result = SchedulingResult.success(schedulerAssignmentMap);
- }
- if (schedulerAssignmentMap == null) {
- LOG.error("Topology {} not successfully scheduled!", td.getId());
- }
- return result;
- }
-
- private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
- WorkerSlot ws;
- // first scheduling
- if (this.refNode == null) {
- String clus = this.getBestClustering();
- ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
- } else {
- ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
- }
- if(ws != null) {
- this.refNode = this.idToNode(ws.getNodeId());
- }
- LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
- return ws;
- }
-
- private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
- return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
- }
-
- private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
- double taskMem = td.getTotalMemReqTask(exec);
- double taskCPU = td.getTotalCpuReqTask(exec);
- List<RAS_Node> nodes;
- if(clusterId != null) {
- nodes = this.getAvailableNodesFromCluster(clusterId);
-
- } else {
- nodes = this.getAvailableNodes();
- }
- //First sort nodes by distance
- TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<>();
- for (RAS_Node n : nodes) {
- if(n.getFreeSlots().size()>0) {
- if (n.getAvailableMemoryResources() >= taskMem
- && n.getAvailableCpuResources() >= taskCPU) {
- double a = Math.pow(((taskCPU - n.getAvailableCpuResources())/(n.getAvailableCpuResources() + 1))
- * this.CPU_WEIGHT, 2);
- double b = Math.pow(((taskMem - n.getAvailableMemoryResources())/(n.getAvailableMemoryResources() + 1))
- * this.MEM_WEIGHT, 2);
- double c = 0.0;
- if(this.refNode != null) {
- c = Math.pow(this.distToNode(this.refNode, n)
- * this.NETWORK_WEIGHT, 2);
- }
- double distance = Math.sqrt(a + b + c);
- nodeRankMap.put(distance, n);
- }
- }
- }
- //Then, pick worker from closest node that satisfy constraints
- for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) {
- RAS_Node n = entry.getValue();
- for(WorkerSlot ws : n.getFreeSlots()) {
- if(checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) {
- return ws;
- }
- }
- }
- return null;
- }
-
- private String getBestClustering() {
- String bestCluster = null;
- Double mostRes = 0.0;
- for (Entry<String, List<String>> cluster : _clusterInfo
- .entrySet()) {
- Double clusterTotalRes = this.getTotalClusterRes(cluster.getValue());
- if (clusterTotalRes > mostRes) {
- mostRes = clusterTotalRes;
- bestCluster = cluster.getKey();
- }
- }
- return bestCluster;
- }
-
- private Double getTotalClusterRes(List<String> cluster) {
- Double res = 0.0;
- for (String node : cluster) {
- res += _availNodes.get(this.NodeHostnameToId(node))
- .getAvailableMemoryResources()
- + _availNodes.get(this.NodeHostnameToId(node))
- .getAvailableCpuResources();
- }
- return res;
- }
-
- private Double distToNode(RAS_Node src, RAS_Node dest) {
- if (src.getId().equals(dest.getId())) {
- return 0.0;
- } else if (this.NodeToCluster(src).equals(this.NodeToCluster(dest))) {
- return 0.5;
- } else {
- return 1.0;
- }
- }
-
- private String NodeToCluster(RAS_Node node) {
- for (Entry<String, List<String>> entry : _clusterInfo
- .entrySet()) {
- if (entry.getValue().contains(node.getHostname())) {
- return entry.getKey();
- }
- }
- LOG.error("Node: {} not found in any clusters", node.getHostname());
- return null;
- }
-
- private List<RAS_Node> getAvailableNodes() {
- LinkedList<RAS_Node> nodes = new LinkedList<>();
- for (String clusterId : _clusterInfo.keySet()) {
- nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
- }
- return nodes;
- }
-
- private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
- List<RAS_Node> retList = new ArrayList<>();
- for (String node_id : _clusterInfo.get(clus)) {
- retList.add(_availNodes.get(this
- .NodeHostnameToId(node_id)));
- }
- return retList;
- }
-
- private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
- List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
- List<WorkerSlot> workers = new LinkedList<>();
- for(RAS_Node node : nodes) {
- workers.addAll(node.getFreeSlots());
- }
- return workers;
- }
-
- private List<WorkerSlot> getAvailableWorker() {
- List<WorkerSlot> workers = new LinkedList<>();
- for (String clusterId : _clusterInfo.keySet()) {
- workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
- }
- return workers;
- }
-
- /**
- * In case in the future RAS can only use a subset of nodes
- */
- private Map<String, RAS_Node> getAvailNodes() {
- return _nodes;
- }
-
- /**
- * Breadth first traversal of the topology DAG
- * @param topologies
- * @param td
- * @param spouts
- * @return A partial ordering of components
- */
- private Queue<Component> bfs(Topologies topologies, TopologyDetails td, List<Component> spouts) {
- // Since queue is a interface
- Queue<Component> ordered__Component_list = new LinkedList<Component>();
- HashMap<String, Component> visited = new HashMap<>();
-
- /* start from each spout that is not visited, each does a breadth-first traverse */
- for (Component spout : spouts) {
- if (!visited.containsKey(spout.id)) {
- Queue<Component> queue = new LinkedList<>();
- queue.offer(spout);
- while (!queue.isEmpty()) {
- Component comp = queue.poll();
- visited.put(comp.id, comp);
- ordered__Component_list.add(comp);
- List<String> neighbors = new ArrayList<>();
- neighbors.addAll(comp.children);
- neighbors.addAll(comp.parents);
- for (String nbID : neighbors) {
- if (!visited.containsKey(nbID)) {
- Component child = topologies.getAllComponents().get(td.getId()).get(nbID);
- queue.offer(child);
- }
- }
- }
- }
- }
- return ordered__Component_list;
- }
-
- private List<Component> getSpouts(Topologies topologies, TopologyDetails td) {
- List<Component> spouts = new ArrayList<>();
- for (Component c : topologies.getAllComponents().get(td.getId())
- .values()) {
- if (c.type == Component.ComponentType.SPOUT) {
- spouts.add(c);
- }
- }
- return spouts;
- }
-
- private Integer getLongestPriorityListSize(Map<Integer, List<ExecutorDetails>> priorityToExecutorMap) {
- Integer mostNum = 0;
- for (List<ExecutorDetails> execs : priorityToExecutorMap.values()) {
- Integer numExecs = execs.size();
- if (mostNum < numExecs) {
- mostNum = numExecs;
- }
- }
- return mostNum;
- }
-
- /**
- * Get the remaining amount memory that can be assigned to a worker given the set worker max heap size
- * @param ws
- * @param td
- * @param scheduleAssignmentMap
- * @return The remaining amount of memory
- */
- private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
- Double memScheduleUsed = this.getWorkerScheduledMemoryUse(ws, td, scheduleAssignmentMap);
- return td.getTopologyWorkerMaxHeapSize() - memScheduleUsed;
- }
-
- /**
- * Get the amount of memory already assigned to a worker
- * @param ws
- * @param td
- * @param scheduleAssignmentMap
- * @return the amount of memory
- */
- private Double getWorkerScheduledMemoryUse(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
- Double totalMem = 0.0;
- Collection<ExecutorDetails> execs = scheduleAssignmentMap.get(ws);
- if(execs != null) {
- for(ExecutorDetails exec : execs) {
- totalMem += td.getTotalMemReqTask(exec);
- }
- }
- return totalMem;
- }
-
- /**
- * Checks whether we can schedule an Executor exec on the worker slot ws
- * Only considers memory currently. May include CPU in the future
- * @param exec
- * @param ws
- * @param td
- * @param scheduleAssignmentMap
- * @return a boolean: True denoting the exec can be scheduled on ws and false if it cannot
- */
- private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
- boolean retVal = false;
- if(this.getWorkerScheduledMemoryAvailable(ws, td, scheduleAssignmentMap) >= td.getTotalMemReqTask(exec)) {
- retVal = true;
- }
- return retVal;
- }
-
- /**
- * Get the amount of resources available and total for each node
- * @return a String with cluster resource info for debug
- */
- private String getClusterInfo() {
- String retVal = "Cluster info:\n";
- for(Entry<String, List<String>> clusterEntry : _clusterInfo.entrySet()) {
- String clusterId = clusterEntry.getKey();
- retVal += "Rack: " + clusterId + "\n";
- for(String nodeHostname : clusterEntry.getValue()) {
- RAS_Node node = this.idToNode(this.NodeHostnameToId(nodeHostname));
- retVal += "-> Node: " + node.getHostname() + " " + node.getId() + "\n";
- retVal += "--> Avail Resources: {Mem " + node.getAvailableMemoryResources() + ", CPU " + node.getAvailableCpuResources() + "}\n";
- retVal += "--> Total Resources: {Mem " + node.getTotalMemoryResources() + ", CPU " + node.getTotalCpuResources() + "}\n";
- }
- }
- return retVal;
- }
-
- /**
- * hostname to Id
- * @param hostname
- * @return the id of a node
- */
- public String NodeHostnameToId(String hostname) {
- for (RAS_Node n : _nodes.values()) {
- if (n.getHostname() == null) {
- continue;
- }
- if (n.getHostname().equals(hostname)) {
- return n.getId();
- }
- }
- LOG.error("Cannot find Node with hostname {}", hostname);
- return null;
- }
-
- /**
- * Find RAS_Node for specified node id
- * @param id
- * @return a RAS_Node object
- */
- public RAS_Node idToNode(String id) {
- if(_nodes.containsKey(id) == false) {
- LOG.error("Cannot find Node with Id: {}", id);
- return null;
- }
- return _nodes.get(id);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
new file mode 100644
index 0000000..7ca7ac3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.eviction;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.ResourceUtils;
+import backtype.storm.scheduler.resource.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Default eviction strategy: makes room for a topology by evicting one running topology of the user
+ * that is furthest above his or her resource guarantee, provided the topology being scheduled still
+ * fits within its own submitter's remaining guarantee.
+ */
+public class DefaultEvictionStrategy implements IEvictionStrategy {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(DefaultEvictionStrategy.class);
+
+    private Topologies topologies;
+    private Cluster cluster;
+    private Map<String, User> userMap;
+    private RAS_Nodes nodes;
+
+    @Override
+    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
+        this.topologies = topologies;
+        this.cluster = cluster;
+        this.userMap = userMap;
+        this.nodes = nodes;
+    }
+
+    /**
+     * Evicts one running topology to make room for {@code td}, but only if {@code td}'s requested CPU
+     * and memory fit within what remains of its submitter's resource guarantees.
+     * <p>
+     * The victim is the topology returned by {@code getRunningTopologyWithLowestPriority()} of the user
+     * whose average resource-pool utilization exceeds 1.0 by the largest margin.
+     * @param td the topology to make space for
+     * @return true if a topology was evicted and scheduling of {@code td} should be retried; false if
+     * the submitter is unknown, has no guarantees, {@code td} does not fit in the submitter's remaining
+     * guarantee, or no user is above his or her guarantee
+     */
+    @Override
+    public boolean makeSpaceForTopo(TopologyDetails td) {
+        LOG.info("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
+        User submitter = this.userMap.get(td.getTopologySubmitter());
+        if (submitter == null) {
+            // Previously this dereferenced null and aborted the whole scheduling round
+            LOG.warn("Cannot make space for topology {}: user {} is unknown", td.getName(), td.getTopologySubmitter());
+            return false;
+        }
+        if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null) {
+            return false;
+        }
+
+        // The topology's request expressed as a fraction of the submitter's guarantees...
+        double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
+        double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
+        // ...and the fraction of those guarantees the submitter has not used yet
+        double cpuHeadroom = 1.0 - submitter.getCPUResourcePoolUtilization();
+        double memoryHeadroom = 1.0 - submitter.getMemoryResourcePoolUtilization();
+
+        // Written as a positive check so that a NaN ratio counts as "does not fit"
+        boolean fitsInGuarantee = cpuHeadroom >= cpuNeeded && memoryHeadroom >= memoryNeeded;
+        if (!fitsInGuarantee) {
+            LOG.debug("Topology {} from user {} does not fit in the user's remaining guarantee "
+                    + "(cpu needed {}, available {}; memory needed {}, available {}), not evicting anything",
+                    new Object[] {td.getName(), submitter.getId(), cpuNeeded, cpuHeadroom, memoryNeeded, memoryHeadroom});
+            return false;
+        }
+
+        //user has enough resource under his or her resource guarantee to schedule topology
+        User evictUser = this.findUserWithMostResourcesAboveGuarantee();
+        if (evictUser == null) {
+            LOG.info("Cannot make space for topology {} from user {}", td.getName(), submitter.getId());
+            submitter.moveTopoFromPendingToAttempted(td, this.cluster);
+            return false;
+        }
+        TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+        LOG.info("topology to evict: {}", topologyEvict);
+        evictTopology(topologyEvict);
+
+        LOG.info("Resources After eviction:\n{}", this.nodes);
+
+        return true;
+    }
+
+    /**
+     * Frees every slot currently used by {@code topologyEvict} and moves the topology from its
+     * submitter's running topologies back to the pending ones.
+     * @param topologyEvict the topology to evict
+     */
+    private void evictTopology(TopologyDetails topologyEvict) {
+        Collection<WorkerSlot> workersToEvict = this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId());
+        User submitter = this.userMap.get(topologyEvict.getTopologySubmitter());
+
+        LOG.info("Evicting Topology {} with workers: {}", topologyEvict.getName(), workersToEvict);
+        LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
+        this.nodes.freeSlots(workersToEvict);
+        submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
+        LOG.info("check if topology unassigned: {}", this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId()));
+    }
+
+    /**
+     * @return the user with at least one running topology whose average resource-pool utilization
+     * exceeds 1.0 by the largest margin, or null if no such user exists
+     */
+    private User findUserWithMostResourcesAboveGuarantee() {
+        double most = 0.0;
+        User mostOverUser = null;
+        for (User user : this.userMap.values()) {
+            double over = user.getResourcePoolAverageUtilization() - 1.0;
+            if ((over > most) && (!user.getTopologiesRunning().isEmpty())) {
+                most = over;
+                mostOverUser = user;
+            }
+        }
+        return mostOverUser;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
new file mode 100644
index 0000000..b787434
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.eviction;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.User;
+
+import java.util.Map;
+
+/**
+ * Pluggable policy used by the resource aware scheduler to free cluster resources for a topology
+ * that could not be scheduled.
+ */
+public interface IEvictionStrategy {
+
+    /**
+     * Initializes the strategy with the scheduler state it may consult.
+     * @param topologies the topologies known to the scheduler
+     * @param cluster the cluster being scheduled
+     * @param userMap the cluster's users; presumably keyed by topology submitter name, as
+     *                {@code DefaultEvictionStrategy} assumes
+     * @param nodes the scheduler's view of the cluster's nodes
+     */
+    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
+
+    /**
+     * Attempts to make space on the cluster so that the specified topology can be scheduled.
+     * @param td the topology to make space for
+     * @return true to indicate that space has been made for the topology and that the scheduler should try
+     * to schedule {@code td} again; false to indicate that no space could be made for the topology on the
+     * cluster and that the scheduler should give up trying to schedule it for this round of scheduling.
+     * This method will be invoked until the topology can be scheduled or the method returns false.
+     */
+    public boolean makeSpaceForTopo(TopologyDetails td);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
new file mode 100644
index 0000000..5096fd6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.priority;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Default scheduling priority: schedule next a topology of the user with the lowest average
+ * resource-pool utilization.
+ */
+public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(DefaultSchedulingPriorityStrategy.class);
+
+    /** Average utilizations closer than this are considered a tie. */
+    private static final double UTILIZATION_TIE_TOLERANCE = 0.0001;
+
+    private Topologies topologies;
+    private Cluster cluster;
+    private Map<String, User> userMap;
+    private RAS_Nodes nodes;
+
+    @Override
+    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
+        this.topologies = topologies;
+        this.cluster = cluster;
+        this.userMap = userMap;
+        this.nodes = nodes;
+    }
+
+    /**
+     * @return the next topology of the user chosen by {@link #getNextUser()}, or null when no user has
+     * a topology that needs scheduling
+     */
+    @Override
+    public TopologyDetails getNextTopologyToSchedule() {
+        User nextUser = this.getNextUser();
+        if (nextUser == null) {
+            return null;
+        }
+        return nextUser.getNextTopologyToSchedule();
+    }
+
+    /**
+     * Chooses the user whose topology should be scheduled next. Among users that have a topology
+     * needing scheduling, this is the one with the lowest average resource-pool utilization. When two
+     * utilizations are within {@link #UTILIZATION_TIE_TOLERANCE}, the user whose guarantees form the
+     * larger share of the cluster wins. On an exact share tie, the user seen first is kept.
+     * @return the chosen user, or null if no user has a topology that needs scheduling
+     */
+    public User getNextUser() {
+        double least = Double.POSITIVE_INFINITY;
+        User ret = null;
+        for (User user : this.userMap.values()) {
+            boolean needsScheduling = user.hasTopologyNeedSchedule();
+            // Per-user state is diagnostic only; keep it out of the INFO log and skip building it otherwise
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("getNextUser {} hasTopologyNeedSchedule: {}", user.getDetailedInfo(), needsScheduling);
+            }
+            if (!needsScheduling) {
+                continue;
+            }
+            double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
+            if (least > userResourcePoolAverageUtilization) {
+                ret = user;
+                least = userResourcePoolAverageUtilization;
+            } else if (Math.abs(least - userResourcePoolAverageUtilization) < UTILIZATION_TIE_TOLERANCE) {
+                // ret is non-null here: a tie with the initial +Infinity is impossible (|Inf - x| is Inf or NaN)
+                double currentAvgPercentage = this.getGuaranteedShareOfCluster(ret);
+                double userAvgPercentage = this.getGuaranteedShareOfCluster(user);
+                LOG.debug("current: {}-{} compareUser: {}-{}",
+                        new Object[] {ret.getId(), currentAvgPercentage, user.getId(), userAvgPercentage});
+                if (userAvgPercentage > currentAvgPercentage) {
+                    ret = user;
+                    least = userResourcePoolAverageUtilization;
+                }
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Averages the fractions of the cluster's total CPU and total memory that {@code user} is guaranteed.
+     * A missing (null) guarantee counts as zero instead of failing with a NullPointerException on unboxing.
+     */
+    private double getGuaranteedShareOfCluster(User user) {
+        Double cpuGuarantee = user.getCPUResourceGuaranteed();
+        Double memoryGuarantee = user.getMemoryResourceGuaranteed();
+        double cpuPercentage = (cpuGuarantee == null ? 0.0 : cpuGuarantee) / this.cluster.getClusterTotalCPUResource();
+        double memoryPercentage = (memoryGuarantee == null ? 0.0 : memoryGuarantee) / this.cluster.getClusterTotalMemoryResource();
+        return (cpuPercentage + memoryPercentage) / 2.0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
new file mode 100644
index 0000000..7e92b3d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.priority;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.User;
+
+import java.util.Map;
+
+/**
+ * Pluggable policy that decides the order in which the resource aware scheduler schedules topologies.
+ */
+public interface ISchedulingPriorityStrategy {
+
+    /**
+     * Initializes the strategy with the scheduler state it may consult.
+     * @param topologies the topologies known to the scheduler
+     * @param cluster the cluster being scheduled
+     * @param userMap the cluster's users
+     * @param nodes the scheduler's view of the cluster's nodes
+     */
+    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
+
+    /**
+     * Chooses which topology should be scheduled next.
+     * @return the next topology to schedule, or null if there are no topologies left to schedule
+     */
+    public TopologyDetails getNextTopologyToSchedule();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
new file mode 100644
index 0000000..1950858
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.scheduling;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.SchedulingResult;
+import backtype.storm.scheduler.resource.SchedulingStatus;
+import backtype.storm.scheduler.resource.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.Component;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+public class DefaultResourceAwareStrategy implements IStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
+ private Topologies _topologies;
+ private Cluster _cluster;
+ //Map key is the supervisor id and the value is the corresponding RAS_Node Object
+ private Map<String, RAS_Node> _availNodes;
+ private RAS_Node refNode = null;
+ /**
+ * supervisor id -> Node
+ */
+ private Map<String, RAS_Node> _nodes;
+ private Map<String, List<String>> _clusterInfo;
+
+ private final double CPU_WEIGHT = 1.0;
+ private final double MEM_WEIGHT = 1.0;
+ private final double NETWORK_WEIGHT = 1.0;
+
+    /**
+     * Captures the cluster state this strategy schedules against.
+     * <p>
+     * NOTE(review): {@code userMap} and {@code nodes} are not used. Node state is rebuilt from
+     * {@code cluster} via {@code RAS_Nodes.getAllNodesFrom}, so resources consumed by this strategy are
+     * presumably tracked separately from the {@code nodes} object passed in. Confirm this is intended.
+     * @param topologies the topologies known to the scheduler
+     * @param cluster the cluster being scheduled
+     * @param userMap unused
+     * @param nodes unused
+     */
+    public void prepare (Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
+        _topologies = topologies;
+        _cluster = cluster;
+        _nodes = RAS_Nodes.getAllNodesFrom(cluster, _topologies);
+        _availNodes = this.getAvailNodes();
+        _clusterInfo = cluster.getNetworkTopography();
+        // getClusterInfo() walks every node and concatenates a large string; only build it when it is logged
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}", this.getClusterInfo());
+        }
+    }
+
+    /**
+     * Groups the still-unassigned executors of each component by the component's position in
+     * {@code ordered__Component_list}. The first component gets rank 0, the next rank 1, and so on.
+     * Every component gets an entry, possibly with an empty list, and the returned TreeMap iterates
+     * in rank order.
+     * @param ordered__Component_list components in scheduling order
+     * @param unassignedExecutors executors that still need a slot; executors not in here are left out
+     * @return rank -> unassigned executors of the component at that rank
+     */
+    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
+            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
+        TreeMap<Integer, List<ExecutorDetails>> rankToExecs = new TreeMap<>();
+        int nextRank = 0;
+        for (Component component : ordered__Component_list) {
+            List<ExecutorDetails> pending = new ArrayList<ExecutorDetails>();
+            for (ExecutorDetails candidate : component.execs) {
+                if (unassignedExecutors.contains(candidate)) {
+                    pending.add(candidate);
+                }
+            }
+            rankToExecs.put(nextRank, pending);
+            nextRank++;
+        }
+        return rankToExecs;
+    }
+
+    /**
+     * Computes an assignment of all of {@code td}'s unassigned executors to worker slots.
+     * <p>
+     * Components are ordered by a breadth-first traversal starting from the spouts. Executors are
+     * placed round-robin across that order: the first executor of rank 0, the first of rank 1, and so
+     * on, then the second executor of rank 0, and so on. Executors still unplaced afterwards (most
+     * likely system executors) are attempted in a final pass.
+     * @param td the topology to schedule
+     * @return a success result carrying the slot -> executors assignment when every executor was
+     * placed, otherwise a failure result describing why
+     */
+    public SchedulingResult schedule(TopologyDetails td) {
+        if (_availNodes.size() <= 0) {
+            LOG.warn("No available nodes to schedule tasks on!");
+            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
+        }
+        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
+        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
+        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
+        // A set, so that the removeAll() calls below are a hash lookup per element
+        Collection<ExecutorDetails> scheduledTasks = new HashSet<>();
+        List<Component> spouts = this.getSpouts(_topologies, td);
+
+        if (spouts.size() == 0) {
+            LOG.error("Cannot find a Spout!");
+            return SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
+        }
+
+        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
+
+        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
+        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
+        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
+        //Pick the first executor with rank 0, then the first executor with rank 1, and so on.
+        //Once we reach the last rank, we go back to rank 0 and schedule its second executor.
+        for (int i = 0; i < longestPriorityListSize; i++) {
+            for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
+                Iterator<ExecutorDetails> it = entry.getValue().iterator();
+                if (it.hasNext()) {
+                    ExecutorDetails exec = it.next();
+                    LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
+                            new Object[] { exec, td.getExecutorToComponent().get(exec),
+                                    td.getTaskResourceReqList(exec), entry.getKey() });
+                    this.assignExecutorToBestWorker(exec, td, schedulerAssignmentMap, scheduledTasks);
+                    // consumed whether or not it was placed; failures are retried in the pass below
+                    it.remove();
+                }
+            }
+        }
+
+        executorsNotScheduled.removeAll(scheduledTasks);
+        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
+        // schedule left over system tasks
+        for (ExecutorDetails exec : executorsNotScheduled) {
+            this.assignExecutorToBestWorker(exec, td, schedulerAssignmentMap, scheduledTasks);
+        }
+
+        executorsNotScheduled.removeAll(scheduledTasks);
+        if (executorsNotScheduled.size() > 0) {
+            LOG.error("Not all executors successfully scheduled: {}",
+                    executorsNotScheduled);
+            LOG.error("Topology {} not successfully scheduled!", td.getId());
+            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "Not all executors successfully scheduled: " + executorsNotScheduled);
+        }
+        LOG.debug("All resources successfully scheduled!");
+        return SchedulingResult.success(schedulerAssignmentMap);
+    }
+
+    /**
+     * Places one executor on the best available worker slot, if there is one. On success, the
+     * placement is recorded in {@code schedulerAssignmentMap}, the executor's resources are consumed on
+     * the chosen node, and the executor is added to {@code scheduledTasks}. On failure, only an error
+     * is logged.
+     */
+    private void assignExecutorToBestWorker(ExecutorDetails exec, TopologyDetails td,
+            Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap, Collection<ExecutorDetails> scheduledTasks) {
+        WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
+        if (targetSlot == null) {
+            LOG.error("Not Enough Resources to schedule Task {}", exec);
+            return;
+        }
+        RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
+        Collection<ExecutorDetails> slotExecs = schedulerAssignmentMap.get(targetSlot);
+        if (slotExecs == null) {
+            slotExecs = new LinkedList<ExecutorDetails>();
+            schedulerAssignmentMap.put(targetSlot, slotExecs);
+        }
+        slotExecs.add(exec);
+        targetNode.consumeResourcesforTask(exec, td);
+        scheduledTasks.add(exec);
+        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
+                targetNode, targetNode.getAvailableMemoryResources(),
+                targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
+                targetNode.getTotalCpuResources(), targetSlot);
+    }
+
+    /**
+     * Finds a worker slot for {@code exec}. The first placement (no reference node yet) is restricted
+     * to the cluster returned by {@code getBestClustering()}. Later placements search all nodes and are
+     * ranked partly by network distance from the reference node. Whenever a slot is found, its node
+     * becomes the new reference node.
+     * @return the chosen slot, or null if none fits
+     */
+    private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        boolean firstPlacement = (this.refNode == null);
+        WorkerSlot chosen = firstPlacement
+                ? this.getBestWorker(exec, td, this.getBestClustering(), scheduleAssignmentMap)
+                : this.getBestWorker(exec, td, scheduleAssignmentMap);
+        if (chosen != null) {
+            this.refNode = this.idToNode(chosen.getNodeId());
+        }
+        LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
+        return chosen;
+    }
+
+    /**
+     * Finds the best worker for {@code exec} without restricting the search to one cluster: a null
+     * cluster id makes the 4-argument overload consider {@code getAvailableNodes()}.
+     */
+    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        final String anyCluster = null;
+        return getBestWorker(exec, td, anyCluster, scheduleAssignmentMap);
+    }
+
+    /**
+     * Chooses a worker slot for {@code exec}.
+     * <p>
+     * Candidate nodes are taken from {@code clusterId}, or from all clusters when it is null. A
+     * candidate must have a free slot and enough available memory and CPU for the executor.
+     * Candidates are ranked by a weighted Euclidean score that combines:
+     * <ul>
+     * <li>how close the executor's CPU and memory needs are to what the node has left, relative to
+     * what it has left;</li>
+     * <li>once a reference node exists, the network distance to that node.</li>
+     * </ul>
+     * Lower scores are tried first. The first free slot that passes {@code checkWorkerConstraints}
+     * is returned.
+     * @return the chosen slot, or null if no candidate has a suitable slot
+     */
+    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        double taskMem = td.getTotalMemReqTask(exec);
+        double taskCPU = td.getTotalCpuReqTask(exec);
+        List<RAS_Node> nodes;
+        if(clusterId != null) {
+            nodes = this.getAvailableNodesFromCluster(clusterId);
+
+        } else {
+            nodes = this.getAvailableNodes();
+        }
+        //First sort nodes by distance. Several nodes can share the same score (e.g. identical, equally
+        //loaded nodes), so each score keeps a list of nodes. Keying the map by score alone would silently
+        //drop all but one of them, and a valid placement could be missed.
+        TreeMap<Double, List<RAS_Node>> nodeRankMap = new TreeMap<>();
+        for (RAS_Node n : nodes) {
+            if(n.getFreeSlots().size()>0) {
+                if (n.getAvailableMemoryResources() >= taskMem
+                        && n.getAvailableCpuResources() >= taskCPU) {
+                    double a = Math.pow(((taskCPU - n.getAvailableCpuResources())/(n.getAvailableCpuResources() + 1))
+                            * this.CPU_WEIGHT, 2);
+                    double b = Math.pow(((taskMem - n.getAvailableMemoryResources())/(n.getAvailableMemoryResources() + 1))
+                            * this.MEM_WEIGHT, 2);
+                    double c = 0.0;
+                    if(this.refNode != null) {
+                        c = Math.pow(this.distToNode(this.refNode, n)
+                                * this.NETWORK_WEIGHT, 2);
+                    }
+                    double distance = Math.sqrt(a + b + c);
+                    List<RAS_Node> sameRank = nodeRankMap.get(distance);
+                    if (sameRank == null) {
+                        sameRank = new ArrayList<>();
+                        nodeRankMap.put(distance, sameRank);
+                    }
+                    // Most recently ranked node first: this keeps the preference of the former
+                    // single-entry map (which kept the last node per score); the others are now fallbacks
+                    sameRank.add(0, n);
+                }
+            }
+        }
+        //Then, pick worker from closest node that satisfy constraints
+        for (List<RAS_Node> sameRank : nodeRankMap.values()) {
+            for (RAS_Node n : sameRank) {
+                for(WorkerSlot ws : n.getFreeSlots()) {
+                    if(checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) {
+                        return ws;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Selects the cluster (rack) whose nodes have the largest combined available memory plus CPU.
+     * On ties the first such cluster in iteration order wins.
+     *
+     * @return the id of that cluster, or {@code null} if no cluster has a positive total
+     */
+    private String getBestClustering() {
+        String richestRack = null;
+        double richestTotal = 0.0;
+        for (Entry<String, List<String>> rack : _clusterInfo.entrySet()) {
+            double rackTotal = this.getTotalClusterRes(rack.getValue());
+            if (rackTotal > richestTotal) {
+                richestTotal = rackTotal;
+                richestRack = rack.getKey();
+            }
+        }
+        return richestRack;
+    }
+
+    /**
+     * Sums available memory and available CPU over the nodes of a cluster (rack).
+     * <p>
+     * Each hostname is resolved to a node once. Hostnames that do not resolve to an available node
+     * contribute nothing instead of causing a NullPointerException.
+     *
+     * @param cluster hostnames of the nodes in the cluster
+     * @return the total available memory + CPU of the resolvable nodes
+     */
+    private Double getTotalClusterRes(List<String> cluster) {
+        double res = 0.0;
+        for (String hostname : cluster) {
+            String nodeId = this.NodeHostnameToId(hostname);
+            RAS_Node node = (nodeId == null) ? null : _availNodes.get(nodeId);
+            if (node == null) {
+                continue;
+            }
+            res += node.getAvailableMemoryResources() + node.getAvailableCpuResources();
+        }
+        return res;
+    }
+
+    /**
+     * Network-distance heuristic between two nodes.
+     *
+     * @return 0.0 for the same node, 0.5 for different nodes in the same cluster (rack), and 1.0
+     *         otherwise; a node whose cluster cannot be determined is treated as being in a
+     *         different cluster rather than causing a NullPointerException
+     */
+    private Double distToNode(RAS_Node src, RAS_Node dest) {
+        if (src.getId().equals(dest.getId())) {
+            return 0.0;
+        }
+        String srcCluster = this.NodeToCluster(src);
+        if (srcCluster != null && srcCluster.equals(this.NodeToCluster(dest))) {
+            return 0.5;
+        }
+        return 1.0;
+    }
+
+ private String NodeToCluster(RAS_Node node) {
+ for (Entry<String, List<String>> entry : _clusterInfo
+ .entrySet()) {
+ if (entry.getValue().contains(node.getHostname())) {
+ return entry.getKey();
+ }
+ }
+ LOG.error("Node: {} not found in any clusters", node.getHostname());
+ return null;
+ }
+
+    /**
+     * Gathers the available nodes of every cluster (rack), cluster by cluster.
+     */
+    private List<RAS_Node> getAvailableNodes() {
+        List<RAS_Node> allNodes = new LinkedList<>();
+        for (Entry<String, List<String>> rack : _clusterInfo.entrySet()) {
+            allNodes.addAll(this.getAvailableNodesFromCluster(rack.getKey()));
+        }
+        return allNodes;
+    }
+
+    /**
+     * Resolves the hostnames listed for a cluster (rack) to available nodes.
+     * <p>
+     * Hostnames that do not resolve to an available node are skipped, so the returned list never
+     * contains {@code null}. An unknown cluster id yields an empty list.
+     *
+     * @param clus the cluster id
+     * @return the available nodes of that cluster, in the cluster's hostname order
+     */
+    private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
+        List<RAS_Node> retList = new ArrayList<>();
+        List<String> hostnames = _clusterInfo.get(clus);
+        if (hostnames == null) {
+            return retList;
+        }
+        for (String hostname : hostnames) {
+            String nodeId = this.NodeHostnameToId(hostname);
+            RAS_Node node = (nodeId == null) ? null : _availNodes.get(nodeId);
+            if (node != null) {
+                retList.add(node);
+            }
+        }
+        return retList;
+    }
+
+    /**
+     * Collects the free worker slots of every available node in the given cluster (rack).
+     */
+    private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
+        List<RAS_Node> rackNodes = this.getAvailableNodesFromCluster(clusterId);
+        List<WorkerSlot> freeSlots = new LinkedList<>();
+        for (RAS_Node rackNode : rackNodes) {
+            freeSlots.addAll(rackNode.getFreeSlots());
+        }
+        return freeSlots;
+    }
+
+    /**
+     * Collects the free worker slots of every available node across all clusters (racks).
+     */
+    private List<WorkerSlot> getAvailableWorker() {
+        List<WorkerSlot> allFree = new LinkedList<>();
+        for (Entry<String, List<String>> rack : _clusterInfo.entrySet()) {
+            allFree.addAll(this.getAvailableWorkersFromCluster(rack.getKey()));
+        }
+        return allFree;
+    }
+
+ /**
+ * In case in the future RAS can only use a subset of nodes
+ */
+ private Map<String, RAS_Node> getAvailNodes() {
+ return _nodes;
+ }
+
+    /**
+     * Breadth-first traversal of the topology DAG, following both child and parent edges, started
+     * from each spout that has not been reached yet.
+     * <p>
+     * A component is marked visited when it is enqueued, not when it is dequeued, so each component
+     * appears in the result at most once even when it is reachable along several paths (for
+     * example a bolt with two parents). Neighbor ids that do not resolve to a component are skipped.
+     *
+     * @param topologies all topologies, used to resolve component ids to {@link Component}s
+     * @param td the topology being traversed
+     * @param spouts the traversal roots (the spouts of {@code td})
+     * @return the components in breadth-first order, without duplicates (a partial ordering)
+     */
+    private Queue<Component> bfs(Topologies topologies, TopologyDetails td, List<Component> spouts) {
+        // Queue is an interface, so LinkedList backs both the result and the work queue.
+        Queue<Component> ordered = new LinkedList<Component>();
+        HashMap<String, Component> visited = new HashMap<>();
+
+        for (Component spout : spouts) {
+            if (visited.containsKey(spout.id)) {
+                continue;
+            }
+            Queue<Component> queue = new LinkedList<>();
+            visited.put(spout.id, spout);
+            queue.offer(spout);
+            while (!queue.isEmpty()) {
+                Component comp = queue.poll();
+                ordered.add(comp);
+                List<String> neighbors = new ArrayList<>();
+                neighbors.addAll(comp.children);
+                neighbors.addAll(comp.parents);
+                for (String nbID : neighbors) {
+                    if (visited.containsKey(nbID)) {
+                        continue;
+                    }
+                    Component neighbor = topologies.getAllComponents().get(td.getId()).get(nbID);
+                    if (neighbor == null) {
+                        // Unknown component id: enqueuing null would fail when it is dequeued.
+                        continue;
+                    }
+                    visited.put(nbID, neighbor);
+                    queue.offer(neighbor);
+                }
+            }
+        }
+        return ordered;
+    }
+
+    /**
+     * Collects the components of topology {@code td} whose type is SPOUT.
+     */
+    private List<Component> getSpouts(Topologies topologies, TopologyDetails td) {
+        List<Component> result = new ArrayList<>();
+        for (Component candidate : topologies.getAllComponents().get(td.getId()).values()) {
+            if (candidate.type != Component.ComponentType.SPOUT) {
+                continue;
+            }
+            result.add(candidate);
+        }
+        return result;
+    }
+
+    /**
+     * Returns the size of the largest executor list in the map, or 0 if the map is empty.
+     */
+    private Integer getLongestPriorityListSize(Map<Integer, List<ExecutorDetails>> priorityToExecutorMap) {
+        int longest = 0;
+        for (List<ExecutorDetails> sameIndexExecs : priorityToExecutorMap.values()) {
+            longest = Math.max(longest, sameIndexExecs.size());
+        }
+        return longest;
+    }
+
+ /**
+ * Get the remaining amount memory that can be assigned to a worker given the set worker max heap size
+ * @param ws
+ * @param td
+ * @param scheduleAssignmentMap
+ * @return The remaining amount of memory
+ */
+ private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+ Double memScheduleUsed = this.getWorkerScheduledMemoryUse(ws, td, scheduleAssignmentMap);
+ return td.getTopologyWorkerMaxHeapSize() - memScheduleUsed;
+ }
+
+    /**
+     * Sums the memory requirement of every executor already assigned to worker {@code ws}.
+     *
+     * @param ws the worker slot
+     * @param td the topology used to look up per-executor memory requirements
+     * @param scheduleAssignmentMap assignments already made in this scheduling round
+     * @return the total assigned memory, 0.0 if nothing is assigned to {@code ws}
+     */
+    private Double getWorkerScheduledMemoryUse(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        double used = 0.0;
+        Collection<ExecutorDetails> assigned = scheduleAssignmentMap.get(ws);
+        if (assigned == null) {
+            return used;
+        }
+        for (ExecutorDetails assignedExec : assigned) {
+            used += td.getTotalMemReqTask(assignedExec);
+        }
+        return used;
+    }
+
+ /**
+ * Checks whether we can schedule an Executor exec on the worker slot ws
+ * Only considers memory currently. May include CPU in the future
+ * @param exec
+ * @param ws
+ * @param td
+ * @param scheduleAssignmentMap
+ * @return a boolean: True denoting the exec can be scheduled on ws and false if it cannot
+ */
+ private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+ boolean retVal = false;
+ if(this.getWorkerScheduledMemoryAvailable(ws, td, scheduleAssignmentMap) >= td.getTotalMemReqTask(exec)) {
+ retVal = true;
+ }
+ return retVal;
+ }
+
+    /**
+     * Builds a human-readable dump of every cluster (rack) and the available and total memory/CPU
+     * of each of its nodes, intended for debug logging.
+     * <p>
+     * Hostnames that do not resolve to a node are listed as "(not found)" instead of causing a
+     * NullPointerException.
+     *
+     * @return a multi-line String with cluster resource info
+     */
+    private String getClusterInfo() {
+        // StringBuilder avoids quadratic copying from repeated String concatenation in the loops.
+        StringBuilder retVal = new StringBuilder("Cluster info:\n");
+        for (Entry<String, List<String>> clusterEntry : _clusterInfo.entrySet()) {
+            String clusterId = clusterEntry.getKey();
+            retVal.append("Rack: ").append(clusterId).append("\n");
+            for (String nodeHostname : clusterEntry.getValue()) {
+                RAS_Node node = this.idToNode(this.NodeHostnameToId(nodeHostname));
+                if (node == null) {
+                    retVal.append("-> Node: ").append(nodeHostname).append(" (not found)\n");
+                    continue;
+                }
+                retVal.append("-> Node: ").append(node.getHostname()).append(" ").append(node.getId()).append("\n");
+                retVal.append("--> Avail Resources: {Mem ").append(node.getAvailableMemoryResources())
+                        .append(", CPU ").append(node.getAvailableCpuResources()).append("}\n");
+                retVal.append("--> Total Resources: {Mem ").append(node.getTotalMemoryResources())
+                        .append(", CPU ").append(node.getTotalCpuResources()).append("}\n");
+            }
+        }
+        return retVal.toString();
+    }
+
+ /**
+ * hostname to Id
+ * @param hostname
+ * @return the id of a node
+ */
+ public String NodeHostnameToId(String hostname) {
+ for (RAS_Node n : _nodes.values()) {
+ if (n.getHostname() == null) {
+ continue;
+ }
+ if (n.getHostname().equals(hostname)) {
+ return n.getId();
+ }
+ }
+ LOG.error("Cannot find Node with hostname {}", hostname);
+ return null;
+ }
+
+    /**
+     * Looks up the RAS_Node registered under the given node id.
+     *
+     * @param id the node id
+     * @return the node, or {@code null} (after logging an error) if no node has that id
+     */
+    public RAS_Node idToNode(String id) {
+        if (_nodes.containsKey(id)) {
+            return _nodes.get(id);
+        }
+        LOG.error("Cannot find Node with Id: {}", id);
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
new file mode 100644
index 0000000..12e8ff3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.scheduling;
+
+import java.util.Collection;
+import java.util.Map;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.RAS_Node;
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.SchedulingResult;
+import backtype.storm.scheduler.resource.User;
+
+/**
+ * A pluggable strategy used by the resource aware scheduler to decide where the executors of a
+ * topology are placed.
+ */
+public interface IStrategy {
+
+    /**
+     * Gives the strategy the current topologies, cluster state, users and nodes.
+     * NOTE(review): expected to be called before {@link #schedule(TopologyDetails)} — confirm with
+     * the scheduler implementation.
+     *
+     * @param topologies the topologies known to the scheduler
+     * @param cluster the current cluster state
+     * @param userMap users keyed by user name (presumably; verify against the caller)
+     * @param nodes the resource aware scheduler's view of the cluster nodes
+     */
+    void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
+
+    /**
+     * Computes a placement for the executors of {@code td}.
+     *
+     * @param td the topology to schedule
+     * @return the outcome of the attempt: a success carrying the assignment, or a failure
+     *         with a status and message
+     */
+    SchedulingResult schedule(TopologyDetails td);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index 52c4ed1..b82a4ec 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -523,6 +523,31 @@ public class ConfigValidation {
}
}
+ public static class ImplementsClassValidator extends Validator {
+
+ Class classImplements;
+
+ public ImplementsClassValidator(Map<String, Object> params) {
+ this.classImplements = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASS);
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ if(o == null) {
+ return;
+ }
+ SimpleTypeValidator.validateField(name, String.class, o);
+ try {
+ Class objectClass = Class.forName((String) o);
+ if(!this.classImplements.isAssignableFrom(objectClass)) {
+ throw new IllegalArgumentException("Field " + name + " with value " + o + " does not implement " + this.classImplements.getName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
/**
* Methods for validating confs
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
index c47f523..28707e4 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
@@ -46,6 +46,7 @@ public class ConfigValidationAnnotations {
static final String VALUE_TYPE = "valueType";
static final String INCLUDE_ZERO = "includeZero";
static final String ACCEPTED_VALUES = "acceptedValues";
+ static final String IMPLEMENTS_CLASS = "implementsClass";
}
/**
@@ -176,6 +177,14 @@ public class ConfigValidationAnnotations {
boolean includeZero() default false;
}
+ /**
+ * Marks a config field whose value must be the fully qualified name of a class that extends or
+ * implements {@link #implementsClass()}. The check is performed by {@link #validatorClass()},
+ * which defaults to {@code ConfigValidation.ImplementsClassValidator}.
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isImplementationOfClass {
+ /** The validator that performs the check. */
+ Class validatorClass() default ConfigValidation.ImplementsClassValidator.class;
+
+ /** The class or interface that the configured class must be assignable to. */
+ Class implementsClass();
+ }
+
/**
* Complex/custom type validators
*/