You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:34 UTC
[06/23] storm git commit: adding checkpointing
adding checkpointing
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9d3c864c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9d3c864c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9d3c864c
Branch: refs/heads/master
Commit: 9d3c864cdc708d62a088a1b9b23904bd6ea1b276
Parents: 3f55fee
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Wed Nov 18 11:09:45 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 4 13:07:00 2015 -0600
----------------------------------------------------------------------
.../jvm/backtype/storm/scheduler/Cluster.java | 8 +
.../backtype/storm/scheduler/Topologies.java | 4 +
.../storm/scheduler/resource/RAS_Node.java | 77 +++-
.../storm/scheduler/resource/RAS_Nodes.java | 29 +-
.../resource/ResourceAwareScheduler.java | 236 ++++++-----
.../storm/scheduler/resource/ResourceUtils.java | 9 +
.../backtype/storm/scheduler/resource/User.java | 52 ++-
.../storm/scheduler/resource/Experiment.java | 257 +++++++-----
.../resource/TestResourceAwareScheduler.java | 400 ++++++++++++++++++-
.../TestUtilsForResourceAwareScheduler.java | 28 ++
10 files changed, 862 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index 7f16b86..2676af1 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -90,6 +90,14 @@ public class Cluster {
}
this.conf = storm_conf;
}
+
+ public Cluster getCopy() {
+ Cluster copy = new Cluster(this.inimbus, this.supervisors, this.assignments, this.conf);
+ for(Map.Entry<String, String> entry : this.status.entrySet()) {
+ copy.setStatus(entry.getKey(), entry.getValue());
+ }
+ return copy;
+ }
public void setBlacklistedHosts(Set<String> hosts) {
blackListedHosts = hosts;
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
index 0b4c0ca..59a53c8 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
@@ -68,6 +68,10 @@ public class Topologies {
return _allComponents;
}
+ public Topologies getCopy() {
+ return new Topologies(this.topologies);
+ }
+
@Override
public String toString() {
String ret = "Topologies:\n";
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
index b0bcc8a..21367f0 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
@@ -55,9 +55,10 @@ public class RAS_Node {
private Double _availCPU;
private List<WorkerSlot> _slots;
private Cluster _cluster;
+ private Topologies _topologies;
public RAS_Node(String nodeId, Set<Integer> allPorts, boolean isAlive,
- SupervisorDetails sup, Cluster cluster) {
+ SupervisorDetails sup, Cluster cluster, Topologies topologies) {
_slots = new ArrayList<WorkerSlot>();
_nodeId = nodeId;
_isAlive = isAlive;
@@ -72,6 +73,7 @@ public class RAS_Node {
_slots.addAll(_freeSlots);
}
this._cluster = cluster;
+ this._topologies = topologies;
}
public String getId() {
@@ -185,6 +187,8 @@ public class RAS_Node {
}
for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
_cluster.freeSlots(entry.getValue());
+ _availCPU = this.getTotalCpuResources();
+ _availMemory = this.getTotalMemoryResources();
if (_isAlive) {
_freeSlots.addAll(entry.getValue());
}
@@ -197,14 +201,19 @@ public class RAS_Node {
* @param ws the slot to free
*/
public void free(WorkerSlot ws) {
+ LOG.info("freeing ws {} on node {}", ws, _hostname);
if (_freeSlots.contains(ws)) return;
for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
Set<WorkerSlot> slots = entry.getValue();
+ double memUsed = this.getMemoryUsedByWorker(ws);
+ double cpuUsed = this.getCpuUsedByWorker(ws);
if (slots.remove(ws)) {
_cluster.freeSlot(ws);
if (_isAlive) {
_freeSlots.add(ws);
}
+ this.freeMemory(memUsed);
+ this.freeCPU(cpuUsed);
return;
}
}
@@ -223,6 +232,8 @@ public class RAS_Node {
}
for (WorkerSlot ws : slots) {
_cluster.freeSlot(ws);
+ this.freeMemory(this.getMemoryUsedByWorker(ws));
+ this.freeCPU(this.getCpuUsedByWorker(ws));
if (_isAlive) {
_freeSlots.add(ws);
}
@@ -230,6 +241,66 @@ public class RAS_Node {
_topIdToUsedSlots.remove(topId);
}
+ public void freeMemory(double amount) {
+ _availMemory += amount;
+ LOG.info("freeing {} memory...avail mem: {}", amount, _availMemory);
+ if(_availMemory > this.getTotalMemoryResources()) {
+ LOG.warn("Freeing more memory than there exists!");
+ }
+ }
+
+ public void freeCPU(double amount) {
+ _availCPU += amount;
+ LOG.info("freeing {} CPU...avail CPU: {}", amount, _availCPU);
+ if(_availCPU > this.getTotalCpuResources()) {
+ LOG.warn("Freeing more CPU than there exists!");
+ }
+ }
+
+ public double getMemoryUsedByWorker(WorkerSlot ws) {
+ TopologyDetails topo = this.findTopologyUsingWorker(ws);
+ LOG.info("Topology {} using worker {}", topo, ws);
+ if(topo == null) {
+ return 0.0;
+ }
+ Collection<ExecutorDetails> execs = this.getExecutors(ws, this._cluster);
+ LOG.info("Worker {} has execs: {}", ws, execs);
+ double totalMemoryUsed = 0.0;
+ for(ExecutorDetails exec : execs) {
+ totalMemoryUsed += topo.getTotalMemReqTask(exec);
+ }
+ return totalMemoryUsed;
+ }
+
+ public double getCpuUsedByWorker(WorkerSlot ws) {
+ TopologyDetails topo = this.findTopologyUsingWorker(ws);
+ LOG.info("Topology {} using worker {}", topo, ws);
+ if(topo == null) {
+ return 0.0;
+ }
+ Collection<ExecutorDetails> execs = this.getExecutors(ws, this._cluster);
+ LOG.info("Worker {} has execs: {}", ws, execs);
+ double totalCpuUsed = 0.0;
+ for(ExecutorDetails exec : execs) {
+ totalCpuUsed += topo.getTotalCpuReqTask(exec);
+ }
+ return totalCpuUsed;
+ }
+
+ public TopologyDetails findTopologyUsingWorker(WorkerSlot ws) {
+ for(Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
+ LOG.info("topoId: {} workers: {}", entry.getKey(), entry.getValue());
+ String topoId = entry.getKey();
+ Set<WorkerSlot> workers = entry.getValue();
+ for (WorkerSlot worker : workers) {
+ if(worker.getNodeId().equals(ws.getNodeId()) && worker.getPort() == ws.getPort()) {
+ return _topologies.getById(topoId);
+ }
+ }
+ }
+ return null;
+ }
+
/**
* Allocate Mem and CPU resources to the assigned slot for the topology's executors.
* @param td the TopologyDetails that the slot is assigned to.
@@ -457,4 +528,8 @@ public class RAS_Node {
this.consumeCPU(taskCpuReq);
this.consumeMemory(taskMemReq);
}
+
+ public Map<String, Set<WorkerSlot>> getTopoIdTousedSlots() {
+ return _topIdToUsedSlots;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
index 9df1475..42fc236 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
@@ -56,7 +56,7 @@ public class RAS_Nodes {
LOG.debug("Found a {} Node {} {}",
isAlive ? "living" : "dead", id, sup.getAllPorts());
LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
- nodeIdToNode.put(sup.getId(), new RAS_Node(id, sup.getAllPorts(), isAlive, sup, cluster));
+ nodeIdToNode.put(sup.getId(), new RAS_Node(id, sup.getAllPorts(), isAlive, sup, cluster, topologies));
}
for (Map.Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
String topId = entry.getValue().getTopologyId();
@@ -66,7 +66,7 @@ public class RAS_Nodes {
if (node == null) {
LOG.info("Found an assigned slot on a dead supervisor {} with executors {}",
workerSlot, RAS_Node.getExecutors(workerSlot, cluster));
- node = new RAS_Node(id, null, false, null, cluster);
+ node = new RAS_Node(id, null, false, null, cluster, topologies);
nodeIdToNode.put(id, node);
}
if (!node.isAlive()) {
@@ -86,8 +86,9 @@ public class RAS_Nodes {
/**
* updates the available resources for every node in a cluster
* by recalculating memory requirements.
- * @param cluster the cluster used in this calculation
- * @param topologies container of all topologies
+ *
+ * @param cluster the cluster used in this calculation
+ * @param topologies container of all topologies
* @param nodeIdToNode a map between node id and node
*/
private static void updateAvailableResources(Cluster cluster,
@@ -138,12 +139,26 @@ public class RAS_Nodes {
}
public void freeSlots(Collection<WorkerSlot> workerSlots) {
- for(RAS_Node node : nodeMap.values()) {
- for(WorkerSlot ws : node.getUsedSlots()) {
- if(workerSlots.contains(ws)) {
+ for (RAS_Node node : nodeMap.values()) {
+ for (WorkerSlot ws : node.getUsedSlots()) {
+ if (workerSlots.contains(ws)) {
+ LOG.info("freeing ws {} on node {}", ws, node);
node.free(ws);
}
}
}
}
+
+ public Collection<RAS_Node> getNodes() {
+ return this.nodeMap.values();
+ }
+
+ @Override
+ public String toString() {
+ String ret = "";
+ for (RAS_Node node : this.nodeMap.values()) {
+ ret += node.toString() + "\n";
+ }
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index 279d060..934858e 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -45,6 +45,27 @@ public class ResourceAwareScheduler implements IScheduler {
private Topologies topologies;
private RAS_Nodes nodes;
+ private class SchedulingState {
+ private Map<String, User> userMap = new HashMap<String, User>();
+ private Cluster cluster;
+ private Topologies topologies;
+ private RAS_Nodes nodes;
+ private Map conf = new Config();
+
+ public SchedulingState(Map<String, User> userMap, Cluster cluster, Topologies topologies, RAS_Nodes nodes, Map conf) {
+ for(Map.Entry<String, User> userMapEntry : userMap.entrySet()) {
+ String userId = userMapEntry.getKey();
+ User user = userMapEntry.getValue();
+ this.userMap.put(userId, user.getCopy());
+ }
+ this.cluster = cluster.getCopy();
+ this.topologies = topologies.getCopy();
+ this.nodes = new RAS_Nodes(this.cluster, this.topologies);
+ this.conf.putAll(conf);
+
+ }
+ }
+
@SuppressWarnings("rawtypes")
private Map conf;
@@ -67,27 +88,35 @@ public class ResourceAwareScheduler implements IScheduler {
this.initialize(topologies, cluster);
LOG.info("UserMap:\n{}", this.userMap);
- for(User user : this.getUserMap().values()) {
+ for (User user : this.getUserMap().values()) {
LOG.info(user.getDetailedInfo());
}
- for(TopologyDetails topo : topologies.getTopologies()) {
+ for (TopologyDetails topo : topologies.getTopologies()) {
LOG.info("topo {} status: {}", topo, cluster.getStatusMap().get(topo.getId()));
}
+ LOG.info("Nodes:\n{}", this.nodes);
+
LOG.info("getNextUser: {}", this.getNextUser());
- while(true) {
+ while (true) {
+ LOG.info("/*********** next scheduling iteration **************/");
+
User nextUser = this.getNextUser();
- if(nextUser == null){
+ if (nextUser == null) {
break;
}
TopologyDetails td = nextUser.getNextTopologyToSchedule();
scheduleTopology(td);
}
+ //since the scheduling state might have been restored, we need to reset the cluster and topologies.
+ cluster = this.cluster;
+ topologies = this.topologies;
}
private boolean makeSpaceForTopo(TopologyDetails td) {
+ LOG.info("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
User submitter = this.userMap.get(td.getTopologySubmitter());
if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null) {
return false;
@@ -97,7 +126,7 @@ public class ResourceAwareScheduler implements IScheduler {
double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
//user has enough resource under his or her resource guarantee to schedule topology
- if ((1.0 - submitter.getCPUResourcePoolUtilization()) > cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) > memoryNeeded) {
+ if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
User evictUser = this.findUserWithMostResourcesAboveGuarantee();
if (evictUser == null) {
LOG.info("Cannot make space for topology {} from user {}", td.getName(), submitter.getId());
@@ -109,6 +138,8 @@ public class ResourceAwareScheduler implements IScheduler {
LOG.info("topology to evict: {}", topologyEvict);
evictTopology(topologyEvict);
+ LOG.info("Resources After eviction:\n{}", this.nodes);
+
return true;
} else {
@@ -129,6 +160,7 @@ public class ResourceAwareScheduler implements IScheduler {
User submitter = this.userMap.get(topologyEvict.getTopologySubmitter());
LOG.info("Evicting Topology {} with workers: {}", topologyEvict.getName(), workersToEvict);
+ LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
this.nodes.freeSlots(workersToEvict);
submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
LOG.info("check if topology unassigned: {}", this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId()));
@@ -137,9 +169,9 @@ public class ResourceAwareScheduler implements IScheduler {
private User findUserWithMostResourcesAboveGuarantee() {
double most = 0.0;
User mostOverUser = null;
- for(User user : this.userMap.values()) {
- double over = user.getResourcePoolAverageUtilization() -1.0;
- if((over > most) && (!user.getTopologiesRunning().isEmpty())) {
+ for (User user : this.userMap.values()) {
+ double over = user.getResourcePoolAverageUtilization() - 1.0;
+ if ((over > most) && (!user.getTopologiesRunning().isEmpty())) {
most = over;
mostOverUser = user;
}
@@ -147,63 +179,74 @@ public class ResourceAwareScheduler implements IScheduler {
return mostOverUser;
}
- public void resetAssignments(Map<String, SchedulerAssignment> assignmentCheckpoint) {
- this.cluster.setAssignments(assignmentCheckpoint);
- }
-
public void scheduleTopology(TopologyDetails td) {
- ResourceAwareStrategy RAStrategy = new ResourceAwareStrategy(this.cluster, this.topologies);
User topologySubmitter = this.userMap.get(td.getTopologySubmitter());
if (cluster.getUnassignedExecutors(td).size() > 0) {
LOG.info("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
LOG.info("{}", this.userMap.get(td.getTopologySubmitter()).getDetailedInfo());
LOG.info("{}", User.getResourcePoolAverageUtilizationForUsers(this.userMap.values()));
+ LOG.info("Nodes:\n{}", this.nodes);
+ LOG.debug("From cluster:\n{}", ResourceUtils.printScheduling(this.cluster, this.topologies));
+ LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
- Map<String, SchedulerAssignment> assignmentCheckpoint = this.cluster.getAssignments();
-
+ SchedulingState schedulingState = this.checkpointSchedulingState();
while (true) {
+ //Need to reinitialize ResourceAwareStrategy with cluster and topologies in case scheduling state was restored
+ ResourceAwareStrategy RAStrategy = new ResourceAwareStrategy(this.cluster, this.topologies);
SchedulingResult result = RAStrategy.schedule(td);
LOG.info("scheduling result: {}", result);
if (result.isValid()) {
if (result.isSuccess()) {
try {
- if(mkAssignment(td, result.getSchedulingResultMap())) {
+ if (mkAssignment(td, result.getSchedulingResultMap())) {
topologySubmitter.moveTopoFromPendingToRunning(td, this.cluster);
} else {
- resetAssignments(assignmentCheckpoint);
+ //resetAssignments(assignmentCheckpoint);
+ this.restoreCheckpointSchedulingState(schedulingState);
+ topologySubmitter = this.userMap.get(td.getTopologySubmitter());
topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
}
} catch (IllegalStateException ex) {
LOG.error(ex.toString());
- LOG.error("Unsuccessful in scheduling", td.getId());
+ LOG.error("Unsuccessful in scheduling topology {}: IllegalStateException thrown!", td.getId());
this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
- resetAssignments(assignmentCheckpoint);
+ this.restoreCheckpointSchedulingState(schedulingState);
+ //since state is restored, we need to update topologySubmitter to the new User object in userMap
+ topologySubmitter = this.userMap.get(td.getTopologySubmitter());
topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
}
break;
} else {
if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
- if(!this.makeSpaceForTopo(td)) {
- topologySubmitter.moveTopoFromPendingToAttempted(td);
+ if (!this.makeSpaceForTopo(td)) {
this.cluster.setStatus(td.getId(), result.getErrorMessage());
- resetAssignments(assignmentCheckpoint);
+ this.restoreCheckpointSchedulingState(schedulingState);
+ //since state is restored, we need to update topologySubmitter to the new User object in userMap
+ topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ topologySubmitter.moveTopoFromPendingToAttempted(td);
break;
}
continue;
} else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
+ this.restoreCheckpointSchedulingState(schedulingState);
+ //since state is restored, we need to update topologySubmitter to the new User object in userMap
+ topologySubmitter = this.userMap.get(td.getTopologySubmitter());
topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
- resetAssignments(assignmentCheckpoint);
break;
} else {
+ this.restoreCheckpointSchedulingState(schedulingState);
+ //since state is restored, we need to update topologySubmitter to the new User object in userMap
+ topologySubmitter = this.userMap.get(td.getTopologySubmitter());
topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
- resetAssignments(assignmentCheckpoint);
break;
}
}
} else {
LOG.warn("Scheduling results returned from topology {} is not valid! Topology will be ignored.", td.getName());
+ this.restoreCheckpointSchedulingState(schedulingState);
+ //since state is restored, we need to update topologySubmitter to the new User object in userMap
+ topologySubmitter = this.userMap.get(td.getTopologySubmitter());
topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
- resetAssignments(assignmentCheckpoint);
break;
}
}
@@ -215,6 +258,7 @@ public class ResourceAwareScheduler implements IScheduler {
}
private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
+ LOG.info("making assignments for topology {}", td);
if (schedulerAssignmentMap != null) {
double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
@@ -272,69 +316,6 @@ public class ResourceAwareScheduler implements IScheduler {
cluster.setSupervisorsResourcesMap(supervisors_resources);
}
-
-// private void scheduleTopology(TopologyDetails td) {
-// ResourceAwareStrategy RAStrategy = new ResourceAwareStrategy(this.cluster, this.topologies);
-// if (cluster.needsScheduling(td) && cluster.getUnassignedExecutors(td).size() > 0) {
-// LOG.info("/********Scheduling topology {} from User {}************/", td.getName(), td.getTopologySubmitter());
-// LOG.info("{}", this.userMap.get(td.getTopologySubmitter()).getDetailedInfo());
-// LOG.info("{}", User.getResourcePoolAverageUtilizationForUsers(this.userMap.values()));
-//
-// Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = RAStrategy.schedule(td);
-//
-// double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
-// double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
-// double requestedCpu = td.getTotalRequestedCpu();
-// double assignedMemOnHeap = 0.0;
-// double assignedMemOffHeap = 0.0;
-// double assignedCpu = 0.0;
-//
-// if (schedulerAssignmentMap != null) {
-// try {
-// Set<String> nodesUsed = new HashSet<String>();
-// int assignedWorkers = schedulerAssignmentMap.keySet().size();
-// for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
-// WorkerSlot targetSlot = workerToTasksEntry.getKey();
-// Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
-// RAS_Node targetNode = RAStrategy.idToNode(targetSlot.getNodeId());
-// targetNode.assign(targetSlot, td, execsNeedScheduling, this.cluster);
-// LOG.debug("ASSIGNMENT TOPOLOGY: {} TASKS: {} To Node: {} on Slot: {}",
-// td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
-// if (!nodesUsed.contains(targetNode.getId())) {
-// nodesUsed.add(targetNode.getId());
-// }
-// assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap();
-// assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
-// assignedCpu += targetSlot.getAllocatedCpu();
-// }
-// LOG.debug("Topology: {} assigned to {} nodes on {} workers", td.getId(), nodesUsed.size(), assignedWorkers);
-// this.cluster.setStatus(td.getId(), "Fully Scheduled");
-// this.getUser(td.getTopologySubmitter()).moveTopoFromPendingToRunning(td);
-// LOG.info("getNextUser: {}", this.getNextUser());
-// } catch (IllegalStateException ex) {
-// LOG.error(ex.toString());
-// LOG.error("Unsuccessful in scheduling", td.getId());
-// this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
-// this.getUser(td.getTopologySubmitter()).moveTopoFromPendingToAttempted(td);
-// }
-// } else {
-// LOG.error("Unsuccessful in scheduling {}", td.getId());
-// this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
-// // this.evictTopology(td);
-// // this.getUser(td.getTopologySubmitter()).moveTopoFromPendingToAttempted(td);
-// }
-// Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
-// assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
-// LOG.debug("setResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
-// "assigned on-heap mem, off-heap mem, cpu: {} {} {}",
-// td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
-// assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
-// this.cluster.setResources(td.getId(), resources);
-// } else {
-// LOG.warn("Topology {} already scheduled!", td.getName());
-// this.cluster.setStatus(td.getId(), "Fully Scheduled");
-// }
-// }
public User getUser(String user) {
return this.userMap.get(user);
}
@@ -346,23 +327,30 @@ public class ResourceAwareScheduler implements IScheduler {
public User getNextUser() {
Double least = Double.POSITIVE_INFINITY;
User ret = null;
- for(User user : this.userMap.values()) {
- LOG.info("{}", user.getDetailedInfo());
+ for (User user : this.userMap.values()) {
+ LOG.info("getNextUser {}", user.getDetailedInfo());
LOG.info("hasTopologyNeedSchedule: {}", user.hasTopologyNeedSchedule());
- if(user.hasTopologyNeedSchedule()) {
+ if (user.hasTopologyNeedSchedule()) {
Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
+ if(ret!=null) {
+ LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), least, user.getId(), userResourcePoolAverageUtilization);
+ LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, least == userResourcePoolAverageUtilization);
+ LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001));
+
+ }
if (least > userResourcePoolAverageUtilization) {
ret = user;
least = userResourcePoolAverageUtilization;
- } else if (least == userResourcePoolAverageUtilization) {
- double currentCpuPercentage = ret.getCPUResourceGuaranteed()/this.cluster.getClusterTotalCPUResource();
- double currentMemoryPercentage = ret.getMemoryResourceGuaranteed()/this.cluster.getClusterTotalMemoryResource();
+ } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
+ double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
+ double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
- double userCpuPercentage = user.getCPUResourceGuaranteed()/this.cluster.getClusterTotalCPUResource();
- double userMemoryPercentage = user.getMemoryResourceGuaranteed()/this.cluster.getClusterTotalMemoryResource();
+ double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
+ double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
- if(userAvgPercentage > currentAvgPercentage) {
+ LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), currentAvgPercentage, user.getId(), userAvgPercentage);
+ if (userAvgPercentage > currentAvgPercentage) {
ret = user;
least = userResourcePoolAverageUtilization;
}
@@ -374,6 +362,7 @@ public class ResourceAwareScheduler implements IScheduler {
/**
* Intialize scheduling and running queues
+ *
* @param topologies
* @param cluster
*/
@@ -384,18 +373,16 @@ public class ResourceAwareScheduler implements IScheduler {
LOG.info("userResourcePools: {}", userResourcePools);
for (TopologyDetails td : topologies.getTopologies()) {
- LOG.info("topology: {} from {}", td.getName(), td.getTopologySubmitter());
String topologySubmitter = td.getTopologySubmitter();
- if(topologySubmitter == null) {
+ if (topologySubmitter == null) {
LOG.warn("Topology {} submitted by anonymous user", td.getName());
topologySubmitter = "anonymous";
}
- if(!this.userMap.containsKey(topologySubmitter)) {
+ if (!this.userMap.containsKey(topologySubmitter)) {
this.userMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
}
- if(cluster.getUnassignedExecutors(td).size() >= td.getExecutors().size()) {
+ if (cluster.getUnassignedExecutors(td).size() >= td.getExecutors().size()) {
this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td, cluster);
- LOG.info(this.userMap.get(topologySubmitter).getDetailedInfo());
} else {
this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td, cluster);
}
@@ -411,35 +398,62 @@ public class ResourceAwareScheduler implements IScheduler {
/**
* Get resource guarantee configs
+ *
* @return
*/
private Map<String, Map<String, Double>> getUserResourcePools() {
Object raw = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
- Map<String, Map<String, Double>> ret = (Map<String, Map<String, Double>>)this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+ Map<String, Map<String, Double>> ret = new HashMap<String, Map<String, Double>>();
- if (raw == null) {
- ret = new HashMap<String, Map<String, Double>>();
- } else {
- for(Map.Entry<String, Map<String, Number>> UserPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
+ if (raw != null) {
+ for (Map.Entry<String, Map<String, Number>> UserPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
String user = UserPoolEntry.getKey();
ret.put(user, new HashMap<String, Double>());
- for(Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {
+ for (Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {
ret.get(user).put(resourceEntry.getKey(), resourceEntry.getValue().doubleValue());
}
}
}
Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
- Map<String, Map<String, Number>>tmp = (Map<String, Map<String, Number>>)fromFile.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+ Map<String, Map<String, Number>> tmp = (Map<String, Map<String, Number>>) fromFile.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
if (tmp != null) {
- for(Map.Entry<String, Map<String, Number>> UserPoolEntry : tmp.entrySet()) {
+ for (Map.Entry<String, Map<String, Number>> UserPoolEntry : tmp.entrySet()) {
String user = UserPoolEntry.getKey();
ret.put(user, new HashMap<String, Double>());
- for(Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {
+ for (Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {
ret.get(user).put(resourceEntry.getKey(), resourceEntry.getValue().doubleValue());
}
}
}
return ret;
}
+
+ private SchedulingState checkpointSchedulingState() {
+ LOG.info("checkpointing scheduling state...");
+ LOG.info("/*********Checkpoint************/");
+ for (User user : this.getUserMap().values()) {
+ LOG.info(user.getDetailedInfo());
+ }
+ LOG.info("/*********End************/");
+ return new SchedulingState(this.userMap, this.cluster, this.topologies, this.nodes, this.conf);
+ }
+
+ private void restoreCheckpointSchedulingState(SchedulingState schedulingState) {
+ LOG.info("restoring scheduling state...");
+ LOG.info("/*********Before************/");
+ for (User user : this.getUserMap().values()) {
+ LOG.info(user.getDetailedInfo());
+ }
+ this.cluster = schedulingState.cluster;
+ this.topologies = schedulingState.topologies;
+ this.conf = schedulingState.conf;
+ this.userMap = schedulingState.userMap;
+ this.nodes = schedulingState.nodes;
+ LOG.info("/*********After************/");
+ for (User user : this.getUserMap().values()) {
+ LOG.info(user.getDetailedInfo());
+ }
+ LOG.info("/*********End************/");
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
index 8e11384..02d48e1 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
@@ -181,4 +181,13 @@ public class ResourceUtils {
}
return str.toString();
}
+
+ public static String printScheduling(RAS_Nodes nodes) {
+ String ret="";
+ for (RAS_Node node : nodes.getNodes()) {
+ ret += "Node: " + node.getHostname() + "\n";
+ ret += "-> " + node.getTopoIdTousedSlots() + "\n";
+ }
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index 068db54..77a1dff 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -35,15 +35,15 @@ import java.util.TreeSet;
public class User {
private String userId;
//Topologies yet to be scheduled sorted by priority for each user
- private Set<TopologyDetails> pendingQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
+ private TreeSet<TopologyDetails> pendingQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
//Topologies yet to be scheduled sorted by priority for each user
- private Set<TopologyDetails> runningQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
+ private TreeSet<TopologyDetails> runningQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
//Topologies that were attempted to be scheduled but weren't successful
- private Set<TopologyDetails> attemptedQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
+ private TreeSet<TopologyDetails> attemptedQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
- private Set<TopologyDetails> invalidQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
+ private TreeSet<TopologyDetails> invalidQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
private Map<String, Double> resourcePool = new HashMap<String, Double>();
@@ -60,6 +60,23 @@ public class User {
}
}
+ public User getCopy() {
+ User newUser = new User(this.userId, this.resourcePool);
+ for(TopologyDetails topo : this.pendingQueue) {
+ newUser.addTopologyToPendingQueue(topo);
+ }
+ for(TopologyDetails topo : this.runningQueue) {
+ newUser.addTopologyToRunningQueue(topo);
+ }
+ for(TopologyDetails topo : this.attemptedQueue) {
+ newUser.addTopologyToAttemptedQueue(topo);
+ }
+ for(TopologyDetails topo : this.invalidQueue) {
+ newUser.addTopologyToInvalidQueue(topo);
+ }
+ return newUser;
+ }
+
public String getId() {
return this.userId;
}
@@ -92,6 +109,14 @@ public class User {
return ret;
}
+ public void addTopologyToAttemptedQueue(TopologyDetails topo) {
+ this.attemptedQueue.add(topo);
+ }
+
+ public void addTopologyToInvalidQueue(TopologyDetails topo) {
+ this.invalidQueue.add(topo);
+ }
+
public Set<TopologyDetails> getTopologiesRunning() {
TreeSet<TopologyDetails> ret = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
ret.addAll(this.runningQueue);
@@ -104,6 +129,11 @@ public class User {
return ret;
}
+ public Set<TopologyDetails> getTopologiesInvalid() {
+ TreeSet<TopologyDetails> ret = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
+ ret.addAll(this.invalidQueue);
+ return ret;
+ }
public Map<String, Number> getResourcePool() {
if (this.resourcePool != null) {
return new HashMap<String, Number>(this.resourcePool);
@@ -160,17 +190,12 @@ public class User {
private void moveTopology(TopologyDetails topo, Set<TopologyDetails> src, String srcName, Set<TopologyDetails> dest, String destName) {
- LOG.info("{} queue: {}", srcName, src);
- LOG.info("{} queue: {}", destName, dest);
+ LOG.info("For User {} Moving topo {} from {} to {}", this.userId, topo.getName(), srcName, destName);
if (topo == null) {
return;
}
if (!src.contains(topo)) {
LOG.warn("Topo {} not in User: {} {} queue!", topo.getName(), this.userId, srcName);
- LOG.info("topo {}-{}-{}", topo.getName(), topo.getId(), topo.hashCode());
- for (TopologyDetails t : src) {
- LOG.info("queue entry: {}-{}-{}", t.getName(), t.getId(), t.hashCode());
- }
return;
}
if (dest.contains(topo)) {
@@ -179,6 +204,8 @@ public class User {
}
src.remove(topo);
dest.add(topo);
+ LOG.info("SRC: {}", src);
+ LOG.info("DEST: {}", dest);
}
@@ -244,14 +271,15 @@ public class User {
}
public boolean hasTopologyNeedSchedule() {
- return (!this.pendingQueue.isEmpty() && (this.pendingQueue.size() - this.attemptedQueue.size()) > 0);
+ //return (!this.pendingQueue.isEmpty() && (this.pendingQueue.size() - this.attemptedQueue.size()) > 0);
+ return (!this.pendingQueue.isEmpty());
}
public TopologyDetails getRunningTopologyWithLowestPriority() {
if (this.runningQueue.isEmpty()) {
return null;
}
- return this.runningQueue.iterator().next();
+ return this.runningQueue.last();
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
index bea41ff..3ff0af1 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
@@ -29,6 +29,8 @@ import backtype.storm.utils.Time;
import backtype.storm.utils.Utils;
import org.junit.Test;
import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -37,120 +39,61 @@ import java.util.Map;
* Created by jerrypeng on 11/11/15.
*/
public class Experiment {
- @Test
- public void TestMultipleUsers() {
-// INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
-// Map<String, Number> resourceMap = new HashMap<String, Number>();
-// resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 1000.0);
-// resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
-// Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
-// Config config = new Config();
-// config.putAll(Utils.readDefaultConfig());
-// Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
-// resourceUserPool.put("jerry", new HashMap<String, Number>());
-// resourceUserPool.get("jerry").put("cpu", 1000);
-// resourceUserPool.get("jerry").put("memory", 8192.0);
-//
-// resourceUserPool.put("bobby", new HashMap<String, Number>());
-// resourceUserPool.get("bobby").put("cpu", 10000.0);
-// resourceUserPool.get("bobby").put("memory", 32768);
-//
-// resourceUserPool.put("derek", new HashMap<String, Number>());
-// resourceUserPool.get("derek").put("cpu", 5000.0);
-// resourceUserPool.get("derek").put("memory", 16384.0);
-//
-// config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
-// Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
-//
-// config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-//
-// TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
-// TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
-// TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
-// TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
-// TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
-//
-// config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-//
-// TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
-// TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
-// TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
-// TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
-// TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
-//
-// config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-//
-// TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
-// TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
-// TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
-// TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
-// TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
-//
-//
-// Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
-// topoMap.put(topo1.getId(), topo1);
-// topoMap.put(topo2.getId(), topo2);
-// topoMap.put(topo3.getId(), topo3);
-// topoMap.put(topo4.getId(), topo4);
-// topoMap.put(topo5.getId(), topo5);
-// topoMap.put(topo6.getId(), topo6);
-// topoMap.put(topo7.getId(), topo7);
-// topoMap.put(topo8.getId(), topo8);
-// topoMap.put(topo9.getId(), topo9);
-// topoMap.put(topo10.getId(), topo10);
-// topoMap.put(topo11.getId(), topo11);
-// topoMap.put(topo12.getId(), topo12);
-// topoMap.put(topo13.getId(), topo13);
-// topoMap.put(topo14.getId(), topo14);
-// topoMap.put(topo15.getId(), topo15);
-//
-// Topologies topologies = new Topologies(topoMap);
-//
-// ResourceAwareScheduler rs = new ResourceAwareScheduler();
-//
-// rs.prepare(config);
-// rs.schedule(topologies, cluster);
-//
-// for(TopologyDetails topo : topoMap.values()) {
-// Assert.assertEquals(cluster.getStatusMap().get(topo.getId()), "Fully Scheduled");
-// }
-//
-// for(User user : rs.getUserMap().values()) {
-// Assert.assertEquals(user.getTopologiesPending().size(), 0);
-// Assert.assertEquals(user.getTopologiesRunning().size(), 5);
-// }
+ private static final Logger LOG = LoggerFactory.getLogger(Experiment.class);
+
+ /**
+ * Eviction order:
+ * topo-3: since user bobby doesn't have any resource guarantees and topo-3 is the lowest priority for user bobby
+ * topo-2: since user bobby doesn't have any resource guarantees and topo-2 is the next lowest priority for user bobby
+ * topo-5: since user derek has exceeded his resource guarantee while user jerry has not. topo-5 and topo-4 have the same priority
+ * but topo-4 was submitted earlier, thus we choose that one to evict
+ */
+ @Test
+ public void TestEvictMultipleTopologiesFromMultipleUsersInCorrectOrder() {
INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
Map<String, Number> resourceMap = new HashMap<String, Number>();
- resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
- resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
- Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, resourceMap);
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
Config config = new Config();
config.putAll(Utils.readDefaultConfig());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
resourceUserPool.put("jerry", new HashMap<String, Number>());
- resourceUserPool.get("jerry").put("cpu", 1000);
- resourceUserPool.get("jerry").put("memory", 8192.0);
-
- resourceUserPool.put("bobby", new HashMap<String, Number>());
- resourceUserPool.get("bobby").put("cpu", 10000.0);
- resourceUserPool.get("bobby").put("memory", 32768);
+ resourceUserPool.get("jerry").put("cpu", 300.0);
+ resourceUserPool.get("jerry").put("memory", 3000.0);
resourceUserPool.put("derek", new HashMap<String, Number>());
- resourceUserPool.get("derek").put("cpu", 5000.0);
- resourceUserPool.get("derek").put("memory", 16384.0);
+ resourceUserPool.get("derek").put("cpu", 100.0);
+ resourceUserPool.get("derek").put("memory", 1000.0);
config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
- TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
- TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 15, 30);
Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
- topoMap.put(topo1.getId(), topo1);
topoMap.put(topo2.getId(), topo2);
+ topoMap.put(topo3.getId(), topo3);
+ topoMap.put(topo4.getId(), topo4);
+ topoMap.put(topo5.getId(), topo5);
Topologies topologies = new Topologies(topoMap);
@@ -159,16 +102,120 @@ public class Experiment {
rs.prepare(config);
rs.schedule(topologies, cluster);
- int fullyScheduled = 0;
- for (TopologyDetails topo : topoMap.values()) {
- if(cluster.getStatusMap().get(topo.getId()).equals("Fully Scheduled")) {
- fullyScheduled++;
- }
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+
+ //user jerry submits another topology
+ topoMap.put(topo1.getId(), topo1);
+ topologies = new Topologies(topoMap);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+ Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("correct topology to evict", rs.getUser("bobby").getTopologiesAttempted().iterator().next().getName(), "topo-3");
+
+ topoMap.put(topo6.getId(), topo6);
+ topologies = new Topologies(topoMap);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
- Assert.assertEquals("# of Fully scheduled", 1, fullyScheduled);
- Assert.assertEquals("# of topologies schedule attempted", 1, rs.getUser("jerry").getTopologiesAttempted().size());
- Assert.assertEquals("# of topologies running", 1, rs.getUser("jerry").getTopologiesRunning().size());
- Assert.assertEquals("# of topologies schedule pending", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+ Assert.assertFalse(TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
+
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
+
+ topoMap.put(topo7.getId(), topo7);
+ topologies = new Topologies(topoMap);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 3, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesAttempted()) {
+ Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 1, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+ Assert.assertEquals("correct topology to evict", rs.getUser("derek").getTopologiesAttempted().iterator().next().getName(), "topo-4");
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+ Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
+
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
}