You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2019/08/02 21:55:19 UTC
[storm] branch master updated: STORM-3474 Large fragmented cluster
scheduling time test
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 45f9d0b STORM-3474 Large fragmented cluster scheduling time test
new f8394c5 Merge pull request #3092 from dandsager1/STORM-3474
45f9d0b is described below
commit 45f9d0b12f01fcc628c4016a72e232d786d868c5
Author: dandsager <da...@verizonmedia.com>
AuthorDate: Tue Jul 16 15:16:42 2019 -0500
STORM-3474 Large fragmented cluster scheduling time test
---
.../resource/TestResourceAwareScheduler.java | 210 +++++++++++++++++++++
.../TestUtilsForResourceAwareScheduler.java | 8 +
2 files changed, 218 insertions(+)
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 405e2ae..17b9fa5 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -43,6 +43,7 @@ import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.ConstraintSolverStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
@@ -1067,6 +1068,215 @@ public class TestResourceAwareScheduler {
assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
}
+    /**
+     * Collects elapsed scheduling times for the first and the last topology block of one or more timing runs.
+     */
+    protected static class TimeBlockResult {
+        /** Elapsed times recorded for scheduling the first block. */
+        List<Long> firstBlockTime = new ArrayList<>();
+        /** Elapsed times recorded for scheduling the last block. */
+        List<Long> lastBlockTime = new ArrayList<>();
+
+        TimeBlockResult() {
+            // Both lists start out empty; they are created by the field initializers.
+        }
+
+        /**
+         * Adds every timing held by {@code other} to the end of this result's lists.
+         *
+         * @param other the result whose timings are copied into this one
+         */
+        void append(TimeBlockResult other) {
+            firstBlockTime.addAll(other.firstBlockTime);
+            lastBlockTime.addAll(other.lastBlockTime);
+        }
+    }
+
+    /**
+     * Returns the median of {@code values} without modifying the list.
+     *
+     * <p>Only odd-sized lists are supported, so the median is always the middle element of the sorted copy
+     * and no averaging of two middle elements is needed.
+     *
+     * @param values the samples to inspect; must contain an odd number of elements
+     * @return the middle element of {@code values} once sorted in ascending order
+     * @throws IllegalArgumentException if {@code values} is empty or has an even number of elements
+     */
+    private long getMedianValue(List<Long> values) {
+        final int numValues = values.size();
+        // Checked explicitly rather than with "assert" so the precondition also holds when the JVM runs
+        // without -ea; this covers the empty list too, which would otherwise fail on the lookup below.
+        if (numValues % 2 != 1) {
+            throw new IllegalArgumentException("number of values must be odd to compute median, got " + numValues);
+        }
+
+        // Sort a copy so the caller's list keeps its order.
+        List<Long> sortedValues = new ArrayList<>(values);
+        Collections.sort(sortedValues);
+
+        // Integer division already truncates, so numValues / 2 is the middle index.
+        return sortedValues.get(numValues / 2);
+    }
+
+    /**
+     * Check time to schedule a fragmented cluster using different strategies.
+     *
+     * <p>Simulate scheduling on a large production cluster. Find the ratio of time to schedule a set of topologies when
+     * the cluster is empty and when the cluster is nearly full. While the cluster has sufficient resources to schedule
+     * all topologies, when nearly full the cluster becomes fragmented and some topologies fail to schedule.
+     *
+     * <p>The test fails when, for any strategy, the measured LastBlock / FirstBlock ratio is not below 1.5 times the
+     * empirically accepted ratio for that strategy.
+     */
+    @Test
+    public void TestLargeFragmentedClusterScheduling() {
+        /*
+        Without fragmentation, the cluster would be able to schedule both topologies on each node. Let's call each node
+        with both topologies scheduled as 100% scheduled.
+
+        We schedule the cluster in 3 blocks of topologies, measuring the time to schedule the blocks. The first, middle
+        and last blocks attempt to schedule 0%-10%, 10%-90% and 90%-100% of the topologies respectively. The last block
+        has a number of scheduling failures due to cluster fragmentation and its time is dominated by attempting to
+        evict topologies.
+
+        Timing results for scheduling are noisy. As a result, we do multiple runs and use median values for FirstBlock
+        and LastBlock times. (somewhere a statistician is crying). The ratio of LastBlock / FirstBlock remains fairly constant.
+
+
+        TestLargeFragmentedClusterScheduling took 91118 ms
+        DefaultResourceAwareStrategy, FirstBlock 249.0, LastBlock 1734.0 ratio 6.963855421686747
+        GenericResourceAwareStrategy, FirstBlock 215.0, LastBlock 1673.0 ratio 7.78139534883721
+        ConstraintSolverStrategy, FirstBlock 279.0, LastBlock 2200.0 ratio 7.885304659498208
+
+        TestLargeFragmentedClusterScheduling took 98455 ms
+        DefaultResourceAwareStrategy, FirstBlock 266.0, LastBlock 1812.0 ratio 6.81203007518797
+        GenericResourceAwareStrategy, FirstBlock 235.0, LastBlock 1802.0 ratio 7.6680851063829785
+        ConstraintSolverStrategy, FirstBlock 304.0, LastBlock 2320.0 ratio 7.631578947368421
+
+        TestLargeFragmentedClusterScheduling took 97268 ms
+        DefaultResourceAwareStrategy, FirstBlock 251.0, LastBlock 1826.0 ratio 7.274900398406374
+        GenericResourceAwareStrategy, FirstBlock 220.0, LastBlock 1719.0 ratio 7.8136363636363635
+        ConstraintSolverStrategy, FirstBlock 296.0, LastBlock 2469.0 ratio 8.341216216216216
+
+        TestLargeFragmentedClusterScheduling took 97963 ms
+        DefaultResourceAwareStrategy, FirstBlock 249.0, LastBlock 1788.0 ratio 7.180722891566265
+        GenericResourceAwareStrategy, FirstBlock 240.0, LastBlock 1796.0 ratio 7.483333333333333
+        ConstraintSolverStrategy, FirstBlock 328.0, LastBlock 2544.0 ratio 7.7560975609756095
+
+        TestLargeFragmentedClusterScheduling took 93106 ms
+        DefaultResourceAwareStrategy, FirstBlock 258.0, LastBlock 1714.0 ratio 6.6434108527131785
+        GenericResourceAwareStrategy, FirstBlock 215.0, LastBlock 1692.0 ratio 7.869767441860465
+        ConstraintSolverStrategy, FirstBlock 309.0, LastBlock 2342.0 ratio 7.5792880258899675
+
+        Choose the median value of the values above
+        DefaultResourceAwareStrategy 6.96
+        GenericResourceAwareStrategy 7.78
+        ConstraintSolverStrategy 7.75
+        */
+
+        final int numNodes = 500;
+        // Must stay odd: getMedianValue only supports an odd number of samples.
+        final int numRuns = 5;
+        // A measured ratio must stay below this multiple of the accepted ratio for the test to pass.
+        final double slowSchedulingThreshold = 1.5;
+
+        Map<String, Config> strategyToConfigs = new HashMap<>();
+        strategyToConfigs.put(DefaultResourceAwareStrategy.class.getName(), createClusterConfig(10, 10, 0, null));
+        strategyToConfigs.put(GenericResourceAwareStrategy.class.getName(), createGrasClusterConfig(10, 10, 0, null, null));
+        strategyToConfigs.put(ConstraintSolverStrategy.class.getName(), createCSSClusterConfig(10, 10, 0, null));
+
+        Map<String, TimeBlockResult> strategyToTimeBlockResults = new HashMap<>();
+
+        // AcceptedBlockTimeRatios obtained by empirical testing (see comment block above)
+        Map<String, Double> strategyToAcceptedBlockTimeRatios = new HashMap<>();
+        strategyToAcceptedBlockTimeRatios.put(DefaultResourceAwareStrategy.class.getName(), 6.96);
+        strategyToAcceptedBlockTimeRatios.put(GenericResourceAwareStrategy.class.getName(), 7.78);
+        strategyToAcceptedBlockTimeRatios.put(ConstraintSolverStrategy.class.getName(), 7.75);
+
+        // Get first and last block times for multiple runs and strategies
+        long startTime = Time.currentTimeMillis();
+        for (Entry<String, Config> strategyConfig : strategyToConfigs.entrySet()) {
+            TimeBlockResult strategyTimeBlockResult = strategyToTimeBlockResults.computeIfAbsent(strategyConfig.getKey(), (k) -> new TimeBlockResult());
+            for (int run = 0; run < numRuns; ++run) {
+                TimeBlockResult result = testLargeClusterSchedulingTiming(numNodes, strategyConfig.getValue());
+                strategyTimeBlockResult.append(result);
+            }
+        }
+
+        // Log median ratios for different strategies, remembering each ratio for the check below.
+        // All strategies are logged before anything is asserted so a failure does not hide the other results.
+        LOG.info("TestLargeFragmentedClusterScheduling took {} ms", Time.currentTimeMillis() - startTime);
+        Map<String, Double> strategyToMeasuredBlockTimeRatios = new HashMap<>();
+        for (Entry<String, TimeBlockResult> strategyResult : strategyToTimeBlockResults.entrySet()) {
+            TimeBlockResult strategyTimeBlockResult = strategyResult.getValue();
+            double medianFirstBlockTime = getMedianValue(strategyTimeBlockResult.firstBlockTime);
+            double medianLastBlockTime = getMedianValue(strategyTimeBlockResult.lastBlockTime);
+            double ratio = medianLastBlockTime / medianFirstBlockTime;
+            LOG.info("{}, FirstBlock {}, LastBlock {} ratio {}", strategyResult.getKey(), medianFirstBlockTime, medianLastBlockTime, ratio);
+            strategyToMeasuredBlockTimeRatios.put(strategyResult.getKey(), ratio);
+        }
+
+        // Check last block scheduling time does not get significantly slower
+        for (Entry<String, Double> strategyRatio : strategyToMeasuredBlockTimeRatios.entrySet()) {
+            String strategy = strategyRatio.getKey();
+            double ratio = strategyRatio.getValue();
+            // The limit is derived from the accepted ratio, not from the measured one.
+            double maxAllowedRatio = slowSchedulingThreshold * strategyToAcceptedBlockTimeRatios.get(strategy);
+
+            String msg = "Strategy " + strategy + " scheduling is significantly slower for mostly full fragmented cluster\n";
+            msg += "Ratio was " + ratio + " Max allowed is " + maxAllowedRatio;
+            assertTrue(msg, ratio < maxAllowedRatio);
+        }
+    }
+
+    /**
+     * Registers one copy of a single-spout test topology in {@code topologyMap} for every index in the
+     * inclusive range {@code blockIndices[0]}..{@code blockIndices[1]}.
+     *
+     * @param topologyMap destination map; each topology is stored under its own id
+     * @param baseName prefix of each topology name; the index is appended to it
+     * @param config configuration passed to every created topology
+     * @param spoutMemoryLoad memory load set on the topology's only spout
+     * @param blockIndices two-element array holding the first and last index (both inclusive)
+     */
+    private void addTopologyBlockToMap(Map<String, TopologyDetails> topologyMap, String baseName, Config config,
+                                       double spoutMemoryLoad, int[] blockIndices) {
+        // Every copy shares the same topology structure and executor mapping; only the name differs.
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("testSpout", new TestSpout(), 1).setMemoryLoad(spoutMemoryLoad);
+        StormTopology sharedTopology = topologyBuilder.createTopology();
+        Map<ExecutorDetails, String> execToComp = genExecsAndComps(sharedTopology);
+
+        int index = blockIndices[0];
+        while (index <= blockIndices[1]) {
+            TopologyDetails details = new TopologyDetails(baseName + index, config, sharedTopology, 0, execToComp, 0, "user");
+            topologyMap.put(details.getId(), details);
+            ++index;
+        }
+    }
+
+    /*
+     * Measure how long the scheduler takes on the first and on the last topology block while a large
+     * cluster fills up and fragments. The returned result holds one timing for each of the two blocks.
+     */
+    private TimeBlockResult testLargeClusterSchedulingTiming(int numNodes, Config config) {
+        // Multiple copies of 2 different topologies (topo-t0 and topo-t1) are submitted in 3 blocks.
+        // Without fragmentation it is possible to schedule all topologies, but fragmentation causes topologies to not
+        // schedule for the last block.
+
+        // Inclusive {first, last} index pairs for the blocks; the first and last block each cover 10% of the pairs.
+        final int pairCount = numNodes;
+        final int blockSize = (int) Math.floor(pairCount * 0.1);
+        final int[] firstBlockIndices = {0, blockSize - 1};
+        final int[] midBlockIndices = {blockSize, pairCount - blockSize - 1};
+        final int[] lastBlockIndices = {pairCount - blockSize, pairCount - 1};
+
+        // Memory is the constraining resource.
+        final double t0Mem = 70; // memory required by topo-t0
+        final double t1Mem = 20; // memory required by topo-t1
+        final double nodeMem = 100;
+
+        // Topologies of the first block (0% - 10%)
+        Map<String, TopologyDetails> topologyMap = new HashMap<>();
+        addTopologyBlockToMap(topologyMap, "topo_t0-", config, t0Mem, firstBlockIndices);
+        addTopologyBlockToMap(topologyMap, "topo_t1-", config, t1Mem, firstBlockIndices);
+        Topologies topologies = new Topologies(topologyMap);
+
+        Map<String, SupervisorDetails> supervisors = genSupervisors(numNodes, 7, 3500, nodeMem);
+        Cluster cluster = new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supervisors,
+            new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        TimeBlockResult timings = new TimeBlockResult();
+
+        // Schedule the first block (0% - 10%) and record the elapsed time.
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config);
+        final long firstBlockStart = Time.currentTimeMillis();
+        scheduler.schedule(topologies, cluster);
+        timings.firstBlockTime.add(Time.currentTimeMillis() - firstBlockStart);
+
+        // Schedule the mid block (10% - 90%); its time is not recorded.
+        addTopologyBlockToMap(topologyMap, "topo_t0-", config, t0Mem, midBlockIndices);
+        addTopologyBlockToMap(topologyMap, "topo_t1-", config, t1Mem, midBlockIndices);
+        topologies = new Topologies(topologyMap);
+        cluster = new Cluster(cluster, topologies);
+        scheduler.schedule(topologies, cluster);
+
+        // Schedule the last block (90% to 100%) and record the elapsed time.
+        addTopologyBlockToMap(topologyMap, "topo_t0-", config, t0Mem, lastBlockIndices);
+        addTopologyBlockToMap(topologyMap, "topo_t1-", config, t1Mem, lastBlockIndices);
+        topologies = new Topologies(topologyMap);
+        cluster = new Cluster(cluster, topologies);
+        final long lastBlockStart = Time.currentTimeMillis();
+        scheduler.schedule(topologies, cluster);
+        timings.lastBlockTime.add(Time.currentTimeMillis() - lastBlockStart);
+
+        return timings;
+    }
+
/**
* Test multiple spouts and cyclic topologies
*/
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index e29830e..276cc46 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -31,6 +31,7 @@ import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.ConstraintSolverStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
import org.apache.storm.spout.SpoutOutputCollector;
@@ -102,6 +103,13 @@ public class TestUtilsForResourceAwareScheduler {
return ret;
}
+    /**
+     * Builds a cluster config via {@code createClusterConfig} and sets {@link ConstraintSolverStrategy}
+     * as its topology scheduler strategy.
+     *
+     * <p>All four arguments are forwarded unchanged to {@code createClusterConfig}.
+     *
+     * @return the config with {@code Config.TOPOLOGY_SCHEDULER_STRATEGY} set to the strategy's class name
+     */
+    public static Config createCSSClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
+                                                Map<String, Map<String, Number>> pools) {
+        final String strategyClassName = ConstraintSolverStrategy.class.getName();
+        Config cssConfig = createClusterConfig(compPcore, compOnHeap, compOffHeap, pools);
+        cssConfig.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, strategyClassName);
+        return cssConfig;
+    }
+
public static Config createGrasClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
Map<String, Map<String, Number>> pools, Map<String, Double> genericResourceMap) {
Config config = createClusterConfig(compPcore, compOnHeap, compOffHeap, pools);