Posted to commits@storm.apache.org by et...@apache.org on 2019/08/02 21:55:19 UTC

[storm] branch master updated: STORM-3474 Large fragmented cluster scheduling time test

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 45f9d0b  STORM-3474 Large fragmented cluster scheduling time test
     new f8394c5  Merge pull request #3092 from dandsager1/STORM-3474
45f9d0b is described below

commit 45f9d0b12f01fcc628c4016a72e232d786d868c5
Author: dandsager <da...@verizonmedia.com>
AuthorDate: Tue Jul 16 15:16:42 2019 -0500

    STORM-3474 Large fragmented cluster scheduling time test
---
 .../resource/TestResourceAwareScheduler.java       | 210 +++++++++++++++++++++
 .../TestUtilsForResourceAwareScheduler.java        |   8 +
 2 files changed, 218 insertions(+)
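For orientation before the diff: the new test compares the median time to schedule the first 10% of topologies on an empty cluster against the median time to schedule the last 10% on a nearly full, fragmented cluster, and fails if the ratio grows beyond 1.5x an empirically accepted value per strategy. Below is a minimal standalone sketch of that acceptance check; the class name and structure are illustrative only, and the timings are the DefaultResourceAwareStrategy numbers recorded in the test's comment block.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class RatioCheckSketch {
        // Median of an odd-sized list, mirroring the getMedianValue helper in the test below.
        static long median(List<Long> values) {
            List<Long> sorted = new ArrayList<>(values);
            Collections.sort(sorted);
            return sorted.get(sorted.size() / 2);
        }

        public static void main(String[] args) {
            // DefaultResourceAwareStrategy timings (ms) taken from the comment block in the test below.
            List<Long> firstBlock = Arrays.asList(249L, 266L, 251L, 249L, 258L);
            List<Long> lastBlock = Arrays.asList(1734L, 1812L, 1826L, 1788L, 1714L);

            double ratio = (double) median(lastBlock) / median(firstBlock);
            double acceptedRatio = 6.96;           // empirically accepted median ratio for this strategy
            double slowSchedulingThreshold = 1.5;  // tolerate up to a 50% regression over the accepted ratio

            System.out.printf("ratio %.2f, max allowed %.2f%n", ratio, slowSchedulingThreshold * acceptedRatio);
            if (ratio >= slowSchedulingThreshold * acceptedRatio) {
                throw new AssertionError("scheduling is significantly slower than expected");
            }
        }
    }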

diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 405e2ae..17b9fa5 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -43,6 +43,7 @@ import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
 import org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.ConstraintSolverStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
@@ -1067,6 +1068,215 @@ public class TestResourceAwareScheduler {
         assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
     }
 
+    protected static class TimeBlockResult {
+        List<Long> firstBlockTime;
+        List<Long> lastBlockTime;
+
+        TimeBlockResult() {
+            firstBlockTime = new ArrayList<>();
+            lastBlockTime = new ArrayList<>();
+        }
+
+        void append(TimeBlockResult other) {
+            this.firstBlockTime.addAll(other.firstBlockTime);
+            this.lastBlockTime.addAll(other.lastBlockTime);
+        }
+    }
+
+    private long getMedianValue(List<Long> values) {
+        final int numValues = values.size();
+        assert(numValues % 2 == 1);     // number of values must be odd to compute median as below
+        List<Long> sortedValues = new ArrayList<>(values);
+        Collections.sort(sortedValues);
+
+        final int medianIndex = numValues / 2;    // integer division already floors
+        return sortedValues.get(medianIndex);
+    }
+
+    /**
+     * Check time to schedule a fragmented cluster using different strategies
+     *
+     * Simulate scheduling on a large production cluster. Find the ratio of the time to schedule a block of topologies
+     * when the cluster is nearly full to the time when the cluster is empty. Although the cluster has sufficient total
+     * resources to schedule all topologies, when nearly full it becomes fragmented and some topologies fail to schedule.
+     */
+    @Test
+    public void TestLargeFragmentedClusterScheduling() {
+        /*
+        Without fragmentation, the cluster would be able to schedule both topologies on each node. Let's call each node
+        with both topologies scheduled as 100% scheduled.
+
+        We schedule the cluster in 3 blocks of topologies, measuring the time to schedule each block. The first, middle
+        and last blocks attempt to schedule 0-10%, 10%-90% and 90%-100% of the topologies, respectively. The last block
+        has a number of scheduling failures due to cluster fragmentation, and its time is dominated by attempting to evict topologies.
+
+        Timing results for scheduling are noisy. As a result, we do multiple runs and use median values for FirstBlock
+        and LastBlock times. (somewhere a statistician is crying). The ratio of LastBlock / FirstBlock remains fairly constant.
+
+
+        TestLargeFragmentedClusterScheduling took 91118 ms
+        DefaultResourceAwareStrategy, FirstBlock 249.0, LastBlock 1734.0 ratio 6.963855421686747
+        GenericResourceAwareStrategy, FirstBlock 215.0, LastBlock 1673.0 ratio 7.78139534883721
+        ConstraintSolverStrategy, FirstBlock 279.0, LastBlock 2200.0 ratio 7.885304659498208
+
+        TestLargeFragmentedClusterScheduling took 98455 ms
+        DefaultResourceAwareStrategy, FirstBlock 266.0, LastBlock 1812.0 ratio 6.81203007518797
+        GenericResourceAwareStrategy, FirstBlock 235.0, LastBlock 1802.0 ratio 7.6680851063829785
+        ConstraintSolverStrategy, FirstBlock 304.0, LastBlock 2320.0 ratio 7.631578947368421
+
+        TestLargeFragmentedClusterScheduling took 97268 ms
+        DefaultResourceAwareStrategy, FirstBlock 251.0, LastBlock 1826.0 ratio 7.274900398406374
+        GenericResourceAwareStrategy, FirstBlock 220.0, LastBlock 1719.0 ratio 7.8136363636363635
+        ConstraintSolverStrategy, FirstBlock 296.0, LastBlock 2469.0 ratio 8.341216216216216
+
+        TestLargeFragmentedClusterScheduling took 97963 ms
+        DefaultResourceAwareStrategy, FirstBlock 249.0, LastBlock 1788.0 ratio 7.180722891566265
+        GenericResourceAwareStrategy, FirstBlock 240.0, LastBlock 1796.0 ratio 7.483333333333333
+        ConstraintSolverStrategy, FirstBlock 328.0, LastBlock 2544.0 ratio 7.7560975609756095
+
+        TestLargeFragmentedClusterScheduling took 93106 ms
+        DefaultResourceAwareStrategy, FirstBlock 258.0, LastBlock 1714.0 ratio 6.6434108527131785
+        GenericResourceAwareStrategy, FirstBlock 215.0, LastBlock 1692.0 ratio 7.869767441860465
+        ConstraintSolverStrategy, FirstBlock 309.0, LastBlock 2342.0 ratio 7.5792880258899675
+
+        Choose the median value of the values above
+        DefaultResourceAwareStrategy    6.96
+        GenericResourceAwareStrategy    7.78
+        ConstraintSolverStrategy        7.75
+        */
+
+        final int numNodes = 500;
+        final int numRuns = 5;
+
+        Map<String, Config> strategyToConfigs = new HashMap<>();
+        strategyToConfigs.put(DefaultResourceAwareStrategy.class.getName(), createClusterConfig(10, 10, 0, null));
+        strategyToConfigs.put(GenericResourceAwareStrategy.class.getName(), createGrasClusterConfig(10, 10, 0, null, null));
+        strategyToConfigs.put(ConstraintSolverStrategy.class.getName(), createCSSClusterConfig(10, 10, 0, null));
+
+        Map<String, TimeBlockResult> strategyToTimeBlockResults = new HashMap<>();
+
+        // AcceptedBlockTimeRatios obtained by empirical testing (see comment block above)
+        Map<String, Double> strategyToAcceptedBlockTimeRatios = new HashMap<>();
+        strategyToAcceptedBlockTimeRatios.put(DefaultResourceAwareStrategy.class.getName(), 6.96);
+        strategyToAcceptedBlockTimeRatios.put(GenericResourceAwareStrategy.class.getName(), 7.78);
+        strategyToAcceptedBlockTimeRatios.put(ConstraintSolverStrategy.class.getName(), 7.75);
+
+        // Get first and last block times for multiple runs and strategies
+        long startTime = Time.currentTimeMillis();
+        for (Entry<String, Config> strategyConfig : strategyToConfigs.entrySet()) {
+            TimeBlockResult strategyTimeBlockResult = strategyToTimeBlockResults.computeIfAbsent(strategyConfig.getKey(), (k) -> new TimeBlockResult());
+            for (int run = 0; run < numRuns; ++run) {
+                TimeBlockResult result = testLargeClusterSchedulingTiming(numNodes, strategyConfig.getValue());
+                strategyTimeBlockResult.append(result);
+            }
+        }
+
+        // Log median ratios for different strategies
+        LOG.info("TestLargeFragmentedClusterScheduling took {} ms", Time.currentTimeMillis() - startTime);
+        for (Entry<String, TimeBlockResult> strategyResult : strategyToTimeBlockResults.entrySet()) {
+            TimeBlockResult strategyTimeBlockResult = strategyResult.getValue();
+            double medianFirstBlockTime = getMedianValue(strategyTimeBlockResult.firstBlockTime);
+            double medianLastBlockTime = getMedianValue(strategyTimeBlockResult.lastBlockTime);
+            double ratio = medianLastBlockTime / medianFirstBlockTime;
+            LOG.info("{}, FirstBlock {}, LastBlock {} ratio {}", strategyResult.getKey(), medianFirstBlockTime, medianLastBlockTime, ratio);
+        }
+
+        // Check last block scheduling time does not get significantly slower
+        for (Entry<String, TimeBlockResult> strategyResult : strategyToTimeBlockResults.entrySet()) {
+            TimeBlockResult strategyTimeBlockResult = strategyResult.getValue();
+            double medianFirstBlockTime = getMedianValue(strategyTimeBlockResult.firstBlockTime);
+            double medianLastBlockTime = getMedianValue(strategyTimeBlockResult.lastBlockTime);
+            double ratio = medianLastBlockTime / medianFirstBlockTime;
+
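+            // Allow the measured LastBlock / FirstBlock ratio to exceed the empirically accepted ratio by up to
+            // 50% (e.g. 7.75 * 1.5 = 11.625 for ConstraintSolverStrategy) before treating scheduling as too slow.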
+            double slowSchedulingThreshold = 1.5;
+            String msg = "Strategy " + strategyResult.getKey() + " scheduling is significantly slower for mostly full fragmented cluster\n";
+            msg += "Ratio was " + ratio + " Max allowed is " + (slowSchedulingThreshold * ratio);
+            assertTrue(msg, ratio < slowSchedulingThreshold * strategyToAcceptedBlockTimeRatios.get(strategyResult.getKey()));
+        }
+    }
+
+    // Create multiple copies of a test topology
+    private void addTopologyBlockToMap(Map<String, TopologyDetails> topologyMap, String baseName, Config config,
+                                       double spoutMemoryLoad, int[] blockIndices) {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("testSpout", new TestSpout(), 1).setMemoryLoad(spoutMemoryLoad);
+        StormTopology stormTopology = builder.createTopology();
+        Map<ExecutorDetails, String> executorMap = genExecsAndComps(stormTopology);
+
+        for (int i = blockIndices[0]; i <= blockIndices[1]; ++i) {
+            TopologyDetails topo = new TopologyDetails(baseName + i, config, stormTopology, 0, executorMap, 0, "user");
+            topologyMap.put(topo.getId(), topo);
+        }
+    }
+
+    /*
+     * Measure the time to schedule a large, fragmented cluster.
+     */
+    private TimeBlockResult testLargeClusterSchedulingTiming(int numNodes, Config config) {
+        // Attempt to schedule multiple copies of 2 different topologies (topo-t0 and topo-t1) in 3 blocks.
+        // Without fragmentation it would be possible to schedule all topologies, but fragmentation causes some
+        // topologies in the last block to fail to schedule.
+
+        // Get start/end indices for blocks
+        int numTopologyPairs = numNodes;
+        int increment = (int) Math.floor(numTopologyPairs * 0.1);
+        int firstBlockIndices[] = {0, increment - 1};
+        int midBlockIndices[] = {increment, numTopologyPairs - increment - 1};
+        int lastBlockIndices[] = {numTopologyPairs - increment, numTopologyPairs - 1};
+
+        // Memory is the constraining resource.
+        double t0Mem = 70;   // memory required by topo-t0
+        double t1Mem = 20;   // memory required by topo-t1
+        double nodeMem = 100;
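+        // A node can host one topo_t0 plus one topo_t1 (70 + 20 = 90 <= 100), so an unfragmented cluster can
+        // schedule every topology. Once the cluster is nearly full, the remaining free memory tends to be
+        // scattered across nodes in pieces smaller than 70, so some topo_t0 copies in the last block fail to
+        // schedule even though the cluster's total free memory would be sufficient.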
+
+        // first block (0% - 10%)
+        Map<String, TopologyDetails> topologyMap = new HashMap<>();
+        addTopologyBlockToMap(topologyMap, "topo_t0-", config, t0Mem, firstBlockIndices);
+        addTopologyBlockToMap(topologyMap, "topo_t1-", config, t1Mem, firstBlockIndices);
+        Topologies topologies = new Topologies(topologyMap);
+
+        Map<String, SupervisorDetails> supMap = genSupervisors(numNodes, 7, 3500, nodeMem);
+        Cluster cluster = new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        TimeBlockResult timeBlockResult = new TimeBlockResult();
+
+        // schedule first block (0% - 10%)
+        {
+            scheduler = new ResourceAwareScheduler();
+            scheduler.prepare(config);
+
+            long time = Time.currentTimeMillis();
+            scheduler.schedule(topologies, cluster);
+            timeBlockResult.firstBlockTime.add(Time.currentTimeMillis() - time);
+        }
+
+        // schedule mid block (10% - 90%)
+        {
+            addTopologyBlockToMap(topologyMap, "topo_t0-", config, t0Mem, midBlockIndices);
+            addTopologyBlockToMap(topologyMap, "topo_t1-", config, t1Mem, midBlockIndices);
+
+            topologies = new Topologies(topologyMap);
+            cluster = new Cluster(cluster, topologies);
+
+            scheduler.schedule(topologies, cluster);
+        }
+
+        // schedule last block (90% to 100%)
+        {
+            addTopologyBlockToMap(topologyMap, "topo_t0-", config, t0Mem, lastBlockIndices);
+            addTopologyBlockToMap(topologyMap, "topo_t1-", config, t1Mem, lastBlockIndices);
+
+            topologies = new Topologies(topologyMap);
+            cluster = new Cluster(cluster, topologies);
+
+            long time = Time.currentTimeMillis();
+            scheduler.schedule(topologies, cluster);
+            timeBlockResult.lastBlockTime.add(Time.currentTimeMillis() - time);
+        }
+
+        return timeBlockResult;
+    }
+
     /**
      * Test multiple spouts and cyclic topologies
      */
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index e29830e..276cc46 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -31,6 +31,7 @@ import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.ConstraintSolverStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -102,6 +103,13 @@ public class TestUtilsForResourceAwareScheduler {
         return ret;
     }
 
+    public static Config createCSSClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
+                                                Map<String, Map<String, Number>> pools) {
+        Config config = createClusterConfig(compPcore, compOnHeap, compOffHeap, pools);
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, ConstraintSolverStrategy.class.getName());
+        return config;
+    }
+
     public static Config createGrasClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
                                                  Map<String, Map<String, Number>> pools, Map<String, Double> genericResourceMap) {
         Config config = createClusterConfig(compPcore, compOnHeap, compOffHeap, pools);