Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/14 14:46:02 UTC
[1/3] storm git commit: [STORM-1766] - A better algorithm for server rack selection for RAS
Repository: storm
Updated Branches:
refs/heads/1.x-branch 66f5f68aa -> 560746f8e
[STORM-1766] - A better algorithm for server rack selection for RAS
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d50a2437
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d50a2437
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d50a2437
Branch: refs/heads/1.x-branch
Commit: d50a2437923f2919fbee130b8e9e86a62a2d9f48
Parents: 9e9ec43
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Wed May 4 17:08:57 2016 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Aug 11 10:41:41 2016 -0500
----------------------------------------------------------------------
.../jvm/org/apache/storm/scheduler/Cluster.java | 7 +
.../storm/scheduler/resource/RAS_Node.java | 11 +-
.../DefaultResourceAwareStrategy.java | 228 +++++++++----
.../resource/TestResourceAwareScheduler.java | 114 +------
.../TestUtilsForResourceAwareScheduler.java | 25 +-
.../TestDefaultResourceAwareStrategy.java | 333 +++++++++++++++++++
6 files changed, 542 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d50a2437/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java b/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
index a6622ce..89cc1bc 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
@@ -103,6 +103,9 @@ public class Cluster {
this.status.putAll(src.status);
this.topologyResources.putAll(src.topologyResources);
this.blackListedHosts.addAll(src.blackListedHosts);
+ if (src.networkTopography != null) {
+ this.networkTopography = new HashMap<String, List<String>>(src.networkTopography);
+ }
}
public void setBlacklistedHosts(Set<String> hosts) {
@@ -522,6 +525,10 @@ public class Cluster {
return networkTopography;
}
+ public void setNetworkTopography(Map<String, List<String>> networkTopography) {
+ this.networkTopography = networkTopography;
+ }
+
private String getStringFromStringList(Object o) {
StringBuilder sb = new StringBuilder();
for (String s : (List<String>) o) {
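
The new setNetworkTopography accessor above lets a test (or any other caller) hand the Cluster a precomputed rack-to-hosts map instead of resolving it through the configured DNSToSwitchMapping plugin. A minimal sketch of how it could be called, modeled on the test added later in this commit; the rack and host names here are illustrative and not part of the patch:

    // assumes java.util.* is imported and an existing Cluster instance named "cluster"
    Map<String, List<String>> rackToNodes = new HashMap<String, List<String>>();
    rackToNodes.put("rack-0", Arrays.asList("host-0", "host-1", "host-2"));
    rackToNodes.put("rack-1", Arrays.asList("host-3", "host-4"));
    cluster.setNetworkTopography(rackToNodes);
    // getNetworkTopography() now returns this map to the scheduling strategy
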
http://git-wip-us.apache.org/repos/asf/storm/blob/d50a2437/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
index 8d37805..79502d7 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -89,10 +89,6 @@ public class RAS_Node {
_sup = sup;
_availMemory = getTotalMemoryResources();
_availCPU = getTotalCpuResources();
-
- LOG.debug("Found a {} Node {} {}",
- _isAlive ? "living" : "dead", _nodeId, sup.getAllPorts());
- LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
//intialize resource usages on node
intializeResources();
}
@@ -363,8 +359,11 @@ public class RAS_Node {
@Override
public String toString() {
return "{Node: " + ((_sup == null) ? "null (possibly down)" : _sup.getHost())
- + ", AvailMem: " + ((_availMemory == null) ? "N/A" : _availMemory.toString())
- + ", AvailCPU: " + ((_availCPU == null) ? "N/A" : _availCPU.toString()) + "}";
+ + ", Avail [ Mem: " + ((_availMemory == null) ? "N/A" : _availMemory.toString())
+ + ", CPU: " + ((_availCPU == null) ? "N/A" : _availCPU.toString()) + ", Slots: " + this.getFreeSlots()
+ + "] Total [ Mem: " + ((_sup == null) ? "N/A" : this.getTotalMemoryResources())
+ + ", CPU: " + ((_sup == null) ? "N/A" : this.getTotalCpuResources()) + ", Slots: "
+ + this._slots.values() + " ]}";
}
public static int countSlotsUsed(String topId, Collection<RAS_Node> nodes) {
http://git-wip-us.apache.org/repos/asf/storm/blob/d50a2437/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index c4ce7ef..8957dc0 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -20,6 +20,7 @@ package org.apache.storm.scheduler.resource.strategies.scheduling;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -29,6 +30,7 @@ import java.util.Queue;
import java.util.TreeMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.TreeSet;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.Topologies;
@@ -103,16 +105,16 @@ public class DefaultResourceAwareStrategy implements IStrategy {
Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
- //Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth.
+ //Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth.
//Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
for (int i = 0; i < longestPriorityListSize; i++) {
for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
Iterator<ExecutorDetails> it = entry.getValue().iterator();
if (it.hasNext()) {
ExecutorDetails exec = it.next();
- LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
+ LOG.debug("\n\nAttempting to schedule: {} of component {}[ REQ {} ] with rank {}",
new Object[] { exec, td.getExecutorToComponent().get(exec),
- td.getTaskResourceReqList(exec), entry.getKey() });
+ td.getTaskResourceReqList(exec), entry.getKey() });
scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks);
it.remove();
}
@@ -156,42 +158,49 @@ public class DefaultResourceAwareStrategy implements IStrategy {
schedulerAssignmentMap.get(targetSlot).add(exec);
targetNode.consumeResourcesforTask(exec, td);
scheduledTasks.add(exec);
- LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
- targetNode, targetNode.getAvailableMemoryResources(),
+ LOG.debug("TASK {} assigned to Node: {} avail [ mem: {} cpu: {} ] total [ mem: {} cpu: {} ] on slot: {} on Rack: {}", exec,
+ targetNode.getHostname(), targetNode.getAvailableMemoryResources(),
targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
- targetNode.getTotalCpuResources(), targetSlot);
+ targetNode.getTotalCpuResources(), targetSlot, nodeToRack(targetNode));
} else {
LOG.error("Not Enough Resources to schedule Task {}", exec);
}
}
private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
- WorkerSlot ws;
- // first scheduling
- if (this.refNode == null) {
- String clus = this.getBestClustering();
- ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
- } else {
- ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
- }
- if(ws != null) {
- this.refNode = this.idToNode(ws.getNodeId());
- }
- LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
- return ws;
+ WorkerSlot ws = null;
+ // first scheduling
+ if (this.refNode == null) {
+ // iterate through an ordered list of all available racks so that we try every rack before we "give up" on scheduling the first executor
+ // the list is ordered in decreasing order of effective resources, with the rack at the front of the list having the most effective resources
+ for (RackResources rack : sortRacks(td.getId())) {
+ ws = this.getBestWorker(exec, td, rack.id, scheduleAssignmentMap);
+ if (ws != null) {
+ LOG.debug("best rack: {}", rack.id);
+ break;
+ }
+ }
+ } else {
+ ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
+ }
+ if (ws != null) {
+ this.refNode = this.idToNode(ws.getNodeId());
+ }
+ LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
+ return ws;
}
private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
}
- private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+ private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String rackId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
double taskMem = td.getTotalMemReqTask(exec);
double taskCPU = td.getTotalCpuReqTask(exec);
List<RAS_Node> nodes;
- if(clusterId != null) {
- nodes = this.getAvailableNodesFromCluster(clusterId);
-
+ if(rackId != null) {
+ nodes = this.getAvailableNodesFromRack(rackId);
+
} else {
nodes = this.getAvailableNodes();
}
@@ -227,71 +236,178 @@ public class DefaultResourceAwareStrategy implements IStrategy {
return null;
}
- private String getBestClustering() {
- String bestCluster = null;
- Double mostRes = 0.0;
- for (Entry<String, List<String>> cluster : _clusterInfo
- .entrySet()) {
- Double clusterTotalRes = this.getTotalClusterRes(cluster.getValue());
- if (clusterTotalRes > mostRes) {
- mostRes = clusterTotalRes;
- bestCluster = cluster.getKey();
+ /**
+ * class to keep track of resources on a rack
+ */
+ class RackResources {
+ String id;
+ double availMem = 0.0;
+ double totalMem = 0.0;
+ double availCpu = 0.0;
+ double totalCpu = 0.0;
+ int freeSlots = 0;
+ int totalSlots = 0;
+ double effectiveResources = 0.0;
+ public RackResources(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return this.id;
+ }
+ }
+
+ /**
+ *
+ * @param topoId
+ * @return a sorted list of racks
+ * Racks are sorted by two criteria. 1) the number of executors of the topology that need to be scheduled that are already on the rack, in descending order.
+ * The reasoning behind criterion 1 is so we schedule the rest of a topology on the same rack as its existing executors.
+ * 2) the subordinate/subservient resource availability percentage of a rack, in descending order.
+ * We calculate the resource availability percentage by dividing the resource availability on the rack by the resource availability of the entire cluster.
+ * By doing this calculation, racks that have exhausted or have little of one of the resources mentioned above will be ranked after racks with more balanced resource availability.
+ * So we will be less likely to pick a rack that has a lot of one resource but little of another.
+ */
+ TreeSet<RackResources> sortRacks(final String topoId) {
+ List<RackResources> racks = new LinkedList<RackResources>();
+ final Map<String, String> nodeIdToRackId = new HashMap<String, String>();
+ double availMemResourcesOverall = 0.0;
+ double totalMemResourcesOverall = 0.0;
+
+ double availCpuResourcesOverall = 0.0;
+ double totalCpuResourcesOverall = 0.0;
+
+ int freeSlotsOverall = 0;
+ int totalSlotsOverall = 0;
+
+ for (Entry<String, List<String>> entry : _clusterInfo.entrySet()) {
+ String rackId = entry.getKey();
+ List<String> nodeIds = entry.getValue();
+ RackResources rack = new RackResources(rackId);
+ racks.add(rack);
+ for (String nodeId : nodeIds) {
+ RAS_Node node = _nodes.getNodeById(this.NodeHostnameToId(nodeId));
+ double availMem = node.getAvailableMemoryResources();
+ double availCpu = node.getAvailableCpuResources();
+ double freeSlots = node.totalSlotsFree();
+ double totalMem = node.getTotalMemoryResources();
+ double totalCpu = node.getTotalCpuResources();
+ double totalSlots = node.totalSlots();
+
+ rack.availMem += availMem;
+ rack.totalMem += totalMem;
+ rack.availCpu += availCpu;
+ rack.totalCpu += totalCpu;
+ rack.freeSlots += freeSlots;
+ rack.totalSlots += totalSlots;
+ nodeIdToRackId.put(nodeId, rack.id);
+
+ availMemResourcesOverall += availMem;
+ availCpuResourcesOverall += availCpu;
+ freeSlotsOverall += freeSlots;
+
+ totalMemResourcesOverall += totalMem;
+ totalCpuResourcesOverall += totalCpu;
+ totalSlotsOverall += totalSlots;
}
}
- return bestCluster;
+
+ LOG.debug("Cluster Overall Avail [ CPU {} MEM {} Slots {} ] Total [ CPU {} MEM {} Slots {} ]",
+ availCpuResourcesOverall, availMemResourcesOverall, freeSlotsOverall, totalCpuResourcesOverall, totalMemResourcesOverall, totalSlotsOverall);
+ for (RackResources rack : racks) {
+ if (availCpuResourcesOverall <= 0.0 || availMemResourcesOverall <= 0.0 || freeSlotsOverall <= 0.0) {
+ rack.effectiveResources = 0.0;
+ } else {
+ rack.effectiveResources = Math.min(Math.min((rack.availCpu / availCpuResourcesOverall), (rack.availMem / availMemResourcesOverall)), ((double) rack.freeSlots / (double) freeSlotsOverall));
+ }
+ LOG.debug("rack: {} Avail [ CPU {}({}%) MEM {}({}%) Slots {}({}%) ] Total [ CPU {} MEM {} Slots {} ] effective resources: {}",
+ rack.id, rack.availCpu, rack.availCpu/availCpuResourcesOverall * 100.0, rack.availMem, rack.availMem/availMemResourcesOverall * 100.0,
+ rack.freeSlots, ((double) rack.freeSlots / (double) freeSlotsOverall) * 100.0, rack.totalCpu, rack.totalMem, rack.totalSlots, rack.effectiveResources);
+ }
+
+ TreeSet<RackResources> sortedRacks = new TreeSet<RackResources>(new Comparator<RackResources>() {
+ @Override
+ public int compare(RackResources o1, RackResources o2) {
+
+ int execsScheduledInRack1 = getExecutorsScheduledinRack(topoId, o1.id, nodeIdToRackId).size();
+ int execsScheduledInRack2 = getExecutorsScheduledinRack(topoId, o2.id, nodeIdToRackId).size();
+ if (execsScheduledInRack1 > execsScheduledInRack2) {
+ return -1;
+ } else if (execsScheduledInRack1 < execsScheduledInRack2) {
+ return 1;
+ } else {
+ if (o1.effectiveResources > o2.effectiveResources) {
+ return -1;
+ } else if (o1.effectiveResources < o2.effectiveResources) {
+ return 1;
+ }
+ else {
+ return o1.id.compareTo(o2.id);
+ }
+ }
+ }
+ });
+ sortedRacks.addAll(racks);
+ LOG.debug("Sorted rack: {}", sortedRacks);
+ return sortedRacks;
}
- private Double getTotalClusterRes(List<String> cluster) {
- Double res = 0.0;
- for (String node : cluster) {
- res += _nodes.getNodeById(this.NodeHostnameToId(node))
- .getAvailableMemoryResources()
- + _nodes.getNodeById(this.NodeHostnameToId(node))
- .getAvailableCpuResources();
+ private Collection<ExecutorDetails> getExecutorsScheduledinRack(String topoId, String rackId, Map<String, String> nodeToRack) {
+ Collection<ExecutorDetails> execs = new LinkedList<ExecutorDetails>();
+ if (_cluster.getAssignmentById(topoId) != null) {
+ for (Entry<ExecutorDetails, WorkerSlot> entry : _cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
+ String nodeId = entry.getValue().getNodeId();
+ String hostname = idToNode(nodeId).getHostname();
+ ExecutorDetails exec = entry.getKey();
+ if (nodeToRack.get(hostname) != null && nodeToRack.get(hostname).equals(rackId)) {
+ execs.add(exec);
+ }
+ }
}
- return res;
+ return execs;
}
private Double distToNode(RAS_Node src, RAS_Node dest) {
if (src.getId().equals(dest.getId())) {
return 0.0;
- } else if (this.NodeToCluster(src).equals(this.NodeToCluster(dest))) {
+ } else if (this.nodeToRack(src).equals(this.nodeToRack(dest))) {
return 0.5;
} else {
return 1.0;
}
}
- private String NodeToCluster(RAS_Node node) {
+ private String nodeToRack(RAS_Node node) {
for (Entry<String, List<String>> entry : _clusterInfo
.entrySet()) {
if (entry.getValue().contains(node.getHostname())) {
return entry.getKey();
}
}
- LOG.error("Node: {} not found in any clusters", node.getHostname());
+ LOG.error("Node: {} not found in any racks", node.getHostname());
return null;
}
-
+
private List<RAS_Node> getAvailableNodes() {
LinkedList<RAS_Node> nodes = new LinkedList<>();
- for (String clusterId : _clusterInfo.keySet()) {
- nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
+ for (String rackId : _clusterInfo.keySet()) {
+ nodes.addAll(this.getAvailableNodesFromRack(rackId));
}
return nodes;
}
- private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
+ private List<RAS_Node> getAvailableNodesFromRack(String rackId) {
List<RAS_Node> retList = new ArrayList<>();
- for (String node_id : _clusterInfo.get(clus)) {
+ for (String node_id : _clusterInfo.get(rackId)) {
retList.add(_nodes.getNodeById(this
.NodeHostnameToId(node_id)));
}
return retList;
}
- private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
- List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
+ private List<WorkerSlot> getAvailableWorkersFromRack(String rackId) {
+ List<RAS_Node> nodes = this.getAvailableNodesFromRack(rackId);
List<WorkerSlot> workers = new LinkedList<>();
for(RAS_Node node : nodes) {
workers.addAll(node.getFreeSlots());
@@ -301,8 +417,8 @@ public class DefaultResourceAwareStrategy implements IStrategy {
private List<WorkerSlot> getAvailableWorker() {
List<WorkerSlot> workers = new LinkedList<>();
- for (String clusterId : _clusterInfo.keySet()) {
- workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
+ for (String rackId : _clusterInfo.keySet()) {
+ workers.addAll(this.getAvailableWorkersFromRack(rackId));
}
return workers;
}
@@ -391,7 +507,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
for(ExecutorDetails exec : execs) {
totalMem += td.getTotalMemReqTask(exec);
}
- }
+ }
return totalMem;
}
@@ -424,8 +540,8 @@ public class DefaultResourceAwareStrategy implements IStrategy {
for(String nodeHostname : clusterEntry.getValue()) {
RAS_Node node = this.idToNode(this.NodeHostnameToId(nodeHostname));
retVal += "-> Node: " + node.getHostname() + " " + node.getId() + "\n";
- retVal += "--> Avail Resources: {Mem " + node.getAvailableMemoryResources() + ", CPU " + node.getAvailableCpuResources() + "}\n";
- retVal += "--> Total Resources: {Mem " + node.getTotalMemoryResources() + ", CPU " + node.getTotalCpuResources() + "}\n";
+ retVal += "--> Avail Resources: {Mem " + node.getAvailableMemoryResources() + ", CPU " + node.getAvailableCpuResources() + " Slots: " + node.totalSlotsFree() + "}\n";
+ retVal += "--> Total Resources: {Mem " + node.getTotalMemoryResources() + ", CPU " + node.getTotalCpuResources() + " Slots: " + node.totalSlots() + "}\n";
}
}
return retVal;
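
To make the new rack selection logic above easier to follow, here is a short, self-contained sketch of the idea behind sortRacks(): a rack's "effective resources" is the minimum of its share of cluster-wide available CPU, memory, and free slots, so a rack that has exhausted any one resource is ranked behind racks with more balanced availability. The class, names, and numbers below are illustrative only and are not part of the patch (the real comparator additionally prefers racks that already host executors of the topology being scheduled):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    // Illustrative model of the ordering computed by sortRacks(); names and numbers are made up.
    public class RackSortSketch {
        static class Rack {
            final String id;
            final double availCpu;
            final double availMem;
            final int freeSlots;
            double effectiveResources;
            Rack(String id, double availCpu, double availMem, int freeSlots) {
                this.id = id;
                this.availCpu = availCpu;
                this.availMem = availMem;
                this.freeSlots = freeSlots;
            }
        }

        public static void main(String[] args) {
            List<Rack> racks = Arrays.asList(
                    new Rack("rack-A", 400.0, 8000.0, 40),  // balanced CPU and memory
                    new Rack("rack-B", 600.0, 1000.0, 40),  // lots of CPU, little memory
                    new Rack("rack-C", 0.0, 8000.0, 40));   // CPU exhausted
            double cpuOverall = 0.0;
            double memOverall = 0.0;
            int slotsOverall = 0;
            for (Rack r : racks) {
                cpuOverall += r.availCpu;
                memOverall += r.availMem;
                slotsOverall += r.freeSlots;
            }
            for (Rack r : racks) {
                // the real code also guards against the overall totals being zero
                r.effectiveResources = Math.min(
                        Math.min(r.availCpu / cpuOverall, r.availMem / memOverall),
                        (double) r.freeSlots / (double) slotsOverall);
            }
            List<Rack> sorted = new ArrayList<Rack>(racks);
            // descending by effective resources, ties broken by id, as in the real comparator
            Collections.sort(sorted, new Comparator<Rack>() {
                @Override
                public int compare(Rack a, Rack b) {
                    int cmp = Double.compare(b.effectiveResources, a.effectiveResources);
                    return cmp != 0 ? cmp : a.id.compareTo(b.id);
                }
            });
            for (Rack r : sorted) {
                System.out.printf("%s effective=%.3f%n", r.id, r.effectiveResources);
            }
        }
    }

Run as-is, this prints rack-A first (minimum share roughly 0.33), then rack-B (memory-bound, roughly 0.06), then rack-C (0.00), which mirrors why the balanced rack-0 wins in the TestDefaultResourceAwareStrategy test added below.
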
http://git-wip-us.apache.org/repos/asf/storm/blob/d50a2437/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 9f8b980..054ecc2 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -29,8 +29,6 @@ import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
-import org.apache.storm.testing.TestWordCounter;
-import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.SpoutDeclarer;
import org.apache.storm.topology.TopologyBuilder;
@@ -44,11 +42,10 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genExecsAndComps;
public class TestResourceAwareScheduler {
@@ -144,6 +141,10 @@ public class TestResourceAwareScheduler {
config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 128.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 0.0);
+
config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER);
Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
@@ -1236,107 +1237,6 @@ public class TestResourceAwareScheduler {
}
/**
- * test if the scheduling logic for the DefaultResourceAwareStrategy is correct
- */
- @Test
- public void testDefaultResourceAwareStrategy() {
- int spoutParallelism = 1;
- int boltParallelism = 2;
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("spout", new TestUtilsForResourceAwareScheduler.TestSpout(),
- spoutParallelism);
- builder.setBolt("bolt-1", new TestUtilsForResourceAwareScheduler.TestBolt(),
- boltParallelism).shuffleGrouping("spout");
- builder.setBolt("bolt-2", new TestUtilsForResourceAwareScheduler.TestBolt(),
- boltParallelism).shuffleGrouping("bolt-1");
- builder.setBolt("bolt-3", new TestUtilsForResourceAwareScheduler.TestBolt(),
- boltParallelism).shuffleGrouping("bolt-2");
-
- StormTopology stormToplogy = builder.createTopology();
-
- Config conf = new Config();
- INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
- Map<String, Number> resourceMap = new HashMap<String, Number>();
- resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 150.0);
- resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1500.0);
- Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
- conf.putAll(Utils.readDefaultConfig());
- conf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
- conf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
- conf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
- conf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 50.0);
- conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 250);
- conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 250);
- conf.put(Config.TOPOLOGY_PRIORITY, 0);
- conf.put(Config.TOPOLOGY_NAME, "testTopology");
- conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
-
- TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
- TestUtilsForResourceAwareScheduler.genExecsAndComps(stormToplogy, spoutParallelism, boltParallelism)
- , this.currentTime);
-
- Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
- topoMap.put(topo.getId(), topo);
- Topologies topologies = new Topologies(topoMap);
- Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), conf);
-
- ResourceAwareScheduler rs = new ResourceAwareScheduler();
-
- rs.prepare(conf);
- rs.schedule(topologies, cluster);
-
- Map<String, List<String>> nodeToComps = new HashMap<String, List<String>>();
- for (Map.Entry<ExecutorDetails, WorkerSlot> entry : cluster.getAssignments().get("testTopology-id").getExecutorToSlot().entrySet()) {
- WorkerSlot ws = entry.getValue();
- ExecutorDetails exec = entry.getKey();
- if (!nodeToComps.containsKey(ws.getNodeId())) {
- nodeToComps.put(ws.getNodeId(), new LinkedList<String>());
- }
- nodeToComps.get(ws.getNodeId()).add(topo.getExecutorToComponent().get(exec));
- }
-
- /**
- * check for correct scheduling
- * Since all the resource availabilites on nodes are the same in the beginining
- * DefaultResourceAwareStrategy can arbitrarily pick one thus we must find if a particular scheduling
- * exists on a node the the cluster.
- */
-
- //one node should have the below scheduling
- List<String> node1 = new LinkedList<>();
- node1.add("spout");
- node1.add("bolt-1");
- node1.add("bolt-2");
- Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node1));
-
- //one node should have the below scheduling
- List<String> node2 = new LinkedList<>();
- node2.add("bolt-3");
- node2.add("bolt-1");
- node2.add("bolt-2");
-
- Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node2));
-
- //one node should have the below scheduling
- List<String> node3 = new LinkedList<>();
- node3.add("bolt-3");
-
- Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node3));
-
- //three used and one node should be empty
- Assert.assertEquals("only three nodes should be used", 3, nodeToComps.size());
- }
-
- private boolean checkDefaultStrategyScheduling(Map<String, List<String>> nodeToComps, List<String> schedulingToFind) {
- for (List<String> entry : nodeToComps.values()) {
- if (schedulingToFind.containsAll(entry) && entry.containsAll(schedulingToFind)) {
- return true;
- }
- }
- return false;
- }
-
- /**
* test if free slots on nodes work correctly
*/
@Test
@@ -1423,7 +1323,7 @@ public class TestResourceAwareScheduler {
topoMap.put(topo3.getId(), topo3);
Topologies topologies = new Topologies(topoMap);
-
+
ResourceAwareScheduler rs = new ResourceAwareScheduler();
rs.prepare(config);
@@ -1472,7 +1372,7 @@ public class TestResourceAwareScheduler {
StormTopology stormTopology = builder.createTopology();
TopologyDetails topo = new TopologyDetails("topo-1", config, stormTopology,
0,
- TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology, 5, 5), 0);
+ genExecsAndComps(stormTopology), 0);
Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
http://git-wip-us.apache.org/repos/asf/storm/blob/d50a2437/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index f21645b..fbc0421 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -41,7 +41,6 @@ import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,33 +93,44 @@ public class TestUtilsForResourceAwareScheduler {
}
public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts, Map resourceMap) {
+ return genSupervisors(numSup, numPorts, 0, resourceMap);
+ }
+
+ public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts, int start, Map resourceMap) {
Map<String, SupervisorDetails> retList = new HashMap<String, SupervisorDetails>();
- for (int i = 0; i < numSup; i++) {
+ for (int i = start; i < numSup + start; i++) {
List<Number> ports = new LinkedList<Number>();
for (int j = 0; j < numPorts; j++) {
ports.add(j);
}
- SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, resourceMap);
+ SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, new HashMap<String, Double>(resourceMap));
retList.put(sup.getId(), sup);
}
return retList;
}
- public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) {
+ public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology) {
Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String>();
int startTask = 0;
int endTask = 1;
for (Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
+ SpoutSpec spout = entry.getValue();
+ String spoutId = entry.getKey();
+ int spoutParallelism = spout.get_common().get_parallelism_hint();
+
for (int i = 0; i < spoutParallelism; i++) {
- retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
+ retMap.put(new ExecutorDetails(startTask, endTask), spoutId);
startTask++;
endTask++;
}
}
for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
+ String boltId = entry.getKey();
+ Bolt bolt = entry.getValue();
+ int boltParallelism = bolt.get_common().get_parallelism_hint();
for (int i = 0; i < boltParallelism; i++) {
- retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
+ retMap.put(new ExecutorDetails(startTask, endTask), boltId);
startTask++;
endTask++;
}
@@ -139,7 +149,7 @@ public class TestUtilsForResourceAwareScheduler {
StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
0,
- genExecsAndComps(topology, spoutParallelism, boltParallelism), launchTime);
+ genExecsAndComps(topology), launchTime);
return topo;
}
@@ -161,6 +171,7 @@ public class TestUtilsForResourceAwareScheduler {
}
BoltDeclarer b1 = builder.setBolt("bolt-" + i, new TestBolt(),
boltParallelism).shuffleGrouping("spout-" + j);
+ j++;
}
return builder.createTopology();
http://git-wip-us.apache.org/repos/asf/storm/blob/d50a2437/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
new file mode 100644
index 0000000..77f23aa
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.SchedulerAssignmentImpl;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RAS_Node;
+import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.RackResources;
+import org.apache.storm.scheduler.resource.SchedulingState;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.User;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+
+public class TestDefaultResourceAwareStrategy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestDefaultResourceAwareStrategy.class);
+
+ private static int currentTime = 1450418597;
+
+ /**
+ * test if the scheduling logic for the DefaultResourceAwareStrategy is correct
+ */
+ @Test
+ public void testDefaultResourceAwareStrategy() {
+ int spoutParallelism = 1;
+ int boltParallelism = 2;
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("spout", new TestUtilsForResourceAwareScheduler.TestSpout(),
+ spoutParallelism);
+ builder.setBolt("bolt-1", new TestUtilsForResourceAwareScheduler.TestBolt(),
+ boltParallelism).shuffleGrouping("spout");
+ builder.setBolt("bolt-2", new TestUtilsForResourceAwareScheduler.TestBolt(),
+ boltParallelism).shuffleGrouping("bolt-1");
+ builder.setBolt("bolt-3", new TestUtilsForResourceAwareScheduler.TestBolt(),
+ boltParallelism).shuffleGrouping("bolt-2");
+
+ StormTopology stormToplogy = builder.createTopology();
+
+ Config conf = new Config();
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 150.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1500.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+ conf.putAll(Utils.readDefaultConfig());
+ conf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ conf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ conf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ conf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 50.0);
+ conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 250);
+ conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 250);
+ conf.put(Config.TOPOLOGY_PRIORITY, 0);
+ conf.put(Config.TOPOLOGY_NAME, "testTopology");
+ conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+
+ TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
+ TestUtilsForResourceAwareScheduler.genExecsAndComps(stormToplogy)
+ , this.currentTime);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo.getId(), topo);
+ Topologies topologies = new Topologies(topoMap);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), conf);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(conf);
+ rs.schedule(topologies, cluster);
+
+ Map<String, List<String>> nodeToComps = new HashMap<String, List<String>>();
+ for (Map.Entry<ExecutorDetails, WorkerSlot> entry : cluster.getAssignments().get("testTopology-id").getExecutorToSlot().entrySet()) {
+ WorkerSlot ws = entry.getValue();
+ ExecutorDetails exec = entry.getKey();
+ if (!nodeToComps.containsKey(ws.getNodeId())) {
+ nodeToComps.put(ws.getNodeId(), new LinkedList<String>());
+ }
+ nodeToComps.get(ws.getNodeId()).add(topo.getExecutorToComponent().get(exec));
+ }
+
+ /**
+ * check for correct scheduling
+ * Since all the resource availabilities on nodes are the same in the beginning,
+ * DefaultResourceAwareStrategy can arbitrarily pick one, thus we must check whether a particular scheduling
+ * exists on some node in the cluster.
+ */
+
+ //one node should have the below scheduling
+ List<String> node1 = new LinkedList<>();
+ node1.add("spout");
+ node1.add("bolt-1");
+ node1.add("bolt-2");
+ Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node1));
+
+ //one node should have the below scheduling
+ List<String> node2 = new LinkedList<>();
+ node2.add("bolt-3");
+ node2.add("bolt-1");
+ node2.add("bolt-2");
+
+ Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node2));
+
+ //one node should have the below scheduling
+ List<String> node3 = new LinkedList<>();
+ node3.add("bolt-3");
+
+ Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node3));
+
+ //three used and one node should be empty
+ Assert.assertEquals("only three nodes should be used", 3, nodeToComps.size());
+ }
+
+ private boolean checkDefaultStrategyScheduling(Map<String, List<String>> nodeToComps, List<String> schedulingToFind) {
+ for (List<String> entry : nodeToComps.values()) {
+ if (schedulingToFind.containsAll(entry) && entry.containsAll(schedulingToFind)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Test whether the strategy will choose the correct rack
+ */
+ @Test
+ public void testMultipleRacks() {
+
+ final Map<String, SupervisorDetails> supMap = new HashMap<String, SupervisorDetails>();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ //generate a rack of supervisors
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 8000.0);
+ final Map<String, SupervisorDetails> supMapRack1 = TestUtilsForResourceAwareScheduler.genSupervisors(10, 4, 0, resourceMap);
+
+ //generate another rack of supervisors with less resources
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 4000.0);
+ final Map<String, SupervisorDetails> supMapRack2 = TestUtilsForResourceAwareScheduler.genSupervisors(10, 4, 10, resourceMap);
+
+ //generate some supervisors that are depleted of one resource
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 0.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 8000.0);
+ final Map<String, SupervisorDetails> supMapRack3 = TestUtilsForResourceAwareScheduler.genSupervisors(10, 4, 20, resourceMap);
+
+ //generate some supervisors that have a lot of memory but little CPU
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 10.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 8000.0 *2 + 4000.0);
+ final Map<String, SupervisorDetails> supMapRack4 = TestUtilsForResourceAwareScheduler.genSupervisors(10, 4, 30, resourceMap);
+
+ //generate some supervisors that have a lot of CPU but little memory
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0 + 200.0 + 10.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+ final Map<String, SupervisorDetails> supMapRack5 = TestUtilsForResourceAwareScheduler.genSupervisors(10, 4, 40, resourceMap);
+
+
+ supMap.putAll(supMapRack1);
+ supMap.putAll(supMapRack2);
+ supMap.putAll(supMapRack3);
+ supMap.putAll(supMapRack4);
+ supMap.putAll(supMapRack5);
+
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ //create test DNSToSwitchMapping plugin
+ DNSToSwitchMapping TestNetworkTopographyPlugin = new DNSToSwitchMapping() {
+ @Override
+ public Map<String, String> resolve(List<String> names) {
+ Map<String, String> ret = new HashMap<String, String>();
+ for (SupervisorDetails sup : supMapRack1.values()) {
+ ret.put(sup.getHost(), "rack-0");
+ }
+ for (SupervisorDetails sup : supMapRack2.values()) {
+ ret.put(sup.getHost(), "rack-1");
+ }
+ for (SupervisorDetails sup : supMapRack3.values()) {
+ ret.put(sup.getHost(), "rack-2");
+ }
+ for (SupervisorDetails sup : supMapRack4.values()) {
+ ret.put(sup.getHost(), "rack-3");
+ }
+ for (SupervisorDetails sup : supMapRack5.values()) {
+ ret.put(sup.getHost(), "rack-4");
+ }
+ return ret;
+ }
+ };
+
+ List<String> supHostnames = new LinkedList<>();
+ for (SupervisorDetails sup : supMap.values()) {
+ supHostnames.add(sup.getHost());
+ }
+ Map<String, List<String>> rackToNodes = new HashMap<>();
+ Map<String, String> resolvedSuperVisors = TestNetworkTopographyPlugin.resolve(supHostnames);
+ for (Map.Entry<String, String> entry : resolvedSuperVisors.entrySet()) {
+ String hostName = entry.getKey();
+ String rack = entry.getValue();
+ List<String> nodesForRack = rackToNodes.get(rack);
+ if (nodesForRack == null) {
+ nodesForRack = new ArrayList<String>();
+ rackToNodes.put(rack, nodesForRack);
+ }
+ nodesForRack.add(hostName);
+ }
+ cluster.setNetworkTopography(rackToNodes);
+
+ //generate topologies
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 8, 0, 2, 0, currentTime - 2, 10);
+
+ topoMap.put(topo1.getId(), topo1);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ DefaultResourceAwareStrategy rs = new DefaultResourceAwareStrategy();
+
+ rs.prepare(new SchedulingState(new HashMap<String, User>(), cluster, topologies, config));
+ TreeSet<RackResources> sortedRacks= rs.sortRacks(topo1.getId());
+
+ Assert.assertEquals("# of racks sorted", 5, sortedRacks.size());
+ Iterator<RackResources> it = sortedRacks.iterator();
+ // Ranked first since rack-0 has the most balanced set of resources
+ Assert.assertEquals("rack-0 should be ordered first", "rack-0", it.next().id);
+ // Ranked second since rack-1 has a balanced set of resources but less than rack-0
+ Assert.assertEquals("rack-1 should be ordered second", "rack-1", it.next().id);
+ // Ranked third since rack-4 has a lot of cpu but not a lot of memory
+ Assert.assertEquals("rack-4 should be ordered third", "rack-4", it.next().id);
+ // Ranked fourth since rack-3 has a lot of memory but not much CPU
+ Assert.assertEquals("rack-3 should be ordered fourth", "rack-3", it.next().id);
+ // Ranked last since rack-2 has no CPU resources
+ Assert.assertEquals("rack-2 should be ordered fifth", "rack-2", it.next().id);
+
+ SchedulingResult schedulingResult = rs.schedule(topo1);
+ for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry : schedulingResult.getSchedulingResultMap().entrySet()) {
+ WorkerSlot ws = entry.getKey();
+ Collection<ExecutorDetails> execs = entry.getValue();
+ // make sure all workers are scheduled on rack-0
+ Assert.assertEquals("assert worker scheduled on rack-0", "rack-0", resolvedSuperVisors.get(rs.idToNode(ws.getNodeId()).getHostname()));
+ // make actual assignments
+ cluster.assign(ws, topo1.getId(), execs);
+ }
+ Assert.assertEquals("All executors in topo-1 scheduled", 0, cluster.getUnassignedExecutors(topo1).size());
+
+ // Test the case where the topology is already partially scheduled on one rack
+
+ topoMap.put(topo2.getId(), topo2);
+ topologies = new Topologies(topoMap);
+ RAS_Nodes nodes = new RAS_Nodes(cluster, topologies);
+ Iterator<ExecutorDetails> executorIterator = topo2.getExecutors().iterator();
+ List<String> nodeHostnames = rackToNodes.get("rack-1");
+ for (int i=0 ; i< topo2.getExecutors().size()/2; i++) {
+ String nodeHostname = nodeHostnames.get(i % nodeHostnames.size());
+ RAS_Node node = rs.idToNode(rs.NodeHostnameToId(nodeHostname));
+ WorkerSlot targetSlot = node.getFreeSlots().iterator().next();
+ ExecutorDetails targetExec = executorIterator.next();
+ // to keep track of free slots
+ node.assign(targetSlot, topo2, Arrays.asList(targetExec));
+ // to actually assign
+ cluster.assign(targetSlot, topo2.getId(), Arrays.asList(targetExec));
+ }
+
+ topologies.getById(topo2.getId()).getTotalMemoryResourceList();
+
+ rs = new DefaultResourceAwareStrategy();
+ rs.prepare(new SchedulingState(new HashMap<String, User>(), cluster, topologies, config));
+ // schedule topo2
+ schedulingResult = rs.schedule(topo2);
+
+ // checking assignments
+ for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry : schedulingResult.getSchedulingResultMap().entrySet()) {
+ WorkerSlot ws = entry.getKey();
+ Collection<ExecutorDetails> execs = entry.getValue();
+ // make sure all workers are scheduled on rack-1
+ Assert.assertEquals("assert worker scheduled on rack-1", "rack-1", resolvedSuperVisors.get(rs.idToNode(ws.getNodeId()).getHostname()));
+ // make actual assignments
+ cluster.assign(ws, topo2.getId(), execs);
+ }
+ Assert.assertEquals("All executors in topo-2 scheduled", 0, cluster.getUnassignedExecutors(topo1).size());
+ }
+}
[3/3] storm git commit: add STORM-1766 to CHANGELOG
Posted by ka...@apache.org.
add STORM-1766 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/560746f8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/560746f8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/560746f8
Branch: refs/heads/1.x-branch
Commit: 560746f8ee4c3e570fa7df615829656f44459306
Parents: 790308d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Aug 14 23:45:44 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Aug 14 23:45:44 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/560746f8/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b190cc3..dca6f9d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-1766: A better algorithm for server rack selection for RAS
* STORM-1913: Additions and Improvements for Trident RAS API
* STORM-2037: debug operation should be whitelisted in SimpleAclAuthorizer.
* STORM-2023: Add calcite-core to dependency of storm-sql-runtime
[2/3] storm git commit: Merge branch '1.x-STORM-1766' of
https://github.com/jerrypeng/storm into STORM-1766-1.x
Posted by ka...@apache.org.
Merge branch '1.x-STORM-1766' of https://github.com/jerrypeng/storm into STORM-1766-1.x
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/790308d2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/790308d2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/790308d2
Branch: refs/heads/1.x-branch
Commit: 790308d204584dbb12ce2dfd49bb61f55ba1c6c2
Parents: 66f5f68 d50a243
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Aug 14 23:39:03 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Aug 14 23:39:03 2016 +0900
----------------------------------------------------------------------
.../jvm/org/apache/storm/scheduler/Cluster.java | 7 +
.../storm/scheduler/resource/RAS_Node.java | 11 +-
.../DefaultResourceAwareStrategy.java | 228 +++++++++----
.../resource/TestResourceAwareScheduler.java | 114 +------
.../TestUtilsForResourceAwareScheduler.java | 24 +-
.../TestDefaultResourceAwareStrategy.java | 333 +++++++++++++++++++
6 files changed, 541 insertions(+), 176 deletions(-)
----------------------------------------------------------------------