You are viewing a plain text version of this content; the hyperlink to its canonical version is not preserved in plain text.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:38 UTC
[10/23] storm git commit: first initial implementation
first initial implementation
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3e832205
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3e832205
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3e832205
Branch: refs/heads/master
Commit: 3e8322053c3d7fc7877b2985bae1ad657562c931
Parents: 4cd5efa
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Wed Nov 25 17:13:44 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 4 13:07:11 2015 -0600
----------------------------------------------------------------------
conf/user-resource-pools-example.yaml | 26 +
.../starter/ResourceAwareExampleTopology.java | 6 +
.../src/clj/backtype/storm/daemon/nimbus.clj | 5 +-
storm-core/src/jvm/backtype/storm/Config.java | 5 +-
.../jvm/backtype/storm/scheduler/Cluster.java | 41 +-
.../storm/scheduler/SupervisorDetails.java | 4 +-
.../backtype/storm/scheduler/Topologies.java | 8 +-
.../storm/scheduler/TopologyDetails.java | 47 +-
.../storm/scheduler/resource/RAS_Node.java | 32 +-
.../storm/scheduler/resource/RAS_Nodes.java | 7 +-
.../resource/ResourceAwareScheduler.java | 228 +++++----
.../backtype/storm/scheduler/resource/User.java | 52 +-
.../eviction/DefaultEvictionStrategy.java | 46 +-
.../DefaultSchedulingPriorityStrategy.java | 14 +-
.../priority/ISchedulingPriorityStrategy.java | 2 +-
.../DefaultResourceAwareStrategy.java | 4 +-
.../strategies/scheduling/IStrategy.java | 11 +
.../storm/validation/ConfigValidation.java | 12 +-
.../scheduler/resource_aware_scheduler_test.clj | 129 +++--
.../storm/scheduler/resource/Experiment.java | 222 ---------
.../resource/TestResourceAwareScheduler.java | 479 ++++++++++++++++++-
.../storm/scheduler/resource/TestUser.java | 4 +-
.../TestUtilsForResourceAwareScheduler.java | 42 +-
23 files changed, 893 insertions(+), 533 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/conf/user-resource-pools-example.yaml
----------------------------------------------------------------------
diff --git a/conf/user-resource-pools-example.yaml b/conf/user-resource-pools-example.yaml
new file mode 100644
index 0000000..829a6be
--- /dev/null
+++ b/conf/user-resource-pools-example.yaml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+resource.aware.scheduler.user.pools:
+ jerry:
+ cpu: 1000
+ memory: 8192.0
+ derek:
+ cpu: 10000.0
+ memory: 32768
+ bobby:
+ cpu: 5000.0
+ memory: 16384.0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
index e8225d4..a9ac659 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
@@ -84,6 +84,12 @@ public class ResourceAwareExampleTopology {
*/
conf.setTopologyWorkerMaxHeapSize(1024.0);
+ // Set topology priority 0-30 with 0 being the highest priority and 30 being the lowest priority.
+ conf.setTopologyPriority(30);
+
+ //Set strategy to schedule topology. If not specified, default to backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
+ conf.setTopologyStrategy(backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
+
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index e22161b..d76825d 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -847,13 +847,14 @@
supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment conf)
-
+ _ (.setStatusMap cluster (deref (:id->sched-status nimbus)))
;; call scheduler.schedule to schedule all the topologies
;; the new assignments for all the topologies are in the cluster object.
_ (.schedule (:scheduler nimbus) topologies cluster)
_ (.setResourcesMap cluster @(:id->resources nimbus))
_ (if-not (conf SCHEDULER-DISPLAY-RESOURCE) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies))
- _ (reset! (:id->sched-status nimbus) (.getStatusMap cluster))
+ ;;merge with existing statuses
+ _ (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status nimbus)) (.getStatusMap cluster)))
_ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster))
_ (reset! (:id->resources nimbus) (.getResourcesMap cluster))]
(.getAssignments cluster)))
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 54af6fa..f3c8c4a 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -2228,11 +2228,10 @@ public class Config extends HashMap<String, Object> {
}
/**
- * Takes as input the scheduler class name.
- * Currently only the Multitenant Scheduler and Resource Aware Scheduler are supported
+ * Takes as input the strategy class name. Strategy must implement the IStrategy interface
*/
public void setTopologyStrategy(Class<? extends IStrategy> clazz) {
- if(clazz != null) {
+ if (clazz != null) {
this.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, clazz.getName());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index 2676af1..c35dbbd 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -28,6 +28,8 @@ import java.util.Set;
import backtype.storm.Config;
import backtype.storm.networktopography.DNSToSwitchMapping;
import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class Cluster {
@@ -91,9 +93,18 @@ public class Cluster {
this.conf = storm_conf;
}
+ /**
+ * Get a copy of this cluster object
+ */
public Cluster getCopy() {
- Cluster copy = new Cluster(this.inimbus, this.supervisors, this.assignments, this.conf);
- for(Map.Entry<String, String> entry : this.status.entrySet()) {
+ HashMap<String, SchedulerAssignmentImpl> newAssignments = new HashMap<String, SchedulerAssignmentImpl>();
+ for (Map.Entry<String, SchedulerAssignmentImpl> entry : this.assignments.entrySet()) {
+ newAssignments.put(entry.getKey(), new SchedulerAssignmentImpl(entry.getValue().getTopologyId(), entry.getValue().getExecutorToSlot()));
+ }
+ Map newConf = new HashMap<String, Object>();
+ newConf.putAll(this.conf);
+ Cluster copy = new Cluster(this.inimbus, this.supervisors, newAssignments, newConf);
+ for (Map.Entry<String, String> entry : this.status.entrySet()) {
copy.setStatus(entry.getKey(), entry.getValue());
}
return copy;
@@ -447,9 +458,12 @@ public class Cluster {
return ret;
}
- public void setAssignments(Map<String, SchedulerAssignment> assignments) {
+ /**
+ * set assignments for cluster
+ */
+ public void setAssignments(Map<String, SchedulerAssignment> newAssignments) {
this.assignments = new HashMap<String, SchedulerAssignmentImpl>();
- for(Map.Entry<String, SchedulerAssignmentImpl> entry : this.assignments.entrySet()) {
+ for (Map.Entry<String, SchedulerAssignment> entry : newAssignments.entrySet()) {
this.assignments.put(entry.getKey(), new SchedulerAssignmentImpl(entry.getValue().getTopologyId(), entry.getValue().getExecutorToSlot()));
}
}
@@ -466,7 +480,7 @@ public class Cluster {
*/
public double getClusterTotalCPUResource() {
double sum = 0.0;
- for(SupervisorDetails sup: this.supervisors.values()) {
+ for (SupervisorDetails sup : this.supervisors.values()) {
sum += sup.getTotalCPU();
}
return sum;
@@ -477,7 +491,7 @@ public class Cluster {
*/
public double getClusterTotalMemoryResource() {
double sum = 0.0;
- for(SupervisorDetails sup: this.supervisors.values()) {
+ for (SupervisorDetails sup : this.supervisors.values()) {
sum += sup.getTotalMemory();
}
return sum;
@@ -607,14 +621,29 @@ public class Cluster {
}
}
+ /**
+ * set scheduler status for a topology
+ */
+ private static final Logger LOG = LoggerFactory
+ .getLogger(Cluster.class);
public void setStatus(String topologyId, String status) {
this.status.put(topologyId, status);
}
+ /**
+ * Get all schedule statuses
+ */
public Map<String, String> getStatusMap() {
return this.status;
}
+ /**
+ * set scheduler status map
+ */
+ public void setStatusMap(Map<String, String> statusMap) {
+ this.status.putAll(statusMap);
+ }
+
public void setResources(String topologyId, Double[] resources) {
this.resources.put(topologyId, resources);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
index d93252f..afdabe8 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
@@ -120,7 +120,7 @@ public class SupervisorDetails {
public Double getTotalMemory() {
Double totalMemory = getTotalResource(Config.SUPERVISOR_MEMORY_CAPACITY_MB);
- if(totalMemory == null) {
+ if (totalMemory == null) {
throw new IllegalStateException("default value for supervisor.memory.capacity.mb is not set!");
}
return totalMemory;
@@ -128,7 +128,7 @@ public class SupervisorDetails {
public Double getTotalCPU() {
Double totalCPU = getTotalResource(Config.SUPERVISOR_CPU_CAPACITY);
- if(totalCPU == null) {
+ if (totalCPU == null) {
throw new IllegalStateException("default value for supervisor.cpu.capacity is not set!");
}
return totalCPU;
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
index 59a53c8..3a6361f 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
@@ -29,10 +29,10 @@ public class Topologies {
Map<String, Map<String, Component>> _allComponents;
public Topologies(Map<String, TopologyDetails> topologies) {
- if(topologies==null) topologies = new HashMap();
- this.topologies = new HashMap<String, TopologyDetails>(topologies.size());
+ if(topologies==null) topologies = new HashMap<>();
+ this.topologies = new HashMap<>(topologies.size());
this.topologies.putAll(topologies);
- this.nameToId = new HashMap<String, String>(topologies.size());
+ this.nameToId = new HashMap<>(topologies.size());
for (Map.Entry<String, TopologyDetails> entry : topologies.entrySet()) {
TopologyDetails topology = entry.getValue();
@@ -75,7 +75,7 @@ public class Topologies {
@Override
public String toString() {
String ret = "Topologies:\n";
- for(TopologyDetails td : this.getTopologies()) {
+ for (TopologyDetails td : this.getTopologies()) {
ret += td.toString() + "\n";
}
return ret;
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
index 3931b43..871ae9b 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
@@ -39,18 +39,18 @@ import org.slf4j.LoggerFactory;
public class TopologyDetails {
- String topologyId;
- Map topologyConf;
- StormTopology topology;
- Map<ExecutorDetails, String> executorToComponent;
- int numWorkers;
+ private String topologyId;
+ private Map topologyConf;
+ private StormTopology topology;
+ private Map<ExecutorDetails, String> executorToComponent;
+ private int numWorkers;
//<ExecutorDetails - Task, Map<String - Type of resource, Map<String - type of that resource, Double - amount>>>
private Map<ExecutorDetails, Map<String, Double>> _resourceList;
//Max heap size for a worker used by topology
private Double topologyWorkerMaxHeapSize;
-
+ //topology priority
private Integer topologyPriority;
-
+ //when topology was launched
private int launchTime;
private static final Logger LOG = LoggerFactory.getLogger(TopologyDetails.class);
@@ -415,17 +415,16 @@ public class TopologyDetails {
* Add default resource requirements for a executor
*/
public void addDefaultResforExec(ExecutorDetails exec) {
-
Double topologyComponentCpuPcorePercent = Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
- if(topologyComponentCpuPcorePercent == null) {
+ if (topologyComponentCpuPcorePercent == null) {
LOG.warn("default value for topology.component.cpu.pcore.percent needs to be set!");
}
- Double topologyComponentResourcesOffheapMemoryMb = Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
- if(topologyComponentResourcesOffheapMemoryMb == null) {
+ Double topologyComponentResourcesOffheapMemoryMb = Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+ if (topologyComponentResourcesOffheapMemoryMb == null) {
LOG.warn("default value for topology.component.resources.offheap.memory.mb needs to be set!");
}
Double topologyComponentResourcesOnheapMemoryMb = Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
- if(topologyComponentResourcesOnheapMemoryMb == null) {
+ if (topologyComponentResourcesOnheapMemoryMb == null) {
LOG.warn("default value for topology.component.resources.onheap.memory.mb needs to be set!");
}
@@ -446,11 +445,11 @@ public class TopologyDetails {
*/
private void initConfigs() {
this.topologyWorkerMaxHeapSize = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB), null);
- if(this.topologyWorkerMaxHeapSize == null) {
+ if (this.topologyWorkerMaxHeapSize == null) {
LOG.warn("default value for topology.worker.max.heap.size.mb needs to be set!");
}
this.topologyPriority = Utils.getInt(this.topologyConf.get(Config.TOPOLOGY_PRIORITY), null);
- if(this.topologyPriority == null) {
+ if (this.topologyPriority == null) {
LOG.warn("default value for topology.priority needs to be set!");
}
}
@@ -463,17 +462,35 @@ public class TopologyDetails {
return this.topologyWorkerMaxHeapSize;
}
+ /**
+ * Get the user that submitted this topology
+ */
public String getTopologySubmitter() {
- return (String)this.topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+ String user = (String) this.topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+ if (user == null || user.equals("")) {
+ LOG.debug("Topology {} submitted by anonymous user", this.getName());
+ user = "anonymous";
+ }
+ return user;
}
+ /**
+ * get teh priority of this topology
+ */
public int getTopologyPriority() {
return this.topologyPriority;
}
+
+ /**
+ * Get the timestamp of when this topology was launched
+ */
public int getLaunchTime() {
return this.launchTime;
}
+ /**
+ * Get how long this topology has been executing
+ */
public int getUpTime() {
return Time.currentTimeSecs() - this.launchTime;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
index 21367f0..54775bf 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
@@ -90,7 +90,7 @@ public class RAS_Node {
public Collection<WorkerSlot> getUsedSlots() {
Collection<WorkerSlot> ret = new LinkedList<WorkerSlot>();
- for(Collection<WorkerSlot> workers : _topIdToUsedSlots.values()) {
+ for (Collection<WorkerSlot> workers : _topIdToUsedSlots.values()) {
ret.addAll(workers);
}
return ret;
@@ -243,30 +243,28 @@ public class RAS_Node {
public void freeMemory(double amount) {
_availMemory += amount;
- LOG.info("freeing {} memory...avail mem: {}", amount, _availMemory);
- if(_availMemory > this.getTotalMemoryResources()) {
+ LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, this.getHostname(), _availMemory);
+ if (_availMemory > this.getTotalMemoryResources()) {
LOG.warn("Freeing more memory than there exists!");
}
}
public void freeCPU(double amount) {
_availCPU += amount;
- LOG.info("freeing {} CPU...avail CPU: {}", amount, _availCPU);
- if(_availCPU > this.getAvailableCpuResources()) {
+ LOG.debug("freeing {} CPU on node...avail CPU: {}", amount, this.getHostname(), _availCPU);
+ if (_availCPU > this.getAvailableCpuResources()) {
LOG.warn("Freeing more memory than there exists!");
}
}
public double getMemoryUsedByWorker(WorkerSlot ws) {
TopologyDetails topo = this.findTopologyUsingWorker(ws);
- LOG.info("Topology {} using worker {}", topo, ws);
- if(topo == null) {
+ if (topo == null) {
return 0.0;
}
Collection<ExecutorDetails> execs = this.getExecutors(ws, this._cluster);
- LOG.info("Worker {} has execs: {}", ws, execs);
double totalMemoryUsed = 0.0;
- for(ExecutorDetails exec : execs) {
+ for (ExecutorDetails exec : execs) {
totalMemoryUsed += topo.getTotalMemReqTask(exec);
}
return totalMemoryUsed;
@@ -274,26 +272,23 @@ public class RAS_Node {
public double getCpuUsedByWorker(WorkerSlot ws) {
TopologyDetails topo = this.findTopologyUsingWorker(ws);
- LOG.info("Topology {} using worker {}", topo, ws);
- if(topo == null) {
+ if (topo == null) {
return 0.0;
}
Collection<ExecutorDetails> execs = this.getExecutors(ws, this._cluster);
- LOG.info("Worker {} has execs: {}", ws, execs);
double totalCpuUsed = 0.0;
- for(ExecutorDetails exec : execs) {
+ for (ExecutorDetails exec : execs) {
totalCpuUsed += topo.getTotalCpuReqTask(exec);
}
return totalCpuUsed;
}
public TopologyDetails findTopologyUsingWorker(WorkerSlot ws) {
- for(Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
- LOG.info("topoId: {} workers: {}", entry.getKey(), entry.getValue());
+ for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
String topoId = entry.getKey();
Set<WorkerSlot> workers = entry.getValue();
for (WorkerSlot worker : workers) {
- if(worker.getNodeId().equals(ws.getNodeId()) && worker.getPort() == ws.getPort()) {
+ if (worker.getNodeId().equals(ws.getNodeId()) && worker.getPort() == ws.getPort()) {
return _topologies.getById(topoId);
}
}
@@ -376,7 +371,9 @@ public class RAS_Node {
@Override
public String toString() {
- return "{Node: " + _sup.getHost() + ", AvailMem: " + _availMemory.toString() + ", AvailCPU: " + _availCPU.toString() + "}";
+ return "{Node: " + ((_sup == null) ? "null (possibly down)" : _sup.getHost())
+ + ", AvailMem: " + ((_availMemory == null) ? "N/A" : _availMemory.toString())
+ + ", AvailCPU: " + ((_availCPU == null) ? "N/A" : _availCPU.toString()) + "}";
}
public static int countSlotsUsed(String topId, Collection<RAS_Node> nodes) {
@@ -415,7 +412,6 @@ public class RAS_Node {
return total;
}
- //This function is only used for logging information
public static Collection<ExecutorDetails> getExecutors(WorkerSlot ws, Cluster cluster) {
Collection<ExecutorDetails> retList = new ArrayList<ExecutorDetails>();
for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments()
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
index 42fc236..5a99bdd 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
@@ -35,16 +35,11 @@ import java.util.Map;
public class RAS_Nodes {
private Map<String, RAS_Node> nodeMap;
- private Cluster cluster;
- private Topologies topologies;
private static final Logger LOG = LoggerFactory.getLogger(RAS_Nodes.class);
-
public RAS_Nodes(Cluster cluster, Topologies topologies) {
this.nodeMap = getAllNodesFrom(cluster, topologies);
- this.cluster = cluster;
- this.topologies = topologies;
}
public static Map<String, RAS_Node> getAllNodesFrom(Cluster cluster, Topologies topologies) {
@@ -142,7 +137,7 @@ public class RAS_Nodes {
for (RAS_Node node : nodeMap.values()) {
for (WorkerSlot ws : node.getUsedSlots()) {
if (workerSlots.contains(ws)) {
- LOG.info("freeing ws {} on node {}", ws, node);
+ LOG.debug("freeing ws {} on node {}", ws, node);
node.free(ws);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index f5c8354..53672f6 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -32,8 +32,8 @@ import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -55,7 +55,7 @@ public class ResourceAwareScheduler implements IScheduler {
private Map conf = new Config();
public SchedulingState(Map<String, User> userMap, Cluster cluster, Topologies topologies, RAS_Nodes nodes, Map conf) {
- for(Map.Entry<String, User> userMapEntry : userMap.entrySet()) {
+ for (Map.Entry<String, User> userMapEntry : userMap.entrySet()) {
String userId = userMapEntry.getKey();
User user = userMapEntry.getValue();
this.userMap.put(userId, user.getCopy());
@@ -84,61 +84,51 @@ public class ResourceAwareScheduler implements IScheduler {
@Override
public void schedule(Topologies topologies, Cluster cluster) {
LOG.info("\n\n\nRerunning ResourceAwareScheduler...");
- LOG.debug(ResourceUtils.printScheduling(cluster, topologies));
- LOG.info("topologies: {}", topologies);
-
+ //initialize data structures
this.initialize(topologies, cluster);
-
- LOG.info("UserMap:\n{}", this.userMap);
+ //logs everything that is currently scheduled and the location at which they are scheduled
+ LOG.info("Cluster scheduling:\n{}", ResourceUtils.printScheduling(cluster, topologies));
+ //logs the resources available/used for every node
+ LOG.info("Nodes:\n{}", this.nodes);
+ //logs the detailed info about each user
for (User user : this.getUserMap().values()) {
LOG.info(user.getDetailedInfo());
}
- for (TopologyDetails topo : topologies.getTopologies()) {
- LOG.info("topo {} status: {}", topo, cluster.getStatusMap().get(topo.getId()));
- }
-
- LOG.info("Nodes:\n{}", this.nodes);
-
- //LOG.info("getNextUser: {}", this.getNextUser());
-
ISchedulingPriorityStrategy schedulingPrioritystrategy = null;
while (true) {
- LOG.info("/*********** next scheduling iteration **************/");
- if(schedulingPrioritystrategy == null) {
+ if (schedulingPrioritystrategy == null) {
try {
schedulingPrioritystrategy = (ISchedulingPriorityStrategy) Utils.newInstance((String) this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
} catch (RuntimeException e) {
- LOG.error("failed to create instance of priority strategy: {} with error: {}! No topology eviction will be done.",
+ LOG.error("failed to create instance of priority strategy: {} with error: {}! No topologies will be scheduled.",
this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), e.getMessage());
break;
}
}
- //need to re prepare since scheduling state might have been restored
- schedulingPrioritystrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
- //Call scheduling priority strategy
- TopologyDetails td = schedulingPrioritystrategy.getNextTopologyToSchedule();
- if(td == null) {
+ TopologyDetails td = null;
+ try {
+ //need to re prepare since scheduling state might have been restored
+ schedulingPrioritystrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+ //Call scheduling priority strategy
+ td = schedulingPrioritystrategy.getNextTopologyToSchedule();
+ } catch (Exception e) {
+ LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled! Error: {} StackTrack: {}"
+ , schedulingPrioritystrategy.getClass().getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
+ break;
+ }
+ if (td == null) {
break;
}
scheduleTopology(td);
}
-
- //since scheduling state might have been restored thus need to set the cluster and topologies.
- cluster = this.cluster;
- topologies = this.topologies;
}
public void scheduleTopology(TopologyDetails td) {
User topologySubmitter = this.userMap.get(td.getTopologySubmitter());
if (cluster.getUnassignedExecutors(td).size() > 0) {
- LOG.info("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
- LOG.info("{}", this.userMap.get(td.getTopologySubmitter()).getDetailedInfo());
- LOG.info("{}", User.getResourcePoolAverageUtilizationForUsers(this.userMap.values()));
- LOG.info("Nodes:\n{}", this.nodes);
- LOG.debug("From cluster:\n{}", ResourceUtils.printScheduling(this.cluster, this.topologies));
- LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
+ LOG.debug("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
SchedulingState schedulingState = this.checkpointSchedulingState();
IStrategy RAStrategy = null;
@@ -147,39 +137,59 @@ public class ResourceAwareScheduler implements IScheduler {
} catch (RuntimeException e) {
LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.",
td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
- topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
+ this.restoreCheckpointSchedulingState(schedulingState);
+ //since state is restored need the update User topologySubmitter to the new User object in userMap
+ topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ topologySubmitter.moveTopoFromPendingToInvalid(td);
+ this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
+ + td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for details");
return;
}
IEvictionStrategy evictionStrategy = null;
while (true) {
- //Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
- RAStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
- SchedulingResult result = RAStrategy.schedule(td);
- LOG.info("scheduling result: {}", result);
- if (result.isValid()) {
+ SchedulingResult result = null;
+ try {
+ //Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
+ RAStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+ result = RAStrategy.schedule(td);
+ } catch (Exception e) {
+ LOG.error("Exception thrown when running strategy {} to schedule topology {}. Topology will not be scheduled! Error: {} StackTrack: {}"
+ , RAStrategy.getClass().getName(), td.getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
+ this.restoreCheckpointSchedulingState(schedulingState);
+ //since state is restored need the update User topologySubmitter to the new User object in userMap
+ topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ topologySubmitter.moveTopoFromPendingToInvalid(td);
+ this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
+ + RAStrategy.getClass().getName() + ". Please check logs for details");
+ }
+ LOG.debug("scheduling result: {}", result);
+ if (result != null && result.isValid()) {
if (result.isSuccess()) {
try {
if (mkAssignment(td, result.getSchedulingResultMap())) {
- topologySubmitter.moveTopoFromPendingToRunning(td, this.cluster);
+ topologySubmitter.moveTopoFromPendingToRunning(td);
+ this.cluster.setStatus(td.getId(), "Running - " + result.getMessage());
} else {
this.restoreCheckpointSchedulingState(schedulingState);
//since state is restored need the update User topologySubmitter to the new User object in userMap
topologySubmitter = this.userMap.get(td.getTopologySubmitter());
- topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
+ topologySubmitter.moveTopoFromPendingToAttempted(td);
+ this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details");
}
} catch (IllegalStateException ex) {
LOG.error(ex.toString());
- LOG.error("Unsuccessful in scheduling: IllegalStateException thrown!", td.getId());
- this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
+ LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Error: {} StackTrace: {}",
+ ex.getClass().getName(), Arrays.toString(ex.getStackTrace()));
this.restoreCheckpointSchedulingState(schedulingState);
//since state is restored need the update User topologySubmitter to the new User object in userMap
topologySubmitter = this.userMap.get(td.getTopologySubmitter());
- topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
+ topologySubmitter.moveTopoFromPendingToAttempted(td);
+ this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
}
break;
} else {
if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
- if(evictionStrategy == null) {
+ if (evictionStrategy == null) {
try {
evictionStrategy = (IEvictionStrategy) Utils.newInstance((String) this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
} catch (RuntimeException e) {
@@ -189,16 +199,29 @@ public class ResourceAwareScheduler implements IScheduler {
break;
}
}
- //need to re prepare since scheduling state might have been restored
- evictionStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
- if (!evictionStrategy.makeSpaceForTopo(td)) {
- this.cluster.setStatus(td.getId(), result.getErrorMessage());
+ boolean madeSpace = false;
+ try {
+ //need to re prepare since scheduling state might have been restored
+ evictionStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+ madeSpace = evictionStrategy.makeSpaceForTopo(td);
+ } catch (Exception e) {
+ LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}. No evictions will be done! Error: {} StackTrack: {}"
+ , evictionStrategy.getClass().getName(), td.getName(), e.getClass().getName(), Arrays.toString(e.getStackTrace()));
this.restoreCheckpointSchedulingState(schedulingState);
//since state is restored need the update User topologySubmitter to the new User object in userMap
topologySubmitter = this.userMap.get(td.getTopologySubmitter());
topologySubmitter.moveTopoFromPendingToAttempted(td);
break;
}
+ if (!madeSpace) {
+ LOG.debug("Could not make space for topo {} will move to attempted", td);
+ this.restoreCheckpointSchedulingState(schedulingState);
+ //since state is restored need the update User topologySubmitter to the new User object in userMap
+ topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ topologySubmitter.moveTopoFromPendingToAttempted(td);
+ this.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
+ break;
+ }
continue;
} else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
this.restoreCheckpointSchedulingState(schedulingState);
@@ -225,13 +248,14 @@ public class ResourceAwareScheduler implements IScheduler {
}
} else {
LOG.warn("Topology {} is already fully scheduled!", td.getName());
- topologySubmitter.moveTopoFromPendingToRunning(td, this.cluster);
- throw new IllegalStateException("illegal");
+ topologySubmitter.moveTopoFromPendingToRunning(td);
+ if (this.cluster.getStatusMap().get(td.getId()) == null || this.cluster.getStatusMap().get(td.getId()).equals("")) {
+ this.cluster.setStatus(td.getId(), "Fully Scheduled");
+ }
}
}
private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
- LOG.info("making assignments for topology {}", td);
if (schedulerAssignmentMap != null) {
double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
@@ -241,7 +265,6 @@ public class ResourceAwareScheduler implements IScheduler {
double assignedCpu = 0.0;
Set<String> nodesUsed = new HashSet<String>();
- int assignedWorkers = schedulerAssignmentMap.keySet().size();
for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
WorkerSlot targetSlot = workerToTasksEntry.getKey();
Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
@@ -256,8 +279,6 @@ public class ResourceAwareScheduler implements IScheduler {
assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
assignedCpu += targetSlot.getAllocatedCpu();
}
- LOG.debug("Topology: {} assigned to {} nodes on {} workers", td.getId(), nodesUsed.size(), assignedWorkers);
- this.cluster.setStatus(td.getId(), "Fully Scheduled");
Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
@@ -266,17 +287,17 @@ public class ResourceAwareScheduler implements IScheduler {
td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
this.cluster.setResources(td.getId(), resources);
+ updateSupervisorsResources(this.cluster, this.topologies);
return true;
} else {
LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName());
return false;
}
- updateSupervisorsResources(cluster, topologies);
}
private void updateSupervisorsResources(Cluster cluster, Topologies topologies) {
Map<String, Double[]> supervisors_resources = new HashMap<String, Double[]>();
- Map<String, RAS_Node> nodes = RAS_Node.getAllNodesFrom(cluster, topologies);
+ Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
for (Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
RAS_Node node = entry.getValue();
Double totalMem = node.getTotalMemoryResources();
@@ -297,42 +318,6 @@ public class ResourceAwareScheduler implements IScheduler {
return this.userMap;
}
-// public User getNextUser() {
-// Double least = Double.POSITIVE_INFINITY;
-// User ret = null;
-// for (User user : this.userMap.values()) {
-// LOG.info("getNextUser {}", user.getDetailedInfo());
-// LOG.info("hasTopologyNeedSchedule: {}", user.hasTopologyNeedSchedule());
-// if (user.hasTopologyNeedSchedule()) {
-// Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
-// if(ret!=null) {
-// LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), least, user.getId(), userResourcePoolAverageUtilization);
-// LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, least == userResourcePoolAverageUtilization);
-// LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001));
-//
-// }
-// if (least > userResourcePoolAverageUtilization) {
-// ret = user;
-// least = userResourcePoolAverageUtilization;
-// } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
-// double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
-// double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
-// double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
-//
-// double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
-// double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
-// double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
-// LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), currentAvgPercentage, user.getId(), userAvgPercentage);
-// if (userAvgPercentage > currentAvgPercentage) {
-// ret = user;
-// least = userResourcePoolAverageUtilization;
-// }
-// }
-// }
-// }
-// return ret;
-// }
-
/**
* Intialize scheduling and running queues
*
@@ -340,33 +325,40 @@ public class ResourceAwareScheduler implements IScheduler {
* @param cluster
*/
private void initUsers(Topologies topologies, Cluster cluster) {
-
this.userMap = new HashMap<String, User>();
Map<String, Map<String, Double>> userResourcePools = this.getUserResourcePools();
- LOG.info("userResourcePools: {}", userResourcePools);
+ LOG.debug("userResourcePools: {}", userResourcePools);
for (TopologyDetails td : topologies.getTopologies()) {
+ //Get user that submitted topology. If the topology submitter is null or an empty string, the topology is
+ //logged as an error and skipped: it is not put in any user's queue, so it is not scheduled in this round
String topologySubmitter = td.getTopologySubmitter();
- if (topologySubmitter == null) {
- LOG.warn("Topology {} submitted by anonymous user", td.getName());
- topologySubmitter = "anonymous";
+ //additional safety check to make sure that topologySubmitter is going to be a valid value
+ if (topologySubmitter == null || topologySubmitter.equals("")) {
+ LOG.error("Cannot determine user for topology {}. Will skip scheduling this topology", td.getName());
+ continue;
}
+ //Create the User on first sight; userResourcePools.get(...) yields null for a user with no configured pool
if (!this.userMap.containsKey(topologySubmitter)) {
this.userMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
}
+ //A topology with any unassigned executors still needs scheduling and goes to the user's pending queue;
+ //a fully assigned one goes to the running queue and gets a "Fully Scheduled" status if it has none yet
- if (cluster.getUnassignedExecutors(td).size() >= td.getExecutors().size()) {
- this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td, cluster);
+ if (cluster.getUnassignedExecutors(td).size() > 0) {
+ LOG.debug("adding td: {} to pending queue", td.getName());
+ this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td);
} else {
- this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td, cluster);
+ LOG.debug("adding td: {} to running queue with existing status: {}", td.getName(), cluster.getStatusMap().get(td.getId()));
+ this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td);
+ if (cluster.getStatusMap().get(td.getId()) == null || cluster.getStatusMap().get(td.getId()).equals("")) {
+ cluster.setStatus(td.getId(), "Fully Scheduled");
+ }
}
}
}
private void initialize(Topologies topologies, Cluster cluster) {
- initUsers(topologies, cluster);
+ //Reset the scheduler's state for this scheduling round: cluster, topologies and the RAS_Nodes view built
+ //from them are assigned first, and the per-user queues are built last.
+ //NOTE(review): presumably initUsers runs last so anything it reaches through `this` sees this round's state — confirm
this.cluster = cluster;
this.topologies = topologies;
this.nodes = new RAS_Nodes(this.cluster, this.topologies);
+ initUsers(topologies, cluster);
}
/**
@@ -403,30 +395,36 @@ public class ResourceAwareScheduler implements IScheduler {
}
private SchedulingState checkpointSchedulingState() {
- LOG.info("checkpointing scheduling state...");
- LOG.info("/*********Checkpoint************/");
+ //Capture the current users, cluster, topologies, nodes and conf so restoreCheckpointSchedulingState() can roll back.
+ //The debug dump is only built when debug logging is enabled: as plain LOG.debug arguments, getDetailedInfo() and
+ //ResourceUtils.printScheduling() would otherwise be evaluated on every checkpoint even with debug logging off.
+ //NOTE(review): assumes both calls are side-effect free, as their names suggest
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("/*********Checkpoint scheduling state************/");
for (User user : this.getUserMap().values()) {
- LOG.info(user.getDetailedInfo());
+ LOG.debug(user.getDetailedInfo());
}
- LOG.info("/*********End************/");
+ LOG.debug(ResourceUtils.printScheduling(this.cluster, this.topologies));
+ LOG.debug("nodes:\n{}", this.nodes);
+ LOG.debug("/*********End************/");
+ }
return new SchedulingState(this.userMap, this.cluster, this.topologies, this.nodes, this.conf);
}
private void restoreCheckpointSchedulingState(SchedulingState schedulingState) {
- LOG.info("restoring scheduling state...");
- LOG.info("/*********Before************/");
- for (User user : this.getUserMap().values()) {
- LOG.info(user.getDetailedInfo());
- }
- this.cluster = schedulingState.cluster;
+ //Roll the scheduler back to a state captured by checkpointSchedulingState()
+ LOG.debug("/*********restoring scheduling state************/");
+ //resetting cluster
+ //Cannot simply set this.cluster = schedulingState.cluster; the checkpointed contents are copied into the existing
+ //Cluster instance instead. NOTE(review): presumably the caller (nimbus, written in Clojure) keeps a reference to the
+ //original Cluster object and reads the scheduling results from it, so replacing the reference would be lost — confirm.
+ //NOTE(review): if these setters keep the given maps by reference, later changes to this.cluster may also change
+ //schedulingState.cluster — confirm whether Cluster copies them
+ this.cluster.setAssignments(schedulingState.cluster.getAssignments());
+ this.cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap());
+ this.cluster.setStatusMap(schedulingState.cluster.getStatusMap());
+ this.cluster.setResourcesMap(schedulingState.cluster.getResourcesMap());
+ //Unlike Cluster, the remaining fields are simply re-pointed at the checkpointed objects; per the original author,
+ //Topologies needs no field-by-field reset because nothing in it changes unless this.topologies is reassigned
this.topologies = schedulingState.topologies;
this.conf = schedulingState.conf;
this.userMap = schedulingState.userMap;
this.nodes = schedulingState.nodes;
- LOG.info("/*********After************/");
+
+ //Only build the (potentially large) debug dump when debug logging is enabled; as plain LOG.debug arguments
+ //these would otherwise be evaluated on every restore
+ if (LOG.isDebugEnabled()) {
for (User user : this.getUserMap().values()) {
- LOG.info(user.getDetailedInfo());
+ LOG.debug(user.getDetailedInfo());
}
- LOG.info("/*********End************/");
+ LOG.debug(ResourceUtils.printScheduling(this.cluster, this.topologies));
+ LOG.debug("nodes:\n{}", this.nodes);
+ LOG.debug("/*********End************/");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index 2d7c79f..8542120 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -58,20 +58,26 @@ public class User {
if (resourcePool != null) {
this.resourcePool.putAll(resourcePool);
}
+ if (this.resourcePool.get("cpu") == null) {
+ this.resourcePool.put("cpu", 0.0);
+ }
+ if (this.resourcePool.get("memory") == null) {
+ this.resourcePool.put("memory", 0.0);
+ }
}
public User getCopy() {
User newUser = new User(this.userId, this.resourcePool);
- for(TopologyDetails topo : this.pendingQueue) {
+ for (TopologyDetails topo : this.pendingQueue) {
newUser.addTopologyToPendingQueue(topo);
}
- for(TopologyDetails topo : this.runningQueue) {
+ for (TopologyDetails topo : this.runningQueue) {
newUser.addTopologyToRunningQueue(topo);
}
- for(TopologyDetails topo : this.attemptedQueue) {
+ for (TopologyDetails topo : this.attemptedQueue) {
newUser.addTopologyToAttemptedQueue(topo);
}
- for(TopologyDetails topo : this.invalidQueue) {
+ for (TopologyDetails topo : this.invalidQueue) {
newUser.addTopologyToInvalidQueue(topo);
}
return newUser;
@@ -134,6 +140,7 @@ public class User {
ret.addAll(this.invalidQueue);
return ret;
}
+
public Map<String, Number> getResourcePool() {
if (this.resourcePool != null) {
return new HashMap<String, Number>(this.resourcePool);
@@ -190,7 +197,7 @@ public class User {
private void moveTopology(TopologyDetails topo, Set<TopologyDetails> src, String srcName, Set<TopologyDetails> dest, String destName) {
- LOG.info("For User {} Moving topo {} from {} to {}", this.userId, topo.getName(), srcName, destName);
+ //topo is only null-checked below, so guard the dereference here to avoid a NullPointerException while logging
+ LOG.debug("For User {} Moving topo {} from {} to {}", this.userId, (topo == null ? null : topo.getName()), srcName, destName);
if (topo == null) {
return;
}
@@ -204,49 +211,49 @@ public class User {
}
src.remove(topo);
dest.add(topo);
- LOG.info("SRC: {}", src);
- LOG.info("DEST: {}", dest);
}
- public Double getResourcePoolAverageUtilization() {
- List<Double> resourceUilitzationList = new LinkedList<Double>();
+ //Mean of the CPU and memory resource pool utilizations (each is usage divided by guarantee)
+ public double getResourcePoolAverageUtilization() {
Double cpuResourcePoolUtilization = this.getCPUResourcePoolUtilization();
Double memoryResourcePoolUtilization = this.getMemoryResourcePoolUtilization();
+ //NOTE(review): both getters now return a primitive double that is boxed into these locals, so this null
+ //check always passes and the trailing "return Double.MAX_VALUE" is unreachable
if (cpuResourcePoolUtilization != null && memoryResourcePoolUtilization != null) {
+ //Halve each term before adding rather than computing (cpu + memory) / 2: either utilization can be
+ //Double.MAX_VALUE, and adding two such values first would overflow to Infinity
+ return ((cpuResourcePoolUtilization) / 2.0) + ((memoryResourcePoolUtilization) / 2.0);
}
return Double.MAX_VALUE;
}
- public Double getCPUResourcePoolUtilization() {
+ //Fraction of this user's CPU guarantee used by its running topologies (see getCPUResourceUsedByUser).
+ //A missing (null) or zero guarantee is reported as Double.MAX_VALUE instead of dividing by it
+ public double getCPUResourcePoolUtilization() {
Double cpuGuarantee = this.resourcePool.get("cpu");
- if (cpuGuarantee != null) {
- return this.getCPUResourceUsedByUser() / cpuGuarantee;
+ if (cpuGuarantee == null || cpuGuarantee == 0.0) {
+ return Double.MAX_VALUE;
}
- return null;
+ return this.getCPUResourceUsedByUser() / cpuGuarantee;
}
- public Double getMemoryResourcePoolUtilization() {
+ //Fraction of this user's memory guarantee used by its running topologies (see getMemoryResourceUsedByUser).
+ //A missing (null) or zero guarantee is reported as Double.MAX_VALUE instead of dividing by it
+ public double getMemoryResourcePoolUtilization() {
Double memoryGuarantee = this.resourcePool.get("memory");
- if (memoryGuarantee != null) {
- return this.getMemoryResourceUsedByUser() / memoryGuarantee;
+ if (memoryGuarantee == null || memoryGuarantee == 0.0) {
+ return Double.MAX_VALUE;
}
- return null;
+ return this.getMemoryResourceUsedByUser() / memoryGuarantee;
}
- public Double getCPUResourceUsedByUser() {
- Double sum = 0.0;
+ //Total CPU requested by this user's running topologies; pending, attempted and invalid topologies are not counted
+ public double getCPUResourceUsedByUser() {
+ double sum = 0.0;
for (TopologyDetails topo : this.runningQueue) {
sum += topo.getTotalRequestedCpu();
}
return sum;
}
- public Double getMemoryResourceUsedByUser() {
- Double sum = 0.0;
+ public double getMemoryResourceUsedByUser() {
+ double sum = 0.0;
for (TopologyDetails topo : this.runningQueue) {
sum += topo.getTotalRequestedMemOnHeap() + topo.getTotalRequestedMemOffHeap();
}
@@ -271,7 +278,6 @@ public class User {
}
public boolean hasTopologyNeedSchedule() {
- //return (!this.pendingQueue.isEmpty() && (this.pendingQueue.size() - this.attemptedQueue.size()) > 0);
+ //A user needs scheduling while its pending queue is non-empty
return (!this.pendingQueue.isEmpty());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index 7ca7ac3..f0401ce 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -50,54 +50,50 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
+ /**
+ * Tries to free resources so that topology td can be scheduled, evicting at most one running topology per call.
+ * It evicts the lowest-priority running topology of the user returned by findUserWithMostResourcesAboveGuarantee()
+ * when either (a) td's submitter still has enough of its guarantee left to fit td, or (b) that user's average
+ * (CPU + memory) utilization is further above guarantee than the submitter's would be after scheduling td.
+ * Otherwise it evicts one of the submitter's own running topologies with a lower priority than td
+ * (a larger getTopologyPriority() value).
+ * @param td the topology that could not be scheduled
+ * @return true if a topology was evicted; false if the submitter has no CPU/memory guarantee or no space could be made
+ */
@Override
public boolean makeSpaceForTopo(TopologyDetails td) {
- LOG.info("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
+ LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
User submitter = this.userMap.get(td.getTopologySubmitter());
if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null) {
return false;
}
-
+ //fraction of the submitter's guarantee that td alone would consume
double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
+ User evictUser = this.findUserWithMostResourcesAboveGuarantee();
//user has enough resource under his or her resource guarantee to schedule topology
if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
- User evictUser = this.findUserWithMostResourcesAboveGuarantee();
- if (evictUser == null) {
- LOG.info("Cannot make space for topology {} from user {}", td.getName(), submitter.getId());
- submitter.moveTopoFromPendingToAttempted(td, this.cluster);
+ if (evictUser != null) {
- return false;
+ TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+ //evictTopology dereferences its argument immediately, so only evict when there is a topology to take.
+ //NOTE(review): assumes getRunningTopologyWithLowestPriority returns null when nothing is running — confirm
+ if (topologyEvict != null) {
+ evictTopology(topologyEvict);
+ return true;
+ }
}
- TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
- LOG.info("topology to evict: {}", topologyEvict);
- evictTopology(topologyEvict);
-
- LOG.info("Resources After eviction:\n{}", this.nodes);
-
- return true;
} else {
-
- if ((1.0 - submitter.getCPUResourcePoolUtilization()) < cpuNeeded) {
-
+ if (evictUser != null) {
+ //Both sides of the comparison use average (CPU + memory) utilization, so td's need must be averaged the same
+ //way instead of using cpuNeeded alone; halve each term before adding, as getResourcePoolAverageUtilization
+ //does, so that a need of Double.MAX_VALUE cannot overflow to Infinity
+ double averageNeeded = (cpuNeeded / 2.0) + (memoryNeeded / 2.0);
+ if ((evictUser.getResourcePoolAverageUtilization() - 1.0) > (averageNeeded + (submitter.getResourcePoolAverageUtilization() - 1.0))) {
+ TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+ if (topologyEvict != null) {
+ evictTopology(topologyEvict);
+ return true;
+ }
+ }
}
-
- if ((1.0 - submitter.getMemoryResourcePoolUtilization()) < memoryNeeded) {
-
+ }
+ //See if there is a lower priority topology that can be evicted from the current user
+ for (TopologyDetails topo : submitter.getTopologiesRunning()) {
+ //check to if there is a topology with a lower priority we can evict
+ if (topo.getTopologyPriority() > td.getTopologyPriority()) {
+ evictTopology(topo);
+ return true;
}
- return false;
-
}
+ return false;
}
+ //Frees the worker slots currently used by topologyEvict and moves it from its submitter's running queue back to
+ //the pending queue. topologyEvict must be non-null: it is dereferenced on the first line.
private void evictTopology(TopologyDetails topologyEvict) {
Collection<WorkerSlot> workersToEvict = this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId());
User submitter = this.userMap.get(topologyEvict.getTopologySubmitter());
- LOG.info("Evicting Topology {} with workers: {}", topologyEvict.getName(), workersToEvict);
- LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
+ LOG.info("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict, topologyEvict.getTopologySubmitter());
this.nodes.freeSlots(workersToEvict);
+ //NOTE(review): the other User.moveTopoFrom* calls in this commit no longer take the cluster; confirm this overload is intended
submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
- LOG.info("check if topology unassigned: {}", this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId()));
}
private User findUserWithMostResourcesAboveGuarantee() {
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
index 5096fd6..990ccd6 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
-public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStrategy{
+public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStrategy {
private static final Logger LOG = LoggerFactory
.getLogger(DefaultSchedulingPriorityStrategy.class);
@@ -58,20 +58,15 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPrioritySt
Double least = Double.POSITIVE_INFINITY;
User ret = null;
for (User user : this.userMap.values()) {
- LOG.info("getNextUser {}", user.getDetailedInfo());
- LOG.info("hasTopologyNeedSchedule: {}", user.hasTopologyNeedSchedule());
if (user.hasTopologyNeedSchedule()) {
Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
- if(ret!=null) {
- LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), least, user.getId(), userResourcePoolAverageUtilization);
- LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, least == userResourcePoolAverageUtilization);
- LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001));
- }
if (least > userResourcePoolAverageUtilization) {
ret = user;
least = userResourcePoolAverageUtilization;
- } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
+ }
+ // if ResourcePoolAverageUtilization is equal to the user that is being compared
+ else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
@@ -79,7 +74,6 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPrioritySt
double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
- LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), currentAvgPercentage, user.getId(), userAvgPercentage);
if (userAvgPercentage > currentAvgPercentage) {
ret = user;
least = userResourcePoolAverageUtilization;
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
index 7e92b3d..a5b0ff5 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
@@ -34,7 +34,7 @@ public interface ISchedulingPriorityStrategy {
public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
/**
- *
+ * Gets the next topology to schedule
* @return return the next topology to schedule. If there is no topologies left to schedule, return null
*/
public TopologyDetails getNextTopologyToSchedule();
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 1950858..75cc5eb 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -170,10 +170,10 @@ public class DefaultResourceAwareStrategy implements IStrategy {
LOG.error("Not all executors successfully scheduled: {}",
executorsNotScheduled);
schedulerAssignmentMap = null;
- result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "Not all executors successfully scheduled: " + executorsNotScheduled);
+ result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, (td.getExecutors().size() - unassignedExecutors.size()) + "/" + td.getExecutors().size() + " executors scheduled");
} else {
LOG.debug("All resources successfully scheduled!");
- result = SchedulingResult.success(schedulerAssignmentMap);
+ result = SchedulingResult.successWithMsg(schedulerAssignmentMap, "Fully Scheduled by DefaultResourceAwareStrategy");
}
if (schedulerAssignmentMap == null) {
LOG.error("Topology {} not successfully scheduled!", td.getId());
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index 12e8ff3..bb2e955 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -37,7 +37,18 @@ import backtype.storm.scheduler.resource.User;
*/
public interface IStrategy {
+ /**
+ * Initializes the strategy before schedule() is called.
+ * @param topologies the topologies known to the scheduler
+ * @param cluster the cluster state to schedule against
+ * @param userMap users keyed by the topology submitter's user name
+ * @param nodes the RAS_Nodes view of the cluster's nodes
+ */
public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
+ /**
+ * Calculates a scheduling for topology td.
+ * @param td the topology to schedule
+ * @return a SchedulingResult whose SchedulingStatus indicates whether scheduling succeeded. The strategy must
+ * calculate the scheduling as a {@code Map<WorkerSlot, Collection<ExecutorDetails>>} in which each key is the
+ * worker slot that its value (a collection of executors) should be assigned to. If a scheduling is calculated
+ * successfully, the scheduling map is put in the SchedulingResult object.
+ */
public SchedulingResult schedule(TopologyDetails td);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index b82a4ec..57c9f40 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -507,14 +507,14 @@ public class ConfigValidation {
@Override
public void validateField(String name, Object o) {
- if(o == null) {
+ if (o == null) {
return;
}
SimpleTypeValidator.validateField(name, Map.class, o);
- if(!((Map) o).containsKey("cpu") ) {
- throw new IllegalArgumentException( "Field " + name + " must have map entry with key: cpu");
+ if (!((Map) o).containsKey("cpu")) {
+ throw new IllegalArgumentException("Field " + name + " must have map entry with key: cpu");
}
- if(!((Map) o).containsKey("memory") ) {
+ if (!((Map) o).containsKey("memory")) {
throw new IllegalArgumentException("Field " + name + " must have map entry with key: memory");
}
@@ -533,13 +533,13 @@ public class ConfigValidation {
@Override
public void validateField(String name, Object o) {
- if(o == null) {
+ if (o == null) {
return;
}
SimpleTypeValidator.validateField(name, String.class, o);
try {
Class objectClass = Class.forName((String) o);
- if(!this.classImplements.isAssignableFrom(objectClass)) {
+ if (!this.classImplements.isAssignableFrom(objectClass)) {
throw new IllegalArgumentException("Field " + name + " with value " + o + " does not implement " + this.classImplements.getName());
}
} catch (ClassNotFoundException e) {