You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by je...@apache.org on 2016/01/19 17:52:56 UTC
[2/5] storm git commit: adding an additional test and cleaning up
adding an additional test and cleaning up
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/68836690
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/68836690
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/68836690
Branch: refs/heads/master
Commit: 688366902d98a7fdc17d854a33ff975c6a60ccab
Parents: 829ea11
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Wed Jan 13 14:13:54 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Jan 14 14:25:07 2016 -0600
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 5 +-
.../jvm/org/apache/storm/scheduler/Cluster.java | 54 ++++-
.../apache/storm/scheduler/TopologyDetails.java | 2 +-
.../org/apache/storm/scheduler/WorkerSlot.java | 10 +-
.../scheduler/resource/ClusterStateData.java | 4 +-
.../storm/scheduler/resource/RAS_Node.java | 141 +++++------
.../storm/scheduler/resource/RAS_Nodes.java | 48 ----
.../resource/ResourceAwareScheduler.java | 43 +++-
.../scheduler/resource_aware_scheduler_test.clj | 8 +-
.../resource/TestResourceAwareScheduler.java | 240 +++++++++++++++++++
10 files changed, 388 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 99260d9..0555ed9 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -847,12 +847,12 @@
;; call scheduler.schedule to schedule all the topologies
;; the new assignments for all the topologies are in the cluster object.
_ (.schedule (:scheduler nimbus) topologies cluster)
- _ (.setResourcesMap cluster @(:id->resources nimbus))
+ _ (.setTopologyResourcesMap cluster @(:id->resources nimbus))
_ (if-not (conf SCHEDULER-DISPLAY-RESOURCE) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies))
;;merge with existing statuses
_ (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status nimbus)) (.getStatusMap cluster)))
_ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster))
- _ (reset! (:id->resources nimbus) (.getResourcesMap cluster))]
+ _ (reset! (:id->resources nimbus) (.getTopologyResourcesMap cluster))]
(.getAssignments cluster)))
(defn changed-executors [executor->node+port new-executor->node+port]
@@ -913,7 +913,6 @@
existing-assignments
topologies
scratch-topology-id)
-
topology->executor->node+port (compute-new-topology->executor->node+port new-scheduler-assignments existing-assignments)
topology->executor->node+port (merge (into {} (for [id assigned-topology-ids] {id nil})) topology->executor->node+port)
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java b/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
index 3650df2..4ac4eaf 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
@@ -57,7 +57,7 @@ public class Cluster {
/**
* key topologyId, Value: requested and assigned resources (e.g., on-heap/off-heap mem, cpu) for each topology.
*/
- private Map<String, Double[]> resources;
+ private Map<String, Double[]> topologyResources;
/**
* a map from hostname to supervisor id.
@@ -76,7 +76,7 @@ public class Cluster {
this.assignments = new HashMap<String, SchedulerAssignmentImpl>(assignments.size());
this.assignments.putAll(assignments);
this.status = new HashMap<String, String>();
- this.resources = new HashMap<String, Double[]>();
+ this.topologyResources = new HashMap<String, Double[]>();
this.supervisorsResources = new HashMap<String, Double[]>();
this.hostToId = new HashMap<String, List<String>>();
for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
@@ -617,13 +617,13 @@ public class Cluster {
supervisorToAssignedMem.put(nodeId, assignedMemPerSlot);
}
}
- if (this.getResourcesMap().containsKey(topId)) {
- Double[] topo_resources = getResourcesMap().get(topId);
+ if (this.getTopologyResourcesMap().containsKey(topId)) {
+ Double[] topo_resources = getTopologyResourcesMap().get(topId);
topo_resources[3] = assignedMemForTopology;
} else {
Double[] topo_resources = {0.0, 0.0, 0.0, 0.0, 0.0, 0.0};
topo_resources[3] = assignedMemForTopology;
- this.setResources(topId, topo_resources);
+ this.setTopologyResources(topId, topo_resources);
}
}
@@ -662,22 +662,50 @@ public class Cluster {
this.status.putAll(statusMap);
}
- public void setResources(String topologyId, Double[] resources) {
- this.resources.put(topologyId, resources);
+ /**
+ * Set the amount of resources used used by a topology. Used for displaying resource information on the UI
+ * @param topologyId
+ * @param resources describes the resources requested and assigned to topology in the following format in an array:
+ * {requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu}
+ */
+ public void setTopologyResources(String topologyId, Double[] resources) {
+ this.topologyResources.put(topologyId, resources);
}
- public void setResourcesMap(Map<String, Double[]> topologies_resources) {
- this.resources.putAll(topologies_resources);
+ /**
+ * Set the amount of resources used used by a topology. Used for displaying resource information on the UI
+ * @param topologyResources a map that contains multiple topologies and the resources the topology requested and assigned.
+ * Key: topology id Value: an array that describes the resources the topology requested and assigned in the following format:
+ * {requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu}
+ */
+ public void setTopologyResourcesMap(Map<String, Double[]> topologyResources) {
+ this.topologyResources.putAll(topologyResources);
}
- public Map<String, Double[]> getResourcesMap() {
- return this.resources;
+ /**
+ * Get the amount of resources used by topologies. Used for displaying resource information on the UI
+ * @return a map that contains multiple topologies and the resources the topology requested and assigned.
+ * Key: topology id Value: an array that describes the resources the topology requested and assigned in the following format:
+ * {requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu}
+ */
+ public Map<String, Double[]> getTopologyResourcesMap() {
+ return this.topologyResources;
}
- public void setSupervisorsResourcesMap(Map<String, Double[]> supervisors_resources) {
- this.supervisorsResources.putAll(supervisors_resources);
+ /**
+ * Sets the amount of used and free resources on a supervisor. Used for displaying resource information on the UI
+ * @param supervisorResources a map where the key is the supervisor id and the value is a map that represents
+ * resource usage for a supervisor in the following format: {totalMem, totalCpu, usedMem, usedCpu}
+ */
+ public void setSupervisorsResourcesMap(Map<String, Double[]> supervisorResources) {
+ this.supervisorsResources.putAll(supervisorResources);
}
+ /**
+ * Get the amount of used and free resources on a supervisor. Used for displaying resource information on the UI
+ * @return a map where the key is the supervisor id and the value is a map that represents
+ * resource usage for a supervisor in the following format: {totalMem, totalCpu, usedMem, usedCpu}
+ */
public Map<String, Double[]> getSupervisorsResourcesMap() {
return this.supervisorsResources;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
index ea5bfad..a0eb4ad 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
@@ -420,7 +420,7 @@ public class TopologyDetails {
/**
* Add default resource requirements for a executor
*/
- public void addDefaultResforExec(ExecutorDetails exec) {
+ private void addDefaultResforExec(ExecutorDetails exec) {
Double topologyComponentCpuPcorePercent = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
Double topologyComponentResourcesOffheapMemoryMb = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
Double topologyComponentResourcesOnheapMemoryMb = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java b/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
index 57cd9c5..423764d 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
@@ -18,14 +18,14 @@
package org.apache.storm.scheduler;
public class WorkerSlot {
- protected String nodeId;
- protected int port;
+ private String nodeId;
+ private int port;
// amount of on-heap memory allocated to it
- protected double memOnHeap = 0.0;
+ private double memOnHeap = 0.0;
// amount of off-heap memory allocated to it
- protected double memOffHeap = 0.0;
+ private double memOffHeap = 0.0;
// amount of cpu allocated to it
- protected double cpu = 0.0;
+ private double cpu = 0.0;
public WorkerSlot(String nodeId, Number port) {
this(nodeId, port, 0.0, 0.0, 0.0);
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
index 0fffe93..ece2800 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
@@ -30,7 +30,7 @@ import java.util.List;
import java.util.Map;
/**
- * A class to specify which data and API to expose to a scheduling strategy
+ * A class to specify which data and API to expose to a scheduling strategy
*/
public class ClusterStateData {
@@ -86,7 +86,7 @@ public class ClusterStateData {
this.cluster = cluster;
this.topologies = topologies;
Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
- for(Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
+ for (Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
this.nodes.put(entry.getKey(), new NodeDetails(entry.getValue()));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
index 3e10085..e33122b 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -64,18 +64,10 @@ public class RAS_Node {
public RAS_Node(String nodeId, SupervisorDetails sup, Cluster cluster, Topologies topologies, Map<String, WorkerSlot> workerIdToWorker, Map<String, Map<String, Collection<ExecutorDetails>>> assignmentMap) {
//Node ID and supervisor ID are the same.
_nodeId = nodeId;
- _isAlive = !cluster.isBlackListed(_nodeId);
-
- //check if node is alive
- if (_isAlive && sup != null) {
- _hostname = sup.getHost();
- _sup = sup;
- _availMemory = getTotalMemoryResources();
- _availCPU = getTotalCpuResources();
-
- LOG.debug("Found a {} Node {} {}",
- _isAlive ? "living" : "dead", _nodeId, sup.getAllPorts());
- LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
+ if (sup == null) {
+ _isAlive = false;
+ } else {
+ _isAlive = !cluster.isBlackListed(_nodeId);
}
_cluster = cluster;
@@ -90,6 +82,45 @@ public class RAS_Node {
if (assignmentMap != null) {
_topIdToUsedSlots = assignmentMap;
}
+
+ //check if node is alive
+ if (_isAlive && sup != null) {
+ _hostname = sup.getHost();
+ _sup = sup;
+ _availMemory = getTotalMemoryResources();
+ _availCPU = getTotalCpuResources();
+
+ LOG.debug("Found a {} Node {} {}",
+ _isAlive ? "living" : "dead", _nodeId, sup.getAllPorts());
+ LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
+ //intialize resource usages on node
+ intializeResources();
+ }
+ }
+
+ /**
+ * intializes resource usages on node
+ */
+ private void intializeResources() {
+ for (Entry<String, Map<String, Collection<ExecutorDetails>>> entry : _topIdToUsedSlots.entrySet()) {
+ String topoId = entry.getKey();
+ Map<String, Collection<ExecutorDetails>> assignment = entry.getValue();
+ Map<ExecutorDetails, Double> topoMemoryResourceList = _topologies.getById(topoId).getTotalMemoryResourceList();
+ for (Collection<ExecutorDetails> execs : assignment.values()) {
+ for (ExecutorDetails exec : execs) {
+ if (!_isAlive) {
+ continue;
+ // We do not free the assigned slots (the orphaned slots) on the inactive supervisors
+ // The inactive node will be treated as a 0-resource node and not available for other unassigned workers
+ }
+ if (topoMemoryResourceList.containsKey(exec)) {
+ consumeResourcesforTask(exec, _topologies.getById(topoId));
+ } else {
+ throw new IllegalStateException("Executor " + exec + "not found!");
+ }
+ }
+ }
+ }
}
public String getId() {
@@ -109,7 +140,7 @@ public class RAS_Node {
}
public Collection<String> getFreeSlotsId() {
- if(!_isAlive) {
+ if (!_isAlive) {
return new HashSet<String>();
}
Collection<String> usedSlotsId = getUsedSlotsId();
@@ -200,22 +231,21 @@ public class RAS_Node {
throw new IllegalArgumentException("Tried to free a slot " + ws + " that was already free!");
}
- //free slot
- _cluster.freeSlot(ws);
-
- //cleanup internal assignments
- _topIdToUsedSlots.get(topo.getId()).remove(ws.getId());
-
double memUsed = getMemoryUsedByWorker(ws);
double cpuUsed = getCpuUsedByWorker(ws);
freeMemory(memUsed);
freeCPU(cpuUsed);
+
+ //free slot
+ _cluster.freeSlot(ws);
+ //cleanup internal assignments
+ _topIdToUsedSlots.get(topo.getId()).remove(ws.getId());
}
private void freeMemory(double amount) {
LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, getHostname(), _availMemory);
- if((_availMemory + amount) > getTotalCpuResources()) {
- LOG.warn("Freeing more memory than there exists!");
+ if((_availMemory + amount) > getTotalMemoryResources()) {
+ LOG.warn("Freeing more memory than there exists! Memory trying to free: {} Total memory on Node: {}", (_availMemory + amount), getTotalMemoryResources());
return;
}
_availMemory += amount;
@@ -223,8 +253,8 @@ public class RAS_Node {
private void freeCPU(double amount) {
LOG.debug("freeing {} CPU on node...avail CPU: {}", amount, getHostname(), _availCPU);
- if ((_availCPU + amount) > getAvailableCpuResources()) {
- LOG.warn("Freeing more CPU than there exists!");
+ if ((_availCPU + amount) > getTotalCpuResources()) {
+ LOG.warn("Freeing more CPU than there exists! CPU trying to free: {} Total CPU on Node: {}", (_availMemory + amount), getTotalCpuResources());
return;
}
_availCPU += amount;
@@ -239,6 +269,7 @@ public class RAS_Node {
return 0.0;
}
Collection<ExecutorDetails> execs = getExecutors(ws, _cluster);
+ LOG.info("getMemoryUsedByWorker executors: {}", execs);
double totalMemoryUsed = 0.0;
for (ExecutorDetails exec : execs) {
totalMemoryUsed += topo.getTotalMemReqTask(exec);
@@ -280,54 +311,6 @@ public class RAS_Node {
}
/**
- * Allocate Mem and CPU resources to the assigned slot for the topology's executors.
- * @param td the TopologyDetails that the slot is assigned to.
- * @param executors the executors to run in that slot.
- * @param slot the slot to allocate resource to
- */
-// private void allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
-// double onHeapMem = 0.0;
-// double offHeapMem = 0.0;
-// double cpu = 0.0;
-// for (ExecutorDetails exec : executors) {
-// Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
-// if (onHeapMemForExec != null) {
-// onHeapMem += onHeapMemForExec;
-// }
-// Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
-// if (offHeapMemForExec != null) {
-// offHeapMem += offHeapMemForExec;
-// }
-// Double cpuForExec = td.getTotalCpuReqTask(exec);
-// if (cpuForExec != null) {
-// cpu += cpuForExec;
-// }
-// }
-// slot.allocateResource(onHeapMem, offHeapMem, cpu);
-// }
-
- private WorkerSlot allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
- double onHeapMem = 0.0;
- double offHeapMem = 0.0;
- double cpu = 0.0;
- for (ExecutorDetails exec : executors) {
- Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
- if (onHeapMemForExec != null) {
- onHeapMem += onHeapMemForExec;
- }
- Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
- if (offHeapMemForExec != null) {
- offHeapMem += offHeapMemForExec;
- }
- Double cpuForExec = td.getTotalCpuReqTask(exec);
- if (cpuForExec != null) {
- cpu += cpuForExec;
- }
- }
- return new WorkerSlot(slot.getNodeId(), slot.getPort(), onHeapMem, offHeapMem, cpu);
- }
-
- /**
* Assigns a worker to a node
* @param target the worker slot to assign the executors
* @param td the topology the executors are from
@@ -350,7 +333,8 @@ public class RAS_Node {
if (!freeSlots.contains(target)) {
throw new IllegalStateException("Trying to assign already used slot" + target.getPort() + "on node " + _nodeId);
}
- target = allocateResourceToSlot(td, executors, target);
+ LOG.info("target slot: {}", target);
+
_cluster.assign(target, td.getId(), executors);
//assigning internally
@@ -364,15 +348,6 @@ public class RAS_Node {
_topIdToUsedSlots.get(td.getId()).get(target.getId()).addAll(executors);
}
- /**
- * Assign a free slot on the node to the following topology and executors.
- * @param td the TopologyDetails to assign a free slot to.
- * @param executors the executors to run in that slot.
- */
- public void assign(TopologyDetails td, Collection<ExecutorDetails> executors) {
- assign(null, td, executors);
- }
-
@Override
public boolean equals(Object other) {
if (other instanceof RAS_Node) {
@@ -487,7 +462,7 @@ public class RAS_Node {
public Double consumeMemory(Double amount) {
if (amount > _availMemory) {
LOG.error("Attempting to consume more memory than available! Needed: {}, we only have: {}", amount, _availMemory);
- return null;
+ throw new IllegalStateException("Attempting to consume more memory than available");
}
_availMemory = _availMemory - amount;
return _availMemory;
@@ -524,7 +499,7 @@ public class RAS_Node {
public Double consumeCPU(Double amount) {
if (amount > _availCPU) {
LOG.error("Attempting to consume more CPU than available! Needed: {}, we only have: {}", amount, _availCPU);
- return null;
+ throw new IllegalStateException("Attempting to consume more memory than available");
}
_availCPU = _availCPU - amount;
return _availCPU;
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java
index 389a63c..00795a3 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java
@@ -98,58 +98,10 @@ public class RAS_Nodes {
nodeIdToNode.put(nodeId, new RAS_Node(nodeId, null, cluster, topologies, workerIdToWorker.get(nodeId), assignments));
}
}
-
- updateAvailableResources(cluster, topologies, nodeIdToNode);
return nodeIdToNode;
}
/**
- * updates the available resources for every node in a cluster
- * by recalculating memory requirements.
- *
- * @param cluster the cluster used in this calculation
- * @param topologies container of all topologies
- * @param nodeIdToNode a map between node id and node
- */
- private static void updateAvailableResources(Cluster cluster,
- Topologies topologies,
- Map<String, RAS_Node> nodeIdToNode) {
- //recompute memory
- if (cluster.getAssignments().size() > 0) {
- for (Map.Entry<String, SchedulerAssignment> entry : cluster.getAssignments()
- .entrySet()) {
- Map<ExecutorDetails, WorkerSlot> executorToSlot = entry.getValue()
- .getExecutorToSlot();
- Map<ExecutorDetails, Double> topoMemoryResourceList = topologies.getById(entry.getKey()).getTotalMemoryResourceList();
-
- if (topoMemoryResourceList == null || topoMemoryResourceList.size() == 0) {
- continue;
- }
- for (Map.Entry<ExecutorDetails, WorkerSlot> execToSlot : executorToSlot
- .entrySet()) {
- WorkerSlot slot = execToSlot.getValue();
- ExecutorDetails exec = execToSlot.getKey();
- RAS_Node node = nodeIdToNode.get(slot.getNodeId());
- if (!node.isAlive()) {
- continue;
- // We do not free the assigned slots (the orphaned slots) on the inactive supervisors
- // The inactive node will be treated as a 0-resource node and not available for other unassigned workers
- }
- if (topoMemoryResourceList.containsKey(exec)) {
- node.consumeResourcesforTask(exec, topologies.getById(entry.getKey()));
- } else {
- throw new IllegalStateException("Executor " + exec + "not found!");
- }
- }
- }
- } else {
- for (RAS_Node n : nodeIdToNode.values()) {
- n.setAvailableMemory(n.getAvailableMemoryResources());
- }
- }
- }
-
- /**
* get node object from nodeId
*/
public RAS_Node getNodeById(String nodeId) {
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index a913e69..2b35d6b 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -33,7 +33,6 @@ import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -121,8 +120,10 @@ public class ResourceAwareScheduler implements IScheduler {
}
scheduleTopology(td);
- LOG.info("Nodes after scheduling:{}", this.nodes);
+ LOG.debug("Nodes after scheduling:\n{}", this.nodes);
}
+ //updating resources used by supervisor
+ updateSupervisorsResources(this.cluster, this.topologies);
}
public void scheduleTopology(TopologyDetails td) {
@@ -255,12 +256,17 @@ public class ResourceAwareScheduler implements IScheduler {
WorkerSlot targetSlot = workerToTasksEntry.getKey();
Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
RAS_Node targetNode = this.nodes.getNodeById(targetSlot.getNodeId());
+
+ targetSlot = allocateResourceToSlot(td, execsNeedScheduling, targetSlot);
+
targetNode.assign(targetSlot, td, execsNeedScheduling);
+
+ LOG.debug("ASSIGNMENT TOPOLOGY: {} TASKS: {} To Node: {} on Slot: {}",
+ td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
+
for (ExecutorDetails exec : execsNeedScheduling) {
targetNode.consumeResourcesforTask(exec, td);
}
- LOG.debug("ASSIGNMENT TOPOLOGY: {} TASKS: {} To Node: {} on Slot: {}",
- td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
if (!nodesUsed.contains(targetNode.getId())) {
nodesUsed.add(targetNode.getId());
}
@@ -271,12 +277,12 @@ public class ResourceAwareScheduler implements IScheduler {
Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
- LOG.debug("setResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
+ LOG.debug("setTopologyResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
"assigned on-heap mem, off-heap mem, cpu: {} {} {}",
td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
- this.cluster.setResources(td.getId(), resources);
- updateSupervisorsResources(this.cluster, this.topologies);
+ //updating resources used for a topology
+ this.cluster.setTopologyResources(td.getId(), resources);
return true;
} else {
LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName());
@@ -284,6 +290,27 @@ public class ResourceAwareScheduler implements IScheduler {
}
}
+ private WorkerSlot allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
+ double onHeapMem = 0.0;
+ double offHeapMem = 0.0;
+ double cpu = 0.0;
+ for (ExecutorDetails exec : executors) {
+ Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
+ if (onHeapMemForExec != null) {
+ onHeapMem += onHeapMemForExec;
+ }
+ Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
+ if (offHeapMemForExec != null) {
+ offHeapMem += offHeapMemForExec;
+ }
+ Double cpuForExec = td.getTotalCpuReqTask(exec);
+ if (cpuForExec != null) {
+ cpu += cpuForExec;
+ }
+ }
+ return new WorkerSlot(slot.getNodeId(), slot.getPort(), onHeapMem, offHeapMem, cpu);
+ }
+
private void updateSupervisorsResources(Cluster cluster, Topologies topologies) {
Map<String, Double[]> supervisors_resources = new HashMap<String, Double[]>();
Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
@@ -401,7 +428,7 @@ public class ResourceAwareScheduler implements IScheduler {
this.cluster.setAssignments(schedulingState.cluster.getAssignments());
this.cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap());
this.cluster.setStatusMap(schedulingState.cluster.getStatusMap());
- this.cluster.setResourcesMap(schedulingState.cluster.getResourcesMap());
+ this.cluster.setTopologyResourcesMap(schedulingState.cluster.getTopologyResourcesMap());
//don't need to explicitly set data structues like Cluster since nothing can really be changed
//unless this.topologies is set to another object
this.topologies = schedulingState.topologies;
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
index 874363d..ec51914 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
@@ -112,25 +112,25 @@
(is (= 4 (.totalSlotsFree node)))
(is (= 0 (.totalSlotsUsed node)))
(is (= 4 (.totalSlots node)))
- (.assign node topology1 (list (ExecutorDetails. 1 1)))
+ (.assign node (.next (.iterator (.getFreeSlots node))) topology1 (list (ExecutorDetails. 1 1)))
(is (= 1 (.size (.getRunningTopologies node))))
(is (= false (.isTotallyFree node)))
(is (= 3 (.totalSlotsFree node)))
(is (= 1 (.totalSlotsUsed node)))
(is (= 4 (.totalSlots node)))
- (.assign node topology1 (list (ExecutorDetails. 2 2)))
+ (.assign node (.next (.iterator (.getFreeSlots node))) topology1 (list (ExecutorDetails. 2 2)))
(is (= 1 (.size (.getRunningTopologies node))))
(is (= false (.isTotallyFree node)))
(is (= 2 (.totalSlotsFree node)))
(is (= 2 (.totalSlotsUsed node)))
(is (= 4 (.totalSlots node)))
- (.assign node topology2 (list (ExecutorDetails. 1 1)))
+ (.assign node (.next (.iterator (.getFreeSlots node))) topology2 (list (ExecutorDetails. 1 1)))
(is (= 2 (.size (.getRunningTopologies node))))
(is (= false (.isTotallyFree node)))
(is (= 1 (.totalSlotsFree node)))
(is (= 3 (.totalSlotsUsed node)))
(is (= 4 (.totalSlots node)))
- (.assign node topology2 (list (ExecutorDetails. 2 2)))
+ (.assign node (.next (.iterator (.getFreeSlots node))) topology2 (list (ExecutorDetails. 2 2)))
(is (= 2 (.size (.getRunningTopologies node))))
(is (= false (.isTotallyFree node)))
(is (= 0 (.totalSlotsFree node)))
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 1e0f6ed..fae663f 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -19,6 +19,7 @@
package org.apache.storm.scheduler.resource;
import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.INimbus;
@@ -28,8 +29,10 @@ import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidation;
+
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -37,9 +40,12 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+
public class TestResourceAwareScheduler {
private final String TOPOLOGY_SUBMITTER = "jerry";
@@ -939,6 +945,76 @@ public class TestResourceAwareScheduler {
* If users are above his or her guarantee, check if topology eviction works correct
*/
@Test
+ public void Test() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+ Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+ resourceUserPool.put("jerry", new HashMap<String, Number>());
+ resourceUserPool.get("jerry").put("cpu", 70.0);
+ resourceUserPool.get("jerry").put("memory", 700.0);
+
+ resourceUserPool.put("bobby", new HashMap<String, Number>());
+ resourceUserPool.get("bobby").put("cpu", 100.0);
+ resourceUserPool.get("bobby").put("memory", 1000.0);
+
+ resourceUserPool.put("derek", new HashMap<String, Number>());
+ resourceUserPool.get("derek").put("cpu", 25.0);
+ resourceUserPool.get("derek").put("memory", 250.0);
+
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, currentTime - 2, 20);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 2, 0, currentTime - 2, 10);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 2, 0, currentTime - 2, 10);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 2, 0, currentTime - 2, 29);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo3.getId(), topo3);
+ topoMap.put(topo4.getId(), topo4);
+ topoMap.put(topo5.getId(), topo5);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ LOG.info("Assignments: {}", cluster.getAssignments());
+ for (Map.Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
+ LOG.info("Topology id: {}", entry.getKey());
+ for(WorkerSlot target: entry.getValue().getSlots()) {
+ LOG.info("target resources onheap: {} offheap: {} cpu: {}", target.getAllocatedMemOnHeap(), target.getAllocatedMemOffHeap(), target.getAllocatedCpu());
+ }
+
+ }
+ }
+
+ /**
+ * If users are above his or her guarantee, check if topology eviction works correct
+ */
+ @Test
public void TestOverGuaranteeEviction() {
INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
Map<String, Number> resourceMap = new HashMap<String, Number>();
@@ -1083,6 +1159,15 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+
+ LOG.info("Assignments: {}", cluster.getAssignments());
+ for (Map.Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
+ LOG.info("Topology id: {}", entry.getKey());
+ for(WorkerSlot target: entry.getValue().getSlots()) {
+ LOG.info("target resources onheap: {} offheap: {} cpu: {}", target.getAllocatedMemOnHeap(), target.getAllocatedMemOffHeap(), target.getAllocatedCpu());
+ }
+
+ }
}
/**
@@ -1224,4 +1309,159 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
}
+
+ /**
+ * test if the scheduling logic for the DefaultResourceAwareStrategy is correct
+ */
+ @Test
+ public void testDefaultResourceAwareStrategy() {
+ int spoutParallelism = 1;
+ int boltParallelism = 2;
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("spout", new TestUtilsForResourceAwareScheduler.TestSpout(),
+ spoutParallelism);
+ builder.setBolt("bolt-1", new TestUtilsForResourceAwareScheduler.TestBolt(),
+ boltParallelism).shuffleGrouping("spout");
+ builder.setBolt("bolt-2", new TestUtilsForResourceAwareScheduler.TestBolt(),
+ boltParallelism).shuffleGrouping("bolt-1");
+ builder.setBolt("bolt-3", new TestUtilsForResourceAwareScheduler.TestBolt(),
+ boltParallelism).shuffleGrouping("bolt-2");
+
+ StormTopology stormToplogy = builder.createTopology();
+
+ Config conf = new Config();
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 150.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1500.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+ conf.putAll(Utils.readDefaultConfig());
+ conf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ conf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ conf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ conf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 50.0);
+ conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 250);
+ conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 250);
+ conf.put(Config.TOPOLOGY_PRIORITY, 0);
+ conf.put(Config.TOPOLOGY_NAME, "testTopology");
+ conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+
+ TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
+ TestUtilsForResourceAwareScheduler.genExecsAndComps(stormToplogy, spoutParallelism, boltParallelism)
+ , this.currentTime);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo.getId(), topo);
+ Topologies topologies = new Topologies(topoMap);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), conf);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(conf);
+ rs.schedule(topologies, cluster);
+
+ Map<String, List<String>> nodeToComps = new HashMap<String, List<String>>();
+ for (Map.Entry<ExecutorDetails, WorkerSlot> entry : cluster.getAssignments().get("testTopology-id").getExecutorToSlot().entrySet()) {
+ WorkerSlot ws = entry.getValue();
+ ExecutorDetails exec = entry.getKey();
+ if (!nodeToComps.containsKey(ws.getNodeId())) {
+ nodeToComps.put(ws.getNodeId(), new LinkedList<String>());
+ }
+ nodeToComps.get(ws.getNodeId()).add(topo.getExecutorToComponent().get(exec));
+ }
+
+ /**
+ * check for correct scheduling
+ * Since all the resource availabilites on nodes are the same in the beginining
+ * DefaultResourceAwareStrategy can arbitrarily pick one thus we must find if a particular scheduling
+ * exists on a node the the cluster.
+ */
+
+ //one node should have the below scheduling
+ List<String> node1 = new LinkedList<>();
+ node1.add("spout");
+ node1.add("bolt-1");
+ node1.add("bolt-2");
+ Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node1));
+
+ //one node should have the below scheduling
+ List<String> node2 = new LinkedList<>();
+ node2.add("bolt-3");
+ node2.add("bolt-1");
+ node2.add("bolt-2");
+
+ Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node2));
+
+ //one node should have the below scheduling
+ List<String> node3 = new LinkedList<>();
+ node3.add("bolt-3");
+
+ Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node3));
+
+ //three used and one node should be empty
+ Assert.assertEquals("only three nodes should be used", 3, nodeToComps.size());
+ }
+
+ private boolean checkDefaultStrategyScheduling(Map<String, List<String>> nodeToComps, List<String> schedulingToFind) {
+ for (List<String> entry : nodeToComps.values()) {
+ if (schedulingToFind.containsAll(entry) && entry.containsAll(schedulingToFind)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * test if free slots on nodes work correctly
+ */
+ @Test
+ public void TestNodeFreeSlot() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, currentTime - 2, 29);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, currentTime - 2, 10);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo2.getId(), topo2);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
+
+ for (SchedulerAssignment entry : cluster.getAssignments().values()) {
+ for (WorkerSlot ws : entry.getSlots()) {
+ double memoryBefore = nodes.get(ws.getNodeId()).getAvailableMemoryResources();
+ double cpuBefore = nodes.get(ws.getNodeId()).getAvailableCpuResources();
+ double memoryUsedByWorker = nodes.get(ws.getNodeId()).getMemoryUsedByWorker(ws);
+ Assert.assertEquals("Check if memory used by worker is calculated correctly", 1000.0, memoryUsedByWorker, 0.001);
+ double cpuUsedByWorker = nodes.get(ws.getNodeId()).getCpuUsedByWorker(ws);
+ Assert.assertEquals("Check if CPU used by worker is calculated correctly", 100.0, cpuUsedByWorker, 0.001);
+ nodes.get(ws.getNodeId()).free(ws);
+ double memoryAfter = nodes.get(ws.getNodeId()).getAvailableMemoryResources();
+ double cpuAfter = nodes.get(ws.getNodeId()).getAvailableCpuResources();
+ Assert.assertEquals("Check if free correctly frees amount of memory", memoryBefore + memoryUsedByWorker, memoryAfter, 0.001);
+ Assert.assertEquals("Check if free correctly frees amount of memory", cpuBefore + cpuUsedByWorker, cpuAfter, 0.001);
+ Assert.assertFalse("Check if worker was removed from assignments", entry.getSlotToExecutors().containsKey(ws));
+ }
+ }
+ }
}