You are viewing a plain-text version of this content. The link to the canonical version (labeled "here" in the original page) is not preserved in plain text.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/05/14 17:36:39 UTC
[1/5] storm git commit: STORM-3040: Improve scheduler performance
Repository: storm
Updated Branches:
refs/heads/master 5cb4582ff -> 53f38bc31
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 222c0cd..366ee78 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -12,6 +12,8 @@
package org.apache.storm.scheduler.resource;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
@@ -112,6 +114,7 @@ public class TestUtilsForResourceAwareScheduler {
Map<String, Map<String, Number>> pools) {
Config config = new Config();
config.putAll(Utils.readDefaultConfig());
+ config.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, GenSupervisorsDnsToSwitchMapping.class.getName());
config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, DefaultSchedulingPriorityStrategy.class.getName());
config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, DefaultResourceAwareStrategy.class.getName());
config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, compPcore);
@@ -139,19 +142,65 @@ public class TestUtilsForResourceAwareScheduler {
public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts, int start,
double cpu, double mem, Map<String, Double> miscResources) {
+ return genSupervisorsWithRacks(1, numSup, numPorts, 0, start, cpu, mem, miscResources);
+
+ }
+
+ private static final Pattern HOST_NAME_PATTERN = Pattern.compile("^(host-\\d+)-(.+)$");
+
+ public static String hostNameToRackName(String hostName) {
+ Matcher m = HOST_NAME_PATTERN.matcher(hostName);
+ if (m.matches()) {
+ return m.group(2);
+ }
+ return DNSToSwitchMapping.DEFAULT_RACK;
+ }
+
+ private static final Pattern SUPERVISOR_ID_PATTERN = Pattern.compile("^(r\\d+)s(\\d+)$");
+
+ public static String supervisorIdToRackName(String hostName) {
+ Matcher m = SUPERVISOR_ID_PATTERN.matcher(hostName);
+ if (m.matches()) {
+ return m.group(1);
+ }
+ return DNSToSwitchMapping.DEFAULT_RACK;
+ }
+
+ public static class GenSupervisorsDnsToSwitchMapping implements DNSToSwitchMapping {
+
+ private Map<String, String> mappingCache = new ConcurrentHashMap<>();
+
+ @Override
+ public Map<String,String> resolve(List<String> names) {
+
+ Map<String, String> m = new HashMap<>();
+ for (String name : names) {
+ m.put(name, mappingCache.computeIfAbsent(name, TestUtilsForResourceAwareScheduler::hostNameToRackName));
+ }
+ return m;
+ }
+ }
+
+ public static Map<String, SupervisorDetails> genSupervisorsWithRacks(int numRacks, int numSupersPerRack, int numPorts, int rackStart,
+ int superInRackStart, double cpu, double mem,
+ Map<String, Double> miscResources) {
Map<String, Double> resourceMap = new HashMap<>();
resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
resourceMap.putAll(miscResources);
- Map<String, SupervisorDetails> retList = new HashMap<String, SupervisorDetails>();
- for (int i = start; i < numSup + start; i++) {
- List<Number> ports = new LinkedList<>();
- for (int j = 0; j < numPorts; j++) {
- ports.add(j);
+ Map<String, SupervisorDetails> retList = new HashMap<>();
+ for (int rack = rackStart; rack < numRacks + rackStart; rack++) {
+ for (int superInRack = superInRackStart; superInRack < (numSupersPerRack + superInRackStart); superInRack++) {
+ List<Number> ports = new LinkedList<>();
+ for (int p = 0; p < numPorts; p++) {
+ ports.add(p);
+ }
+ SupervisorDetails sup = new SupervisorDetails(String.format("r%03ds%03d", rack, superInRack),
+ String.format("host-%03d-rack-%03d", superInRack, rack), null, ports,
+ NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
+ retList.put(sup.getId(), sup);
+
}
- SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports,
- NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
- retList.put(sup.getId(), sup);
}
return retList;
}
@@ -201,21 +250,38 @@ public class TestUtilsForResourceAwareScheduler {
int spoutParallelism, int boltParallelism, int launchTime, int priority,
String user) {
+ return genTopology(name, config, numSpout, numBolt, spoutParallelism, boltParallelism, launchTime, priority, user,
+ Double.MAX_VALUE);
+ }
+
+ public static TopologyDetails genTopology(String name, Map<String, Object> config, int numSpout, int numBolt,
+ int spoutParallelism, int boltParallelism, int launchTime, int priority,
+ String user, double maxHeapSize) {
+ StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
+ return topoToTopologyDetails(name, config, topology, launchTime, priority, user, maxHeapSize);
+ }
+
+ public static TopologyDetails topoToTopologyDetails(String name, Map<String, Object> config, StormTopology topology,
+ int launchTime, int priority, String user, double maxHeapSize) {
+
Config conf = new Config();
conf.putAll(config);
conf.put(Config.TOPOLOGY_PRIORITY, priority);
conf.put(Config.TOPOLOGY_NAME, name);
conf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
- conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
- StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
+ conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, maxHeapSize);
TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
- 0,
- genExecsAndComps(topology), launchTime, user);
+ 0, genExecsAndComps(topology), launchTime, user);
return topo;
}
public static StormTopology buildTopology(int numSpout, int numBolt,
int spoutParallelism, int boltParallelism) {
+ return topologyBuilder(numSpout, numBolt, spoutParallelism, boltParallelism).createTopology();
+ }
+
+ public static TopologyBuilder topologyBuilder(int numSpout, int numBolt,
+ int spoutParallelism, int boltParallelism) {
LOG.debug("buildTopology with -> numSpout: " + numSpout + " spoutParallelism: "
+ spoutParallelism + " numBolt: "
+ numBolt + " boltParallelism: " + boltParallelism);
@@ -235,7 +301,7 @@ public class TestUtilsForResourceAwareScheduler {
j++;
}
- return builder.createTopology();
+ return builder;
}
public static class TestSpout extends BaseRichSpout {
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index f82d0f7..607b815 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -319,7 +319,7 @@ public class TestDefaultResourceAwareStrategy {
List<String> nodeHostnames = rackToNodes.get("rack-1");
for (int i = 0; i< topo2.getExecutors().size()/2; i++) {
String nodeHostname = nodeHostnames.get(i % nodeHostnames.size());
- RAS_Node node = rs.idToNode(rs.nodeHostnameToId(nodeHostname));
+ RAS_Node node = rs.hostnameToNodes(nodeHostname).get(0);
WorkerSlot targetSlot = node.getFreeSlots().iterator().next();
ExecutorDetails targetExec = executorIterator.next();
// to keep track of free slots
@@ -444,7 +444,7 @@ public class TestDefaultResourceAwareStrategy {
List<String> nodeHostnames = rackToNodes.get("rack-1");
for (int i = 0; i< topo2.getExecutors().size()/2; i++) {
String nodeHostname = nodeHostnames.get(i % nodeHostnames.size());
- RAS_Node node = rs.idToNode(rs.nodeHostnameToId(nodeHostname));
+ RAS_Node node = rs.hostnameToNodes(nodeHostname).get(0);
WorkerSlot targetSlot = node.getFreeSlots().iterator().next();
ExecutorDetails targetExec = executorIterator.next();
// to keep track of free slots
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
index f7a2a84..2c41237 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
@@ -26,6 +26,8 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.WorkerResources;
@@ -33,7 +35,6 @@ import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.INimbus;
import org.apache.storm.scheduler.SchedulerAssignment;
-import org.apache.storm.scheduler.SchedulerAssignmentImpl;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.SupervisorResources;
import org.apache.storm.scheduler.Topologies;
@@ -44,14 +45,12 @@ import org.apache.storm.topology.SharedOffHeapWithinNode;
import org.apache.storm.topology.SharedOffHeapWithinWorker;
import org.apache.storm.topology.SharedOnHeap;
import org.apache.storm.topology.TopologyBuilder;
-import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class TestGenericResourceAwareStrategy {
private static final Logger LOG = LoggerFactory.getLogger(TestGenericResourceAwareStrategy.class);
@@ -220,6 +219,66 @@ public class TestGenericResourceAwareStrategy {
foundScheduling.add(new HashSet<>(execs));
}
- Assert.assertEquals(expectedScheduling, foundScheduling);
+ assertEquals(expectedScheduling, foundScheduling);
+ }
+
+ @Test
+ public void testAntiAffinityWithMultipleTopologies() {
+ INimbus iNimbus = new INimbusTest();
+ Map<String, SupervisorDetails> supMap = genSupervisorsWithRacks(1, 40, 66, 0, 0, 4700, 226200, new HashMap<>());
+ HashMap<String, Double> extraResources = new HashMap<>();
+ extraResources.put("my.gpu", 1.0);
+ supMap.putAll(genSupervisorsWithRacks(1, 40, 66, 1, 0, 4700, 226200, extraResources));
+
+ Config config = new Config();
+ config.putAll(createGrasClusterConfig(88, 775, 25, null, null));
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+ rs.prepare(config);
+
+ TopologyDetails tdSimple = genTopology("topology-simple", config, 1,
+ 5, 100, 300, 0, 0, "user", 8192);
+
+ //Schedule the simple topology first
+ Topologies topologies = new Topologies(tdSimple);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config);
+ rs.schedule(topologies, cluster);
+
+ TopologyBuilder builder = topologyBuilder(1, 5, 100, 300);
+ builder.setBolt("gpu-bolt", new TestBolt(), 40)
+ .addResource("my.gpu", 1.0)
+ .shuffleGrouping("spout-0");
+ TopologyDetails tdGpu = topoToTopologyDetails("topology-gpu", config, builder.createTopology(), 0, 0,"user", 8192);
+
+ //Now schedule GPU but with the simple topology in place.
+ topologies = new Topologies(tdSimple, tdGpu);
+ cluster = new Cluster(cluster, topologies);
+ rs.schedule(topologies, cluster);
+
+ Map<String, SchedulerAssignment> assignments = new TreeMap<>(cluster.getAssignments());
+ assertEquals(2, assignments.size());
+
+ Map<String, Map<String, AtomicLong>> topoPerRackCount = new HashMap<>();
+ for (Entry<String, SchedulerAssignment> entry: assignments.entrySet()) {
+ SchedulerAssignment sa = entry.getValue();
+ Map<String, AtomicLong> slotsPerRack = new TreeMap<>();
+ for (WorkerSlot slot : sa.getSlots()) {
+ String nodeId = slot.getNodeId();
+ String rack = supervisorIdToRackName(nodeId);
+ slotsPerRack.computeIfAbsent(rack, (r) -> new AtomicLong(0)).incrementAndGet();
+ }
+ LOG.info("{} => {}", entry.getKey(), slotsPerRack);
+ topoPerRackCount.put(entry.getKey(), slotsPerRack);
+ }
+
+ Map<String, AtomicLong> simpleCount = topoPerRackCount.get("topology-simple-0");
+ assertNotNull(simpleCount);
+ //Because the simple topology was scheduled first we want to be sure that it didn't put anything on
+ // the GPU nodes.
+ assertEquals(1, simpleCount.size()); //Only 1 rack is in use
+ assertFalse(simpleCount.containsKey("r001")); //r001 is the second rack with GPUs
+ assertTrue(simpleCount.containsKey("r000")); //r000 is the first rack with no GPUs
+
+ //We don't really care too much about the scheduling of topology-gpu-0, because it was scheduled.
}
}
[5/5] storm git commit: Merge branch 'STORM-3040' of https://github.com/revans2/incubator-storm into STORM-3040
Posted by bo...@apache.org.
Merge branch 'STORM-3040' of https://github.com/revans2/incubator-storm into STORM-3040
STORM-3040: Improve scheduler performance
This closes #2647
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/53f38bc3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/53f38bc3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/53f38bc3
Branch: refs/heads/master
Commit: 53f38bc31f2fd315a520ba6b86c0a60be08381cc
Parents: 5cb4582 558e9b6
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon May 14 12:15:13 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon May 14 12:15:13 2018 -0500
----------------------------------------------------------------------
pom.xml | 11 +-
.../apache/storm/testing/PerformanceTest.java | 37 ++
.../org/apache/storm/scheduler/Cluster.java | 65 +--
.../storm/scheduler/ISchedulingState.java | 1 +
.../scheduler/SchedulerAssignmentImpl.java | 46 +-
.../storm/scheduler/resource/RAS_Node.java | 97 +++--
.../normalization/NormalizedResourceOffer.java | 46 +-
.../NormalizedResourceRequest.java | 36 +-
.../normalization/NormalizedResources.java | 38 +-
.../NormalizedResourcesWithMemory.java | 5 +
.../normalization/ResourceMapArrayBridge.java | 8 +
.../scheduling/BaseResourceAwareStrategy.java | 426 ++++++++++++-------
.../scheduling/ConstraintSolverStrategy.java | 23 +-
.../DefaultResourceAwareStrategy.java | 14 +-
.../GenericResourceAwareStrategy.java | 34 +-
.../resource/TestResourceAwareScheduler.java | 137 ++++--
.../TestUtilsForResourceAwareScheduler.java | 92 +++-
.../TestDefaultResourceAwareStrategy.java | 4 +-
.../TestGenericResourceAwareStrategy.java | 69 ++-
19 files changed, 826 insertions(+), 363 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/53f38bc3/pom.xml
----------------------------------------------------------------------
[3/5] storm git commit: Fixed Checkstyle Issues
Posted by bo...@apache.org.
Fixed Checkstyle Issues
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d0be8aed
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d0be8aed
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d0be8aed
Branch: refs/heads/master
Commit: d0be8aed6ccba2abf7b32084fe8255f15514563c
Parents: 32392bc
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon May 7 09:56:27 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon May 7 09:56:27 2018 -0500
----------------------------------------------------------------------
.../apache/storm/testing/PerformanceTest.java | 4 +--
.../storm/scheduler/resource/RAS_Node.java | 37 +++++++++-----------
.../normalization/NormalizedResourceOffer.java | 9 +++--
.../NormalizedResourceRequest.java | 8 +++++
.../normalization/NormalizedResources.java | 11 ++++--
.../normalization/ResourceMapArrayBridge.java | 2 +-
.../GenericResourceAwareStrategy.java | 4 +++
7 files changed, 46 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java b/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
index e657dd8..c5eba2f 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
@@ -25,13 +25,11 @@ package org.apache.storm.testing;
* add the annotation @Category(PerformanceTest.class) to the class definition as well as to its hierarchy of superclasses.
* For example:
* <p/>
- *
- *
* @ Category(PerformanceTest.class)<br/>
* public class MyPerformanceTest {<br/>
* ...<br/>
* }
- *
+ * <p/>
* In general performance tests should have a time limit on them, but the time limit should be liberal enough to account
* for running on CI systems like travis ci, or the apache jenkins build.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 3a9570e..6075d5a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -53,6 +53,14 @@ public class RAS_Node {
private boolean isAlive;
private SupervisorDetails sup;
+ /**
+ * Create a new node.
+ * @param nodeId the id of the node.
+ * @param sup the supervisor this is for.
+ * @param cluster the cluster this is a part of.
+ * @param workerIdToWorker the mapping of slots already assigned to this node.
+ * @param assignmentMap the mapping of executors already assigned to this node.
+ */
public RAS_Node(
String nodeId,
SupervisorDetails sup,
@@ -99,26 +107,6 @@ public class RAS_Node {
}
}
- public static int countFreeSlotsAlive(Collection<RAS_Node> nodes) {
- int total = 0;
- for (RAS_Node n : nodes) {
- if (n.isAlive()) {
- total += n.totalSlotsFree();
- }
- }
- return total;
- }
-
- public static int countTotalSlotsAlive(Collection<RAS_Node> nodes) {
- int total = 0;
- for (RAS_Node n : nodes) {
- if (n.isAlive()) {
- total += n.totalSlots();
- }
- }
- return total;
- }
-
public String getId() {
return nodeId;
}
@@ -135,6 +123,10 @@ public class RAS_Node {
return ret;
}
+ /**
+ * Get the IDs of all free slots on this node.
+ * @return the ids of the free slots.
+ */
public Collection<String> getFreeSlotsId() {
if (!isAlive) {
return new HashSet<>();
@@ -164,6 +156,11 @@ public class RAS_Node {
return workerIdsToWorkers(getUsedSlotsId());
}
+ /**
+ * Get slots used by the given topology.
+ * @param topId the id of the topology to get.
+ * @return the slots currently assigned to that topology on this node.
+ */
public Collection<WorkerSlot> getUsedSlots(String topId) {
Collection<WorkerSlot> ret = null;
if (topIdToUsedSlots.get(topId) != null) {
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
index 790a42b..b0defbb 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -96,7 +96,7 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
/**
* Calculate the average percentage used.
* @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
- * double, double).
+ * double, double)
*/
public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
return normalizedResources.calculateAveragePercentageUsedBy(
@@ -115,7 +115,7 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
/**
* Check if resources might be able to fit.
* @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
- * double).
+ * double)
*/
public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
return normalizedResources.couldHoldIgnoringSharedMemory(
@@ -136,6 +136,11 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
return "Normalized resources: " + toNormalizedMap();
}
+ /**
+ * If a node or rack has a kind of resource not in a request, make that resource negative so when sorting that node or rack will
+ * be less likely to be selected.
+ * @param requestedResources the requested resources.
+ */
public void updateForRareResourceAffinity(NormalizedResourceRequest requestedResources) {
normalizedResources.updateForRareResourceAffinity(requestedResources.getNormalizedResources());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
index 86033b8..2a30ffc 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -134,6 +134,10 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
return topologyResources;
}
+ /**
+ * Convert to a map that is used by configuration and the UI.
+ * @return a map with the key as the resource name and the value the resource amount.
+ */
public Map<String, Double> toNormalizedMap() {
Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
@@ -168,6 +172,10 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
offHeap += other.offHeap;
}
+ /**
+ * Add the resources from a worker to those in this.
+ * @param value the resources on the worker.
+ */
public void add(WorkerResources value) {
this.normalizedResources.add(value);
//The resources are already normalized
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index e57d8f2..9fb717f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -347,10 +347,15 @@ public class NormalizedResources {
return min * 100.0;
}
- public void updateForRareResourceAffinity(NormalizedResources other) {
- int length = Math.min(this.otherResources.length, other.otherResources.length);
+ /**
+ * If a node or rack has a kind of resource not in a request, make that resource negative so when sorting that node or rack will
+ * be less likely to be selected.
+ * @param request the requested resources.
+ */
+ public void updateForRareResourceAffinity(NormalizedResources request) {
+ int length = Math.min(this.otherResources.length, request.otherResources.length);
for (int i = 0; i < length; i++) {
- if (other.getResourceAt(i) == 0.0) {
+ if (request.getResourceAt(i) == 0.0) {
this.otherResources[i] = -1 * this.otherResources[i];
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
index 5a71db4..4a4797b 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
@@ -65,7 +65,7 @@ public class ResourceMapArrayBridge {
}
/**
- * Create an array that has all values 0;
+ * Create an array that has all values 0.
* @return the empty array.
*/
public double[] empty() {
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index ce98abd..0c85e1c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -44,6 +44,10 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl
final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
final ExistingScheduleFunc existingScheduleFunc) {
AllResources affinityBasedAllResources = new AllResources(allResources);
+ NormalizedResourceRequest requestedResources = topologyDetails.getTotalResources(exec);
+ for (ObjectResources objectResources : affinityBasedAllResources.objectResources) {
+ objectResources.availableResources.updateForRareResourceAffinity(requestedResources);
+ }
TreeSet<ObjectResources> sortedObjectResources =
new TreeSet<>((o1, o2) -> {
[4/5] storm git commit: Addressed review comments
Posted by bo...@apache.org.
Addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/558e9b67
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/558e9b67
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/558e9b67
Branch: refs/heads/master
Commit: 558e9b671a7904aec5798cbac6cbdd2aca65b488
Parents: d0be8ae
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue May 8 09:56:22 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue May 8 09:56:22 2018 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/storm/scheduler/Cluster.java | 3 +++
.../main/java/org/apache/storm/scheduler/ISchedulingState.java | 1 +
2 files changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/558e9b67/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 0fc78e2..ab95bd2 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -47,6 +47,9 @@ import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * The current state of the storm cluster. Cluster is not currently thread safe.
+ */
public class Cluster implements ISchedulingState {
private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
private static final Function<String, Set<WorkerSlot>> MAKE_SET = (x) -> new HashSet<>();
http://git-wip-us.apache.org/repos/asf/storm/blob/558e9b67/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index b0d94b5..d96109c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -29,6 +29,7 @@ import org.apache.storm.scheduler.resource.normalization.NormalizedResourceReque
/**
* An interface that provides access to the current scheduling state.
+ * The scheduling state is not guaranteed to be thread safe.
*/
public interface ISchedulingState {
[2/5] storm git commit: STORM-3040: Improve scheduler performance
Posted by bo...@apache.org.
STORM-3040: Improve scheduler performance
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/32392bc8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/32392bc8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/32392bc8
Branch: refs/heads/master
Commit: 32392bc8e19a21a7d8c22248fedf42673472acbc
Parents: 02639f4
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 24 15:19:32 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri May 4 13:59:04 2018 -0500
----------------------------------------------------------------------
pom.xml | 11 +-
.../apache/storm/testing/PerformanceTest.java | 39 ++
.../org/apache/storm/scheduler/Cluster.java | 62 +--
.../scheduler/SchedulerAssignmentImpl.java | 46 +-
.../storm/scheduler/resource/RAS_Node.java | 60 +--
.../normalization/NormalizedResourceOffer.java | 37 +-
.../NormalizedResourceRequest.java | 28 +-
.../normalization/NormalizedResources.java | 33 +-
.../NormalizedResourcesWithMemory.java | 5 +
.../normalization/ResourceMapArrayBridge.java | 8 +
.../scheduling/BaseResourceAwareStrategy.java | 426 ++++++++++++-------
.../scheduling/ConstraintSolverStrategy.java | 23 +-
.../DefaultResourceAwareStrategy.java | 14 +-
.../GenericResourceAwareStrategy.java | 30 +-
.../resource/TestResourceAwareScheduler.java | 137 ++++--
.../TestUtilsForResourceAwareScheduler.java | 92 +++-
.../TestDefaultResourceAwareStrategy.java | 4 +-
.../TestGenericResourceAwareStrategy.java | 69 ++-
18 files changed, 783 insertions(+), 341 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c486ac6..870d909 100644
--- a/pom.xml
+++ b/pom.xml
@@ -328,7 +328,7 @@
<!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
- <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
+ <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest, org.apache.storm.testing.PerformanceTest</java.unit.test.exclude>
<java.unit.test.include>**/Test*.java, **/*Test.java, **/*TestCase.java</java.unit.test.include> <!--maven surefire plugin default test list-->
<java.integration.test.include>no.tests</java.integration.test.include>
<!-- by default the clojure test set are all clojure tests that are not integration tests. This property is overridden in the profiles -->
@@ -626,10 +626,19 @@
<id>all-tests</id>
<properties>
<java.integration.test.include>**/*.java</java.integration.test.include>
+ <!-- add perf tests back in -->
+ <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
<clojure.test.set>*.*</clojure.test.set>
</properties>
</profile>
<profile>
+ <id>performance-tests</id>
+ <properties>
+ <!-- add perf tests back in -->
+ <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
+ </properties>
+ </profile>
+ <profile>
<id>integration-tests-only</id>
<properties>
<!--Java-->
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java b/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
new file mode 100644
index 0000000..e657dd8
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.testing;
+
+/**
+ * Marker interface used to mark performance tests. Performance tests will be run if the profile performance-tests or all-tests are enabled.
+ * <p/>
+ * Performance tests can be in the same package as unit tests. To mark a test as a performance test,
+ * add the annotation @Category(PerformanceTest.class) to the class definition as well as to its hierarchy of superclasses.
+ * For example:
+ * <p/>
+ *
+ *
+ * {@literal @}Category(PerformanceTest.class)<br/>
+ * public class MyPerformanceTest {<br/>
+ * ...<br/>
+ * }
+ *
+ * In general performance tests should have a time limit on them, but the time limit should be liberal enough to account
+ * for running on CI systems like travis ci, or the apache jenkins build.
+ */
+public interface PerformanceTest {
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index bc08579..0fc78e2 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.storm.Config;
import org.apache.storm.Constants;
@@ -48,6 +49,9 @@ import org.slf4j.LoggerFactory;
public class Cluster implements ISchedulingState {
private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
+ private static final Function<String, Set<WorkerSlot>> MAKE_SET = (x) -> new HashSet<>();
+ private static final Function<String, Map<WorkerSlot, NormalizedResourceRequest>> MAKE_MAP = (x) -> new HashMap<>();
+
/**
* key: supervisor id, value: supervisor details.
*/
@@ -72,6 +76,7 @@ public class Cluster implements ISchedulingState {
private final Topologies topologies;
private final Map<String, Map<WorkerSlot, NormalizedResourceRequest>> nodeToScheduledResourcesCache;
private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
+ private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
private SchedulerAssignmentImpl assignment;
private Set<String> blackListedHosts = new HashSet<>();
private INimbus inimbus;
@@ -147,7 +152,7 @@ public class Cluster implements ISchedulingState {
this.conf = conf;
this.topologies = topologies;
- ArrayList<String> supervisorHostNames = new ArrayList<String>();
+ ArrayList<String> supervisorHostNames = new ArrayList<>();
for (SupervisorDetails s : supervisors.values()) {
supervisorHostNames.add(s.getHost());
}
@@ -294,7 +299,7 @@ public class Cluster implements ISchedulingState {
@Override
public List<TopologyDetails> needsSchedulingTopologies() {
- List<TopologyDetails> ret = new ArrayList<TopologyDetails>();
+ List<TopologyDetails> ret = new ArrayList<>();
for (TopologyDetails topology : getTopologies()) {
if (needsScheduling(topology)) {
ret.add(topology);
@@ -349,8 +354,10 @@ public class Cluster implements ISchedulingState {
@Override
public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {
- return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), (x) -> new HashSet<>()).stream().map(WorkerSlot::getPort)
- .collect(Collectors.toSet());
+ return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), MAKE_SET)
+ .stream()
+ .map(WorkerSlot::getPort)
+ .collect(Collectors.toSet());
}
@Override
@@ -374,7 +381,7 @@ public class Cluster implements ISchedulingState {
@Override
public List<WorkerSlot> getAvailableSlots() {
- List<WorkerSlot> slots = new ArrayList<WorkerSlot>();
+ List<WorkerSlot> slots = new ArrayList<>();
for (SupervisorDetails supervisor : this.supervisors.values()) {
slots.addAll(this.getAvailableSlots(supervisor));
}
@@ -385,7 +392,7 @@ public class Cluster implements ISchedulingState {
@Override
public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {
Set<Integer> ports = this.getAvailablePorts(supervisor);
- List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
+ List<WorkerSlot> slots = new ArrayList<>(ports.size());
for (Integer port : ports) {
slots.add(new WorkerSlot(supervisor.getId(), port));
@@ -397,7 +404,7 @@ public class Cluster implements ISchedulingState {
@Override
public List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor) {
Set<Integer> ports = this.getAssignablePorts(supervisor);
- List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
+ List<WorkerSlot> slots = new ArrayList<>(ports.size());
for (Integer port : ports) {
slots.add(new WorkerSlot(supervisor.getId(), port));
@@ -408,7 +415,7 @@ public class Cluster implements ISchedulingState {
@Override
public List<WorkerSlot> getAssignableSlots() {
- List<WorkerSlot> slots = new ArrayList<WorkerSlot>();
+ List<WorkerSlot> slots = new ArrayList<>();
for (SupervisorDetails supervisor : this.supervisors.values()) {
slots.addAll(this.getAssignableSlots(supervisor));
}
@@ -419,7 +426,7 @@ public class Cluster implements ISchedulingState {
@Override
public Collection<ExecutorDetails> getUnassignedExecutors(TopologyDetails topology) {
if (topology == null) {
- return new ArrayList<ExecutorDetails>(0);
+ return new ArrayList<>(0);
}
Collection<ExecutorDetails> ret = new HashSet<>(topology.getExecutors());
@@ -441,7 +448,7 @@ public class Cluster implements ISchedulingState {
return 0;
}
- Set<WorkerSlot> slots = new HashSet<WorkerSlot>();
+ Set<WorkerSlot> slots = new HashSet<>();
slots.addAll(assignment.getExecutorToSlot().values());
return slots.size();
}
@@ -608,6 +615,7 @@ public class Cluster implements ISchedulingState {
double sharedOffHeapMemory = calculateSharedOffHeapMemory(nodeId, assignment);
assignment.setTotalSharedOffHeapMemory(nodeId, sharedOffHeapMemory);
updateCachesForWorkerSlot(slot, resources, sharedOffHeapMemory);
+ totalResourcesPerNodeCache.remove(slot.getNodeId());
}
/**
@@ -676,10 +684,12 @@ public class Cluster implements ISchedulingState {
String nodeId = slot.getNodeId();
assignment.setTotalSharedOffHeapMemory(
nodeId, calculateSharedOffHeapMemory(nodeId, assignment));
- nodeToScheduledResourcesCache.computeIfAbsent(nodeId, (x) -> new HashMap<>()).put(slot, new NormalizedResourceRequest());
- nodeToUsedSlotsCache.computeIfAbsent(nodeId, (x) -> new HashSet<>()).remove(slot);
+ nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).put(slot, new NormalizedResourceRequest());
+ nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).remove(slot);
}
}
+ //Invalidate the cache as something on the node changed
+ totalResourcesPerNodeCache.remove(slot.getNodeId());
}
/**
@@ -697,7 +707,7 @@ public class Cluster implements ISchedulingState {
@Override
public boolean isSlotOccupied(WorkerSlot slot) {
- return nodeToUsedSlotsCache.computeIfAbsent(slot.getNodeId(), (x) -> new HashSet<>()).contains(slot);
+ return nodeToUsedSlotsCache.computeIfAbsent(slot.getNodeId(), MAKE_SET).contains(slot);
}
@Override
@@ -731,7 +741,7 @@ public class Cluster implements ISchedulingState {
@Override
public List<SupervisorDetails> getSupervisorsByHost(String host) {
List<String> nodeIds = this.hostToId.get(host);
- List<SupervisorDetails> ret = new ArrayList<SupervisorDetails>();
+ List<SupervisorDetails> ret = new ArrayList<>();
if (nodeIds != null) {
for (String nodeId : nodeIds) {
@@ -744,7 +754,7 @@ public class Cluster implements ISchedulingState {
@Override
public Map<String, SchedulerAssignment> getAssignments() {
- return new HashMap<String, SchedulerAssignment>(assignments);
+ return new HashMap<>(assignments);
}
/**
@@ -763,6 +773,7 @@ public class Cluster implements ISchedulingState {
assertValidTopologyForModification(assignment.getTopologyId());
}
assignments.clear();
+ totalResourcesPerNodeCache.clear();
nodeToScheduledResourcesCache.values().forEach(Map::clear);
nodeToUsedSlotsCache.values().forEach(Set::clear);
for (SchedulerAssignment assignment : newAssignments.values()) {
@@ -916,28 +927,27 @@ public class Cluster implements ISchedulingState {
return ret;
}
-
/**
- * This medhod updates ScheduledResources and UsedSlots cache for given workerSlot
- *
- * @param workerSlot
- * @param workerResources
- * @param sharedoffHeapMemory
+ * This method updates the ScheduledResources and UsedSlots caches for the given workerSlot.
*/
private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, Double sharedoffHeapMemory) {
String nodeId = workerSlot.getNodeId();
NormalizedResourceRequest normalizedResourceRequest = new NormalizedResourceRequest();
normalizedResourceRequest.add(workerResources);
normalizedResourceRequest.addOffHeap(sharedoffHeapMemory);
- nodeToScheduledResourcesCache.computeIfAbsent(nodeId, (x) -> new HashMap<>()).put(workerSlot, normalizedResourceRequest);
- nodeToUsedSlotsCache.computeIfAbsent(nodeId, (x) -> new HashSet<>()).add(workerSlot);
+ nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).put(workerSlot, normalizedResourceRequest);
+ nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).add(workerSlot);
}
@Override
public NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId) {
- NormalizedResourceRequest totalScheduledResources = new NormalizedResourceRequest();
- nodeToScheduledResourcesCache.computeIfAbsent(nodeId, (x) -> new HashMap<>()).values().forEach(totalScheduledResources::add);
- return totalScheduledResources;
+ return totalResourcesPerNodeCache.computeIfAbsent(nodeId, (nid) -> {
+ NormalizedResourceRequest totalScheduledResources = new NormalizedResourceRequest();
+ for (NormalizedResourceRequest req : nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).values()) {
+ totalScheduledResources.add(req);
+ }
+ return totalScheduledResources;
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
index dcb2ebc..077dafe 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
@@ -26,12 +26,14 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import org.apache.storm.generated.WorkerResources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SchedulerAssignmentImpl implements SchedulerAssignment {
private static final Logger LOG = LoggerFactory.getLogger(SchedulerAssignmentImpl.class);
+ private static Function<WorkerSlot, Collection<ExecutorDetails>> MAKE_LIST = (k) -> new LinkedList<>();
/**
* topology-id this assignment is for.
@@ -44,8 +46,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
private final Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
private final Map<WorkerSlot, WorkerResources> resources = new HashMap<>();
private final Map<String, Double> nodeIdToTotalSharedOffHeap = new HashMap<>();
- //Used to cache the slotToExecutors mapping.
- private Map<WorkerSlot, Collection<ExecutorDetails>> slotToExecutorsCache = null;
+ private final Map<WorkerSlot, Collection<ExecutorDetails>> slotToExecutors = new HashMap<>();
/**
* Create a new assignment.
@@ -63,6 +64,9 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
throw new RuntimeException("Cannot create a scheduling with a null in it " + executorToSlot);
}
this.executorToSlot.putAll(executorToSlot);
+ for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
+ slotToExecutors.computeIfAbsent(entry.getValue(), MAKE_LIST).add(entry.getKey());
+ }
}
if (resources != null) {
if (resources.entrySet().stream().anyMatch((entry) -> entry.getKey() == null || entry.getValue() == null)) {
@@ -149,32 +153,26 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
for (ExecutorDetails executor : executors) {
this.executorToSlot.put(executor, slot);
}
+ slotToExecutors.computeIfAbsent(slot, MAKE_LIST)
+ .addAll(executors);
if (slotResources != null) {
resources.put(slot, slotResources);
} else {
resources.remove(slot);
}
- //Clear the cache scheduling changed
- slotToExecutorsCache = null;
}
/**
* Release the slot occupied by this assignment.
*/
public void unassignBySlot(WorkerSlot slot) {
- //Clear the cache scheduling is going to change
- slotToExecutorsCache = null;
- List<ExecutorDetails> executors = new ArrayList<>();
- for (ExecutorDetails executor : executorToSlot.keySet()) {
- WorkerSlot ws = executorToSlot.get(executor);
- if (ws.equals(slot)) {
- executors.add(executor);
- }
- }
+ Collection<ExecutorDetails> executors = slotToExecutors.remove(slot);
// remove
- for (ExecutorDetails executor : executors) {
- executorToSlot.remove(executor);
+ if (executors != null) {
+ for (ExecutorDetails executor : executors) {
+ executorToSlot.remove(executor);
+ }
}
resources.remove(slot);
@@ -194,7 +192,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
@Override
public boolean isSlotOccupied(WorkerSlot slot) {
- return this.executorToSlot.containsValue(slot);
+ return this.slotToExecutors.containsKey(slot);
}
@Override
@@ -219,21 +217,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
@Override
public Map<WorkerSlot, Collection<ExecutorDetails>> getSlotToExecutors() {
- Map<WorkerSlot, Collection<ExecutorDetails>> ret = slotToExecutorsCache;
- if (ret != null) {
- return ret;
- }
- ret = new HashMap<>();
- for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
- ExecutorDetails exec = entry.getKey();
- WorkerSlot ws = entry.getValue();
- if (!ret.containsKey(ws)) {
- ret.put(ws, new LinkedList<ExecutorDetails>());
- }
- ret.get(ws).add(exec);
- }
- slotToExecutorsCache = ret;
- return ret;
+ return slotToExecutors;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 4f43c0a..3a9570e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -31,6 +31,7 @@ import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,7 +128,7 @@ public class RAS_Node {
}
private Collection<WorkerSlot> workerIdsToWorkers(Collection<String> workerIds) {
- Collection<WorkerSlot> ret = new LinkedList<WorkerSlot>();
+ Collection<WorkerSlot> ret = new LinkedList<>();
for (String workerId : workerIds) {
ret.add(slots.get(workerId));
}
@@ -136,26 +137,15 @@ public class RAS_Node {
public Collection<String> getFreeSlotsId() {
if (!isAlive) {
- return new HashSet<String>();
+ return new HashSet<>();
}
- Collection<String> usedSlotsId = getUsedSlotsId();
- Set<String> ret = new HashSet<>();
- ret.addAll(slots.keySet());
- ret.removeAll(usedSlotsId);
+ Set<String> ret = new HashSet<>(slots.keySet());
+ ret.removeAll(getUsedSlotsId());
return ret;
}
- public Collection<WorkerSlot> getSlotsAvailbleTo(TopologyDetails td) {
- //Try to reuse a slot if possible....
- HashSet<WorkerSlot> ret = new HashSet<>();
- Map<String, Collection<ExecutorDetails>> assigned = topIdToUsedSlots.get(td.getId());
- if (assigned != null) {
- ret.addAll(workerIdsToWorkers(assigned.keySet()));
- }
- ret.addAll(getFreeSlots());
- ret.retainAll(
- originallyFreeSlots); //RAS does not let you move things or modify existing assignments
- return ret;
+ public Collection<WorkerSlot> getSlotsAvailableToScheduleOn() {
+ return originallyFreeSlots;
}
public Collection<WorkerSlot> getFreeSlots() {
@@ -163,7 +153,7 @@ public class RAS_Node {
}
private Collection<String> getUsedSlotsId() {
- Collection<String> ret = new LinkedList<String>();
+ Collection<String> ret = new LinkedList<>();
for (Map<String, Collection<ExecutorDetails>> entry : topIdToUsedSlots.values()) {
ret.addAll(entry.keySet());
}
@@ -329,16 +319,17 @@ public class RAS_Node {
cluster.assign(target, td.getId(), executors);
//assigning internally
- if (!topIdToUsedSlots.containsKey(td.getId())) {
- topIdToUsedSlots.put(td.getId(), new HashMap<String, Collection<ExecutorDetails>>());
- }
-
- if (!topIdToUsedSlots.get(td.getId()).containsKey(target.getId())) {
- topIdToUsedSlots.get(td.getId()).put(target.getId(), new LinkedList<ExecutorDetails>());
- }
- topIdToUsedSlots.get(td.getId()).get(target.getId()).addAll(executors);
+ topIdToUsedSlots.computeIfAbsent(td.getId(), (tid) -> new HashMap<>())
+ .computeIfAbsent(target.getId(), (tid) -> new LinkedList<>())
+ .addAll(executors);
}
+ /**
+ * Assign a single executor to a slot, even if other things are in the slot.
+ * @param ws the slot to assign it to.
+ * @param exec the executor to assign.
+ * @param td the topology for the executor.
+ */
public void assignSingleExecutor(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
if (!isAlive) {
throw new IllegalStateException("Trying to adding to a dead node " + nodeId);
@@ -372,9 +363,7 @@ public class RAS_Node {
* @return true if it would fit else false
*/
public boolean wouldFit(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
- if (!nodeId.equals(ws.getNodeId())) {
- throw new IllegalStateException("Slot " + ws + " is not a part of this node " + nodeId);
- }
+ assert nodeId.equals(ws.getNodeId()) : "Slot " + ws + " is not a part of this node " + nodeId;
return isAlive
&& cluster.wouldFit(
ws,
@@ -385,6 +374,19 @@ public class RAS_Node {
);
}
+ /**
+ * Check whether there is any possibility that exec could ever fit on this node.
+ * @param exec the executor to schedule
+ * @param td the topology the executor is a part of
+ * @return true if it might fit (with no guarantee that it will), or false if there is no
+ * way it could ever fit.
+ */
+ public boolean couldEverFit(ExecutorDetails exec, TopologyDetails td) {
+ NormalizedResourceOffer avail = getTotalAvailableResources();
+ NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
+ return isAlive && avail.couldHoldIgnoringSharedMemory(requestedResources);
+ }
+
@Override
public boolean equals(Object other) {
if (other instanceof RAS_Node) {
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
index 0c073a8..790a42b 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -43,10 +43,18 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
this.normalizedResources = new NormalizedResources(normalizedResourceMap);
}
+ /**
+ * Create an offer with all resources set to 0.
+ */
public NormalizedResourceOffer() {
- this((Map<String, ? extends Number>) null);
+ normalizedResources = new NormalizedResources();
+ totalMemoryMb = 0.0;
}
+ /**
+ * Copy Constructor.
+ * @param other what to copy.
+ */
public NormalizedResourceOffer(NormalizedResourceOffer other) {
this.totalMemoryMb = other.totalMemoryMb;
this.normalizedResources = new NormalizedResources(other.normalizedResources);
@@ -57,6 +65,10 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
return totalMemoryMb;
}
+ /**
+ * Return these resources as a normalized map.
+ * @return the normalized map.
+ */
public Map<String, Double> toNormalizedMap() {
Map<String, Double> ret = normalizedResources.toNormalizedMap();
ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
@@ -68,6 +80,10 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
totalMemoryMb += other.getTotalMemoryMb();
}
+ /**
+ * Remove the resources in other from this.
+ * @param other what to remove.
+ */
public void remove(NormalizedResourcesWithMemory other) {
normalizedResources.remove(other.getNormalizedResources());
totalMemoryMb -= other.getTotalMemoryMb();
@@ -75,10 +91,10 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
normalizedResources.throwBecauseResourceBecameNegative(
Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
}
- ;
}
/**
+ * Calculate the average percentage used.
* @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
* double, double).
*/
@@ -88,6 +104,7 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
}
/**
+ * Calculate the min percentage used of the resource.
* @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
* double)
*/
@@ -96,6 +113,7 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
}
/**
+ * Check if resources might be able to fit.
* @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
* double).
*/
@@ -112,4 +130,19 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
public NormalizedResources getNormalizedResources() {
return normalizedResources;
}
+
+ @Override
+ public String toString() {
+ return "Normalized resources: " + toNormalizedMap();
+ }
+
+ public void updateForRareResourceAffinity(NormalizedResourceRequest requestedResources) {
+ normalizedResources.updateForRareResourceAffinity(requestedResources.getNormalizedResources());
+ }
+
+ @Override
+ public void clear() {
+ this.totalMemoryMb = 0.0;
+ this.normalizedResources.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
index dc380e6..86033b8 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -43,15 +43,24 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
private NormalizedResourceRequest(Map<String, ? extends Number> resources,
Map<String, Double> defaultResources) {
- Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
- normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
- onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
- offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
- normalizedResources = new NormalizedResources(normalizedResourceMap);
+ if (resources == null && defaultResources == null) {
+ onHeap = 0.0;
+ offHeap = 0.0;
+ normalizedResources = new NormalizedResources();
+ } else {
+ Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER
+ .normalizedResourceMap(defaultResources);
+ normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
+ onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ normalizedResources = new NormalizedResources(normalizedResourceMap);
+ }
}
+
public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
}
+
public NormalizedResourceRequest(Map<String, Object> topoConf) {
this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
}
@@ -174,7 +183,7 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
@Override
public String toString() {
- return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
+ return "Normalized resources: " + toNormalizedMap();
}
public double getTotalCpu() {
@@ -185,4 +194,11 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
public NormalizedResources getNormalizedResources() {
return this.normalizedResources;
}
+
+ @Override
+ public void clear() {
+ normalizedResources.clear();
+ offHeap = 0.0;
+ onHeap = 0.0;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index c41cd95..e57d8f2 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -45,6 +45,14 @@ public class NormalizedResources {
private double[] otherResources;
/**
+ * Create an empty NormalizedResources.
+ */
+ NormalizedResources() {
+ cpu = 0.0;
+ otherResources = RESOURCE_MAP_ARRAY_BRIDGE.empty();
+ }
+
+ /**
* Copy constructor.
*/
public NormalizedResources(NormalizedResources other) {
@@ -224,12 +232,6 @@ public class NormalizedResources {
* resources that are not present in the total.
*/
public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
- + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
- toNormalizedMap(), used.toNormalizedMap());
- }
-
int skippedResourceTypes = 0;
double total = 0.0;
if (usedMemoryMb > totalMemoryMb) {
@@ -344,4 +346,23 @@ public class NormalizedResources {
}
return min * 100.0;
}
+
+ public void updateForRareResourceAffinity(NormalizedResources other) {
+ int length = Math.min(this.otherResources.length, other.otherResources.length);
+ for (int i = 0; i < length; i++) {
+ if (other.getResourceAt(i) == 0.0) {
+ this.otherResources[i] = -1 * this.otherResources[i];
+ }
+ }
+ }
+
+ /**
+ * Set all resources to 0.
+ */
+ public void clear() {
+ this.cpu = 0.0;
+ for (int i = 0; i < otherResources.length; i++) {
+ otherResources[i] = 0.0;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
index 5001645..aeb0737 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
@@ -25,4 +25,9 @@ public interface NormalizedResourcesWithMemory {
double getTotalMemoryMb();
+ /**
+ * Set all resources to 0.
+ */
+ void clear();
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
index 2b07c65..5a71db4 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
@@ -65,6 +65,14 @@ public class ResourceMapArrayBridge {
}
/**
+ * Create an array that has all values 0;
+ * @return the empty array.
+ */
+ public double[] empty() {
+ return new double[counter.get()];
+ }
+
+ /**
* Translates an array of resource values to a normalized resource map.
*
* @param resources The resource array to translate
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index a474ccc..7e50f4a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -24,35 +24,64 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.storm.generated.ComponentType;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.Component;
import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.RAS_Node;
import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class BaseResourceAwareStrategy implements IStrategy {
private static final Logger LOG = LoggerFactory.getLogger(BaseResourceAwareStrategy.class);
protected Cluster cluster;
- protected RAS_Nodes nodes;
+ // Rack id to list of host names in that rack
private Map<String, List<String>> networkTopography;
+ private final Map<String, String> superIdToRack = new HashMap<>();
+ private final Map<String, String> superIdToHostname = new HashMap<>();
+ private final Map<String, List<RAS_Node>> hostnameToNodes = new HashMap<>();
+ private final Map<String, List<RAS_Node>> rackIdToNodes = new HashMap<>();
+ protected RAS_Nodes nodes;
@VisibleForTesting
void prepare(Cluster cluster) {
this.cluster = cluster;
nodes = new RAS_Nodes(cluster);
networkTopography = cluster.getNetworkTopography();
+ Map<String, String> hostToRack = new HashMap<>();
+ for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
+ String rackId = entry.getKey();
+ for (String hostName: entry.getValue()) {
+ hostToRack.put(hostName, rackId);
+ }
+ }
+ for (RAS_Node node: nodes.getNodes()) {
+ String superId = node.getId();
+ String hostName = node.getHostname();
+ String rackId = hostToRack.getOrDefault(hostName, DNSToSwitchMapping.DEFAULT_RACK);
+ superIdToHostname.put(superId, hostName);
+ superIdToRack.put(superId, rackId);
+ hostnameToNodes.computeIfAbsent(hostName, (hn) -> new ArrayList<>()).add(node);
+ rackIdToNodes.computeIfAbsent(rackId, (hn) -> new ArrayList<>()).add(node);
+ }
logClusterInfo();
}
@@ -61,15 +90,22 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
//NOOP
}
+ protected SchedulingResult mkNotEnoughResources(TopologyDetails td) {
+ return SchedulingResult.failure(
+ SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
+ td.getExecutors().size() + " executors not scheduled");
+ }
+
/**
* Schedule executor exec from topology td.
*
* @param exec the executor to schedule
* @param td the topology executor exec is a part of
* @param scheduledTasks executors that have been scheduled
+ * @return true if scheduled successfully, else false.
*/
- protected void scheduleExecutor(
- ExecutorDetails exec, TopologyDetails td, Collection<ExecutorDetails> scheduledTasks, List<ObjectResources> sortedNodes) {
+ protected boolean scheduleExecutor(
+ ExecutorDetails exec, TopologyDetails td, Collection<ExecutorDetails> scheduledTasks, Iterable<String> sortedNodes) {
WorkerSlot targetSlot = findWorkerForExec(exec, td, sortedNodes);
if (targetSlot != null) {
RAS_Node targetNode = idToNode(targetSlot.getNodeId());
@@ -86,8 +122,12 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
targetNode.getTotalCpuResources(),
targetSlot,
nodeToRack(targetNode));
+ return true;
} else {
- LOG.error("Not Enough Resources to schedule Task {}", exec);
+ String comp = td.getExecutorToComponent().get(exec);
+ NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
+ LOG.error("Not Enough Resources to schedule Task {} - {} {}", exec, comp, requestedResources);
+ return false;
}
}
@@ -103,12 +143,14 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
* @param td the topology that the executor is a part of
* @return a worker to assign exec on. Returns null if a worker cannot be successfully found in cluster
*/
- protected WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, List<ObjectResources> sortedNodes) {
- for (ObjectResources nodeResources : sortedNodes) {
- RAS_Node node = nodes.getNodeById(nodeResources.id);
- for (WorkerSlot ws : node.getSlotsAvailbleTo(td)) {
- if (node.wouldFit(ws, exec, td)) {
- return ws;
+ protected WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Iterable<String> sortedNodes) {
+ for (String id : sortedNodes) {
+ RAS_Node node = nodes.getNodeById(id);
+ if (node.couldEverFit(exec, td)) {
+ for (WorkerSlot ws : node.getSlotsAvailableToScheduleOn()) {
+ if (node.wouldFit(ws, exec, td)) {
+ return ws;
+ }
}
}
}
@@ -133,96 +175,219 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
* @return a sorted list of nodes.
*/
protected TreeSet<ObjectResources> sortNodes(
- List<RAS_Node> availNodes, ExecutorDetails exec, TopologyDetails topologyDetails, String rackId) {
- AllResources allResources = new AllResources("RACK");
- List<ObjectResources> nodes = allResources.objectResources;
+ List<RAS_Node> availNodes, ExecutorDetails exec, TopologyDetails topologyDetails, String rackId,
+ Map<String, AtomicInteger> scheduledCount) {
+ AllResources allRackResources = new AllResources("RACK");
+ List<ObjectResources> nodes = allRackResources.objectResources;
for (RAS_Node rasNode : availNodes) {
- String nodeId = rasNode.getId();
- ObjectResources node = new ObjectResources(nodeId);
+ String superId = rasNode.getId();
+ ObjectResources node = new ObjectResources(superId);
node.availableResources = rasNode.getTotalAvailableResources();
node.totalResources = rasNode.getTotalResources();
nodes.add(node);
- allResources.availableResourcesOverall.add(node.availableResources);
- allResources.totalResourcesOverall.add(node.totalResources);
+ allRackResources.availableResourcesOverall.add(node.availableResources);
+ allRackResources.totalResourcesOverall.add(node.totalResources);
}
LOG.debug(
"Rack {}: Overall Avail [ {} ] Total [ {} ]",
rackId,
- allResources.availableResourcesOverall,
- allResources.totalResourcesOverall);
+ allRackResources.availableResourcesOverall,
+ allRackResources.totalResourcesOverall);
- String topoId = topologyDetails.getId();
return sortObjectResources(
- allResources,
+ allRackResources,
exec,
topologyDetails,
- new ExistingScheduleFunc() {
- @Override
- public int getNumExistingSchedule(String objectId) {
-
- //Get execs already assigned in rack
- Collection<ExecutorDetails> execs = new LinkedList<>();
- if (cluster.getAssignmentById(topoId) != null) {
- for (Map.Entry<ExecutorDetails, WorkerSlot> entry :
- cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
- WorkerSlot workerSlot = entry.getValue();
- ExecutorDetails exec = entry.getKey();
- if (workerSlot.getNodeId().equals(objectId)) {
- execs.add(exec);
- }
- }
- }
- return execs.size();
+ (superId) -> {
+ AtomicInteger count = scheduledCount.get(superId);
+ if (count == null) {
+ return 0;
}
+ return count.get();
});
}
- protected List<ObjectResources> sortAllNodes(TopologyDetails td, ExecutorDetails exec,
- List<String> favoredNodes, List<String> unFavoredNodes) {
- TreeSet<ObjectResources> sortedRacks = sortRacks(exec, td);
- ArrayList<ObjectResources> totallySortedNodes = new ArrayList<>();
- for (ObjectResources rack : sortedRacks) {
- final String rackId = rack.id;
- TreeSet<ObjectResources> sortedNodes = sortNodes(
- getAvailableNodesFromRack(rackId), exec, td, rackId);
- totallySortedNodes.addAll(sortedNodes);
+ protected List<String> makeHostToNodeIds(List<String> hosts) {
+ if (hosts == null) {
+ return Collections.emptyList();
+ }
+ List<String> ret = new ArrayList<>(hosts.size());
+ for (String host: hosts) {
+ List<RAS_Node> nodes = hostnameToNodes.get(host);
+ if (nodes != null) {
+ for (RAS_Node node : nodes) {
+ ret.add(node.getId());
+ }
+ }
+ }
+ return ret;
+ }
+
+ private static class LazyNodeSortingIterator implements Iterator<String> {
+ private final LazyNodeSorting parent;
+ private final Iterator<ObjectResources> rackIterator;
+ private Iterator<ObjectResources> nodeIterator;
+ private String nextValueFromNode = null;
+ private final Iterator<String> pre;
+ private final Iterator<String> post;
+ private final Set<String> skip;
+
+ public LazyNodeSortingIterator(LazyNodeSorting parent,
+ TreeSet<ObjectResources> sortedRacks) {
+ this.parent = parent;
+ rackIterator = sortedRacks.iterator();
+ pre = parent.favoredNodeIds.iterator();
+ post = parent.unFavoredNodeIds.iterator();
+ skip = parent.skippedNodeIds;
+ }
+
+ private Iterator<ObjectResources> getNodeIterator() {
+ if (nodeIterator != null && nodeIterator.hasNext()) {
+ return nodeIterator;
+ }
+ //need to get the next node iterator
+ if (rackIterator.hasNext()) {
+ ObjectResources rack = rackIterator.next();
+ final String rackId = rack.id;
+ nodeIterator = parent.getSortedNodesFor(rackId).iterator();
+ return nodeIterator;
+ }
+
+ return null;
}
- //Now do some post processing to add make some nodes preferred over others.
- if (favoredNodes != null || unFavoredNodes != null) {
- HashMap<String, Integer> hostOrder = new HashMap<>();
- if (favoredNodes != null) {
- int size = favoredNodes.size();
- for (int i = 0; i < size; i++) {
- //First in the list is the most desired so gets the Lowest possible value
- hostOrder.put(favoredNodes.get(i), -(size - i));
+
+ @Override
+ public boolean hasNext() {
+ if (pre.hasNext()) {
+ return true;
+ }
+ while (true) {
+ //For the node we don't know if we have another one unless we look at the contents
+ Iterator<ObjectResources> nodeIterator = getNodeIterator();
+ if (nodeIterator == null || !nodeIterator.hasNext()) {
+ break;
}
+ nextValueFromNode = nodeIterator.next().id;
+ if (!skip.contains(nextValueFromNode)) {
+ return true;
+ }
+ }
+ if (post.hasNext()) {
+ return true;
}
- if (unFavoredNodes != null) {
- int size = unFavoredNodes.size();
- for (int i = 0; i < size; i++) {
- //First in the list is the least desired so gets the highest value
- hostOrder.put(unFavoredNodes.get(i), size - i);
+ return false;
+ }
+
+ @Override
+ public String next() {
+ if (pre.hasNext()) {
+ return pre.next();
+ }
+ if (nextValueFromNode != null) {
+ String tmp = nextValueFromNode;
+ nextValueFromNode = null;
+ return tmp;
+ }
+ return post.next();
+ }
+ }
+
+ private class LazyNodeSorting implements Iterable<String> {
+ private final Map<String, AtomicInteger> perNodeScheduledCount = new HashMap<>();
+ private final TreeSet<ObjectResources> sortedRacks;
+ private final Map<String, TreeSet<ObjectResources>> cachedNodes = new HashMap<>();
+ private final ExecutorDetails exec;
+ private final TopologyDetails td;
+ private final List<String> favoredNodeIds;
+ private final List<String> unFavoredNodeIds;
+ private final Set<String> skippedNodeIds = new HashSet<>();
+
+ public LazyNodeSorting(TopologyDetails td, ExecutorDetails exec,
+ List<String> favoredNodeIds, List<String> unFavoredNodeIds) {
+ this.favoredNodeIds = favoredNodeIds;
+ this.unFavoredNodeIds = unFavoredNodeIds;
+ this.unFavoredNodeIds.removeAll(favoredNodeIds);
+ skippedNodeIds.addAll(favoredNodeIds);
+ skippedNodeIds.addAll(unFavoredNodeIds);
+
+ this.td = td;
+ this.exec = exec;
+ String topoId = td.getId();
+ SchedulerAssignment assignment = cluster.getAssignmentById(topoId);
+ if (assignment != null) {
+ for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
+ assignment.getSlotToExecutors().entrySet()) {
+ String superId = entry.getKey().getNodeId();
+ perNodeScheduledCount.computeIfAbsent(superId, (sid) -> new AtomicInteger(0))
+ .getAndAdd(entry.getValue().size());
}
}
- //java guarantees a stable sort so we can just return 0 for values we don't want to move.
- Collections.sort(totallySortedNodes, (o1, o2) -> {
- RAS_Node n1 = this.nodes.getNodeById(o1.id);
- String host1 = n1.getHostname();
- int h1Value = hostOrder.getOrDefault(host1, 0);
+ sortedRacks = sortRacks(exec, td);
+ }
- RAS_Node n2 = this.nodes.getNodeById(o2.id);
- String host2 = n2.getHostname();
- int h2Value = hostOrder.getOrDefault(host2, 0);
+ private TreeSet<ObjectResources> getSortedNodesFor(String rackId) {
+ return cachedNodes.computeIfAbsent(rackId,
+ (rid) -> sortNodes(rackIdToNodes.get(rid), exec, td, rid, perNodeScheduledCount));
+ }
- return Integer.compare(h1Value, h2Value);
- });
+ @Override
+ public Iterator<String> iterator() {
+ return new LazyNodeSortingIterator(this, sortedRacks);
}
- return totallySortedNodes;
+ }
+
+ protected Iterable<String> sortAllNodes(TopologyDetails td, ExecutorDetails exec,
+ List<String> favoredNodeIds, List<String> unFavoredNodeIds) {
+ return new LazyNodeSorting(td, exec, favoredNodeIds, unFavoredNodeIds);
+ }
+
+ private AllResources createClusterAllResources() {
+ AllResources allResources = new AllResources("Cluster");
+ List<ObjectResources> racks = allResources.objectResources;
+
+ //This is the first time so initialize the resources.
+ for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
+ String rackId = entry.getKey();
+ List<String> nodeHosts = entry.getValue();
+ ObjectResources rack = new ObjectResources(rackId);
+ racks.add(rack);
+ for (String nodeHost : nodeHosts) {
+ for (RAS_Node node : hostnameToNodes(nodeHost)) {
+ rack.availableResources.add(node.getTotalAvailableResources());
+ rack.totalResources.add(node.getTotalAvailableResources());
+ }
+ }
+
+ allResources.totalResourcesOverall.add(rack.totalResources);
+ allResources.availableResourcesOverall.add(rack.availableResources);
+ }
+
+ LOG.debug(
+ "Cluster Overall Avail [ {} ] Total [ {} ]",
+ allResources.availableResourcesOverall,
+ allResources.totalResourcesOverall);
+ return allResources;
+ }
+
+ private Map<String, AtomicInteger> getScheduledCount(TopologyDetails topologyDetails) {
+ String topoId = topologyDetails.getId();
+ SchedulerAssignment assignment = cluster.getAssignmentById(topoId);
+ Map<String, AtomicInteger> scheduledCount = new HashMap<>();
+ if (assignment != null) {
+ for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
+ assignment.getSlotToExecutors().entrySet()) {
+ String superId = entry.getKey().getNodeId();
+ String rackId = superIdToRack.get(superId);
+ scheduledCount.computeIfAbsent(rackId, (rid) -> new AtomicInteger(0))
+ .getAndAdd(entry.getValue().size());
+ }
+ }
+ return scheduledCount;
}
/**
@@ -242,55 +407,20 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
*/
@VisibleForTesting
TreeSet<ObjectResources> sortRacks(ExecutorDetails exec, TopologyDetails topologyDetails) {
- AllResources allResources = new AllResources("Cluster");
- List<ObjectResources> racks = allResources.objectResources;
-
- final Map<String, String> nodeIdToRackId = new HashMap<String, String>();
-
- for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
- String rackId = entry.getKey();
- List<String> nodeIds = entry.getValue();
- ObjectResources rack = new ObjectResources(rackId);
- racks.add(rack);
- for (String nodeId : nodeIds) {
- RAS_Node node = nodes.getNodeById(nodeHostnameToId(nodeId));
- rack.availableResources.add(node.getTotalAvailableResources());
- rack.totalResources.add(node.getTotalAvailableResources());
-
- nodeIdToRackId.put(nodeId, rack.id);
- allResources.totalResourcesOverall.add(rack.totalResources);
- allResources.availableResourcesOverall.add(rack.availableResources);
+ final AllResources allResources = createClusterAllResources();
+ final Map<String, AtomicInteger> scheduledCount = getScheduledCount(topologyDetails);
- }
- }
- LOG.debug(
- "Cluster Overall Avail [ {} ] Total [ {} ]",
- allResources.availableResourcesOverall,
- allResources.totalResourcesOverall);
-
- String topoId = topologyDetails.getId();
return sortObjectResources(
allResources,
exec,
topologyDetails,
- (objectId) -> {
- String rackId = objectId;
- //Get execs already assigned in rack
- Collection<ExecutorDetails> execs = new LinkedList<>();
- if (cluster.getAssignmentById(topoId) != null) {
- for (Map.Entry<ExecutorDetails, WorkerSlot> entry :
- cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
- String nodeId = entry.getValue().getNodeId();
- String hostname = idToNode(nodeId).getHostname();
- ExecutorDetails exec1 = entry.getKey();
- if (nodeIdToRackId.get(hostname) != null
- && nodeIdToRackId.get(hostname).equals(rackId)) {
- execs.add(exec1);
- }
- }
+ (rackId) -> {
+ AtomicInteger count = scheduledCount.get(rackId);
+ if (count == null) {
+ return 0;
}
- return execs.size();
+ return count.get();
});
}
@@ -301,27 +431,7 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
* @return the rack id
*/
protected String nodeToRack(RAS_Node node) {
- for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
- if (entry.getValue().contains(node.getHostname())) {
- return entry.getKey();
- }
- }
- LOG.error("Node: {} not found in any racks", node.getHostname());
- return null;
- }
-
- /**
- * get a list nodes from a rack.
- *
- * @param rackId the rack id of the rack to get nodes from
- * @return a list of nodes
- */
- protected List<RAS_Node> getAvailableNodesFromRack(String rackId) {
- List<RAS_Node> retList = new ArrayList<>();
- for (String nodeId : networkTopography.get(rackId)) {
- retList.add(nodes.getNodeById(this.nodeHostnameToId(nodeId)));
- }
- return retList;
+ return superIdToRack.get(node.getId());
}
/**
@@ -459,9 +569,7 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
}
/**
- * Get the amount of resources available and total for each node.
- *
- * @return a String with cluster resource info for debug
+ * Log a bunch of stuff for debugging.
*/
private void logClusterInfo() {
if (LOG.isDebugEnabled()) {
@@ -470,40 +578,32 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
String rackId = clusterEntry.getKey();
LOG.debug("Rack: {}", rackId);
for (String nodeHostname : clusterEntry.getValue()) {
- RAS_Node node = idToNode(this.nodeHostnameToId(nodeHostname));
- LOG.debug("-> Node: {} {}", node.getHostname(), node.getId());
- LOG.debug(
- "--> Avail Resources: {Mem {}, CPU {} Slots: {}}",
- node.getAvailableMemoryResources(),
- node.getAvailableCpuResources(),
- node.totalSlotsFree());
- LOG.debug(
- "--> Total Resources: {Mem {}, CPU {} Slots: {}}",
- node.getTotalMemoryResources(),
- node.getTotalCpuResources(),
- node.totalSlots());
+ for (RAS_Node node : hostnameToNodes(nodeHostname)) {
+ LOG.debug("-> Node: {} {}", node.getHostname(), node.getId());
+ LOG.debug(
+ "--> Avail Resources: {Mem {}, CPU {} Slots: {}}",
+ node.getAvailableMemoryResources(),
+ node.getAvailableCpuResources(),
+ node.totalSlotsFree());
+ LOG.debug(
+ "--> Total Resources: {Mem {}, CPU {} Slots: {}}",
+ node.getTotalMemoryResources(),
+ node.getTotalCpuResources(),
+ node.totalSlots());
+ }
}
}
}
}
/**
- * hostname to Id.
+ * hostname to Ids.
*
- * @param hostname the hostname to convert to node id
- * @return the id of a node
+ * @param hostname the hostname.
+ * @return the ids n that node.
*/
- public String nodeHostnameToId(String hostname) {
- for (RAS_Node n : nodes.getNodes()) {
- if (n.getHostname() == null) {
- continue;
- }
- if (n.getHostname().equals(hostname)) {
- return n.getId();
- }
- }
- LOG.error("Cannot find Node with hostname {}", hostname);
- return null;
+ public List<RAS_Node> hostnameToNodes(String hostname) {
+ return hostnameToNodes.get(hostname);
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
index 12f89ca..9ccfd50 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
@@ -45,16 +45,17 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
public static final int MAX_STATE_SEARCH = 100_000;
public static final int DEFAULT_STATE_SEARCH = 10_000;
private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
- private Map<String, RAS_Node> nodes;
- private Map<ExecutorDetails, String> execToComp;
- private Map<String, Set<ExecutorDetails>> compToExecs;
- private List<String> favoredNodes;
- private List<String> unFavoredNodes;
//constraints and spreads
private Map<String, Map<String, Integer>> constraintMatrix;
private HashSet<String> spreadComps = new HashSet<>();
+ private Map<String, RAS_Node> nodes;
+ private Map<ExecutorDetails, String> execToComp;
+ private Map<String, Set<ExecutorDetails>> compToExecs;
+ private List<String> favoredNodeIds;
+ private List<String> unFavoredNodeIds;
+
static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
Map<String, Map<String, Integer>> matrix = new HashMap<>();
for (String comp : comps) {
@@ -252,8 +253,8 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
final long maxTimeMs =
ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1).intValue() * 1000L;
- favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
- unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
+ favoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+ unFavoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
//get mapping of execs to components
execToComp = td.getExecutorToComponent();
@@ -328,11 +329,11 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
}
ExecutorDetails exec = state.currentExec();
- List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec, favoredNodes, unFavoredNodes);
+ Iterable<String> sortedNodes = sortAllNodes(state.td, exec, favoredNodeIds, unFavoredNodeIds);
- for (ObjectResources nodeResources : sortedNodes) {
- RAS_Node node = nodes.get(nodeResources.id);
- for (WorkerSlot workerSlot : node.getSlotsAvailbleTo(state.td)) {
+ for (String nodeId: sortedNodes) {
+ RAS_Node node = nodes.get(nodeId);
+ for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
if (isExecAssignmentToWorkerValid(workerSlot, state)) {
state.tryToSchedule(execToComp, node, workerSlot);
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 826c24d..a832557 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -60,9 +60,9 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl
//order executors to be scheduled
List<ExecutorDetails> orderedExecutors = this.orderExecutors(td, unassignedExecutors);
Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
- List<String> favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
- List<String> unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
- final List<ObjectResources> sortedNodes = sortAllNodes(td, null, favoredNodes, unFavoredNodes);
+ List<String> favoredNodesIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+ List<String> unFavoredNodesIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
+ final Iterable<String> sortedNodes = sortAllNodes(td, null, favoredNodesIds, unFavoredNodesIds);
for (ExecutorDetails exec : orderedExecutors) {
LOG.debug(
@@ -70,14 +70,18 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl
exec,
td.getExecutorToComponent().get(exec),
td.getTaskResourceReqList(exec));
- scheduleExecutor(exec, td, scheduledTasks, sortedNodes);
+ if (!scheduleExecutor(exec, td, scheduledTasks, sortedNodes)) {
+ return mkNotEnoughResources(td);
+ }
}
executorsNotScheduled.removeAll(scheduledTasks);
LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
// schedule left over system tasks
for (ExecutorDetails exec : executorsNotScheduled) {
- scheduleExecutor(exec, td, scheduledTasks, sortedNodes);
+ if (!scheduleExecutor(exec, td, scheduledTasks, sortedNodes)) {
+ return mkNotEnoughResources(td);
+ }
}
SchedulingResult result;
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index af9c681..ce98abd 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -30,6 +30,7 @@ import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,7 +82,7 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl
}
Collection<ExecutorDetails> unassignedExecutors =
new HashSet<>(this.cluster.getUnassignedExecutors(td));
- LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors);
+ LOG.debug("Num ExecutorsNeedScheduling: {}", unassignedExecutors.size());
Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
List<Component> spouts = this.getSpouts(td);
@@ -92,10 +93,10 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl
}
//order executors to be scheduled
- List<ExecutorDetails> orderedExecutors = this.orderExecutors(td, unassignedExecutors);
+ List<ExecutorDetails> orderedExecutors = orderExecutors(td, unassignedExecutors);
Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
- List<String> favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
- List<String> unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
+ List<String> favoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+ List<String> unFavoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
for (ExecutorDetails exec : orderedExecutors) {
LOG.debug(
@@ -103,17 +104,24 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl
exec,
td.getExecutorToComponent().get(exec),
td.getTaskResourceReqList(exec));
- final List<ObjectResources> sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes);
+ final Iterable<String> sortedNodes = sortAllNodes(td, exec, favoredNodeIds, unFavoredNodeIds);
- scheduleExecutor(exec, td, scheduledTasks, sortedNodes);
+ if (!scheduleExecutor(exec, td, scheduledTasks, sortedNodes)) {
+ return mkNotEnoughResources(td);
+ }
}
executorsNotScheduled.removeAll(scheduledTasks);
- LOG.error("/* Scheduling left over task (most likely sys tasks) */");
- // schedule left over system tasks
- for (ExecutorDetails exec : executorsNotScheduled) {
- final List<ObjectResources> sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes);
- scheduleExecutor(exec, td, scheduledTasks, sortedNodes);
+ if (!executorsNotScheduled.isEmpty()) {
+ LOG.warn("Scheduling {} left over task (most likely sys tasks)", executorsNotScheduled);
+ // schedule left over system tasks
+ for (ExecutorDetails exec : executorsNotScheduled) {
+ final Iterable<String> sortedNodes = sortAllNodes(td, exec, favoredNodeIds, unFavoredNodeIds);
+ if (!scheduleExecutor(exec, td, scheduledTasks, sortedNodes)) {
+ return mkNotEnoughResources(td);
+ }
+ }
+ executorsNotScheduled.removeAll(scheduledTasks);
}
SchedulingResult result;
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 45a2436..74d3ae3 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.WorkerResources;
@@ -38,38 +40,27 @@ import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
+import org.apache.storm.testing.PerformanceTest;
import org.apache.storm.testing.TestWordCounter;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.DisallowedStrategyException;
import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidation;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.INimbusTest;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.TestBolt;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.TestSpout;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.addTopologies;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotScheduled;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.createClusterConfig;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genExecsAndComps;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genSupervisors;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genTopology;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userRes;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userResourcePool;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
+import static org.junit.Assert.*;
public class TestResourceAwareScheduler {
@@ -98,9 +89,9 @@ public class TestResourceAwareScheduler {
Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config);
Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster);
assertEquals(5, nodes.size());
- RAS_Node node = nodes.get("sup-0");
+ RAS_Node node = nodes.get("r000s000");
- assertEquals("sup-0", node.getId());
+ assertEquals("r000s000", node.getId());
assertTrue(node.isAlive());
assertEquals(0, node.getRunningTopologies().size());
assertTrue(node.isTotallyFree());
@@ -436,16 +427,16 @@ public class TestResourceAwareScheduler {
// Test2: When a supervisor fails, RAS does not alter existing assignments
executorToSlot = new HashMap<>();
- executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 0));
- executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 1));
- executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 1));
+ executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("r000s000", 0));
+ executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("r000s000", 1));
+ executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("r000s001", 1));
Map<String, SchedulerAssignment> existingAssignments = new HashMap<>();
assignment = new SchedulerAssignmentImpl(topology1.getId(), executorToSlot, null, null);
existingAssignments.put(topology1.getId(), assignment);
copyOfOldMapping = new HashMap<>(executorToSlot);
Set<ExecutorDetails> existingExecutors = copyOfOldMapping.keySet();
Map<String, SupervisorDetails> supMap1 = new HashMap<>(supMap);
- supMap1.remove("sup-0"); // mock the supervisor sup-0 as a failed supervisor
+ supMap1.remove("r000s000"); // mock the supervisor r000s000 as a failed supervisor
topologies = new Topologies(topology1);
Cluster cluster1 = new Cluster(iNimbus, supMap1, existingAssignments, topologies, config1);
@@ -462,19 +453,19 @@ public class TestResourceAwareScheduler {
// Test3: When a supervisor and a worker on it fails, RAS does not alter existing assignments
executorToSlot = new HashMap<>();
- executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 1)); // the worker to orphan
- executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 2)); // the worker that fails
- executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 1)); // the healthy worker
+ executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("r000s000", 1)); // the worker to orphan
+ executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("r000s000", 2)); // the worker that fails
+ executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("r000s001", 1)); // the healthy worker
existingAssignments = new HashMap<>();
assignment = new SchedulerAssignmentImpl(topology1.getId(), executorToSlot, null, null);
existingAssignments.put(topology1.getId(), assignment);
- // delete one worker of sup-0 (failed) from topo1 assignment to enable actual schedule for testing
+ // delete one worker of r000s000 (failed) from topo1 assignment to enable actual schedule for testing
executorToSlot.remove(new ExecutorDetails(1, 1));
copyOfOldMapping = new HashMap<>(executorToSlot);
existingExecutors = copyOfOldMapping.keySet(); // namely the two eds on the orphaned worker and the healthy worker
supMap1 = new HashMap<>(supMap);
- supMap1.remove("sup-0"); // mock the supervisor sup-0 as a failed supervisor
+ supMap1.remove("r000s000"); // mock the supervisor r000s000 as a failed supervisor
topologies = new Topologies(topology1);
cluster1 = new Cluster(iNimbus, supMap1, existingAssignments, topologies, config1);
@@ -530,7 +521,7 @@ public class TestResourceAwareScheduler {
for (int j = 0; j < 4; j++) {
ports.add(j);
}
- SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, i == 0 ? resourceMap1 : resourceMap2);
+ SupervisorDetails sup = new SupervisorDetails("r00s00" + i, "host-" + i, null, ports, i == 0 ? resourceMap1 : resourceMap2);
supMap.put(sup.getId(), sup);
}
LOG.info("SUPERVISORS = {}", supMap);
@@ -702,7 +693,7 @@ public class TestResourceAwareScheduler {
rs.schedule(topologies, cluster);
String status = cluster.getStatusMap().get(topology2.getId());
assert status.startsWith("Not enough resources to schedule") : status;
- assert status.endsWith("0/5 executors scheduled") : status;
+ assert status.endsWith("5 executors not scheduled") : status;
assertEquals(5, cluster.getUnassignedExecutors(topology2).size());
}
@@ -989,4 +980,90 @@ public class TestResourceAwareScheduler {
Object sched = ReflectionUtils.newSchedulerStrategyInstance(allowed, config);
assertEquals(sched.getClass().getName(), allowed);
}
+
+ /**
+ * Performance check: schedules the large synthetic workload with {@link DefaultResourceAwareStrategy}
+ * on CPU/memory-only supervisors (no GPU nodes) at the base scale; fails if it exceeds 30 seconds.
+ */
+ @Test(timeout = 30_000)
+ @Category(PerformanceTest.class)
+ public void testLargeTopologiesOnLargeClusters() {
+ final String strategyClass = DefaultResourceAwareStrategy.class.getName();
+ testLargeTopologiesCommon(strategyClass, false, 1);
+ }
+
+ /**
+ * Performance check: schedules the large synthetic workload with {@link GenericResourceAwareStrategy},
+ * including GPU supervisors and GPU-requesting topologies, at the base scale; fails if it exceeds 60 seconds.
+ */
+ @Test(timeout = 60_000)
+ @Category(PerformanceTest.class)
+ public void testLargeTopologiesOnLargeClustersGras() {
+ final String strategyClass = GenericResourceAwareStrategy.class.getName();
+ testLargeTopologiesCommon(strategyClass, true, 1);
+ }
+
+ /**
+ * Generates a racked set of supervisors and a batch of synthetic topologies, schedules them once with the
+ * given strategy, and logs the elapsed time, the number of used slots, and how each topology's slots are
+ * spread across racks. Nothing is asserted here; the JUnit callers rely on their timeouts.
+ *
+ * @param strategy fully-qualified class name of the scheduling strategy to configure
+ * @param includeGpu when true, also adds supervisors offering {@code my.gpu} and topologies with a bolt
+ * requesting it
+ * @param multiplier scale factor applied to the generated supervisor groups and topology counts
+ */
+ public void testLargeTopologiesCommon(final String strategy, final boolean includeGpu, final int multiplier) {
+ INimbus iNimbus = new INimbusTest();
+ // Base supervisors: CPU and memory only, no extra resources.
+ Map<String, SupervisorDetails> supMap = genSupervisorsWithRacks(25 * multiplier, 40, 66, 3 * multiplier, 0, 4700, 226200, new HashMap<>());
+ if (includeGpu) {
+ // Additional supervisors that each advertise one unit of the custom "my.gpu" resource.
+ HashMap<String, Double> extraResources = new HashMap<>();
+ extraResources.put("my.gpu", 1.0);
+ supMap.putAll(genSupervisorsWithRacks(3 * multiplier, 40, 66, 0, 0, 4700, 226200, extraResources));
+ }
+
+ Config config = new Config();
+ config.putAll(createClusterConfig(88, 775, 25, null));
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, strategy);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ Map<String, TopologyDetails> topologyDetailsMap = new HashMap<>();
+ for (int i = 0; i < 11 * multiplier; i++) {
+ TopologyDetails td = genTopology(String.format("topology-%05d", i), config, 5,
+ 40, 30, 114, 0, 0, "user", 8192);
+ topologyDetailsMap.put(td.getId(), td);
+ }
+ if (includeGpu) {
+ // Same generator parameters as above, plus a bolt whose executors request "my.gpu",
+ // a resource only the extra supervisors generated above offer.
+ for (int i = 0; i < multiplier; i++) {
+ TopologyBuilder builder = topologyBuilder(5, 40, 30, 114);
+ builder.setBolt("gpu-bolt", new TestBolt(), 40)
+ .addResource("my.gpu", 1.0)
+ .shuffleGrouping("spout-0");
+ TopologyDetails td = topoToTopologyDetails(String.format("topology-gpu-%05d", i), config, builder.createTopology(), 0, 0,
+ "user", 8192);
+ topologyDetailsMap.put(td.getId(), td);
+ }
+ }
+ Topologies topologies = new Topologies(topologyDetailsMap);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config);
+
+ // The measured duration covers rs.prepare(...) as well as rs.schedule(...).
+ long startTime = Time.currentTimeMillis();
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+ long schedulingDuration = Time.currentTimeMillis() - startTime;
+ LOG.info("Scheduling took {} ms", schedulingDuration);
+ LOG.info("HAS {} SLOTS USED", cluster.getUsedSlots().size());
+
+ // Copy into a TreeMap so the per-topology report is logged in sorted, stable order.
+ Map<String, SchedulerAssignment> assignments = new TreeMap<>(cluster.getAssignments());
+ for (Entry<String, SchedulerAssignment> entry : assignments.entrySet()) {
+ Map<String, AtomicLong> slotsPerRack = new TreeMap<>();
+ for (WorkerSlot slot : entry.getValue().getSlots()) {
+ String rack = supervisorIdToRackName(slot.getNodeId());
+ slotsPerRack.computeIfAbsent(rack, r -> new AtomicLong()).incrementAndGet();
+ }
+ LOG.info("{} => {}", entry.getKey(), slotsPerRack);
+ }
+ }
+
+ /**
+ * Runs {@link #testLargeTopologiesCommon} outside of JUnit, so it can be invoked with a custom strategy,
+ * GPU flag and multiplier. Positional arguments, all optional:
+ * {@code [strategyClassName [includeGpu [multiplier]]]}; the defaults are
+ * {@link DefaultResourceAwareStrategy}, {@code false} and {@code 1}.
+ *
+ * @param args optional strategy class name, GPU flag, and scale multiplier
+ * @throws NumberFormatException if the multiplier argument is not a valid integer
+ */
+ public static void main(String[] args) {
+ String strategy = DefaultResourceAwareStrategy.class.getName();
+ if (args.length > 0) {
+ strategy = args[0];
+ }
+ boolean includeGpu = false;
+ if (args.length > 1) {
+ // parseBoolean yields the primitive directly; valueOf would box and then auto-unbox.
+ includeGpu = Boolean.parseBoolean(args[1]);
+ }
+ int multiplier = 1;
+ if (args.length > 2) {
+ multiplier = Integer.parseInt(args[2]);
+ }
+ TestResourceAwareScheduler trs = new TestResourceAwareScheduler();
+ trs.testLargeTopologiesCommon(strategy, includeGpu, multiplier);
+ // NOTE(review): presumably forces JVM shutdown in case scheduling left non-daemon threads running - confirm.
+ System.exit(0);
+ }
}