You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2021/02/22 06:02:27 UTC

[GitHub] [storm] Ethanlm commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Ethanlm commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r579992649



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
##########
@@ -66,8 +66,10 @@
 public class TestTopologyAnonymizerUtils {
     private static final Logger LOG = LoggerFactory.getLogger(TestTopologyAnonymizerUtils.class);
 
-    private static final String DEFAULT_ORIGINAL_RESOURCES_PATH = "clusterconf/iridiumblue";
-    private static final String DEFAULT_ANONYMIZED_RESOURCES_OUTDIR = "src/test/resources/clusterconf/largeCluster01";
+    // iridiumblue -> largeCluster02 (prior largeCluster01)
+    // ebonyred -> largeCluster03

Review comment:
       These comments don't mean much to other people and can be confusing. Can we delete them?

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -62,12 +62,17 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @ExtendWith({NormalizedResourcesExtension.class})
 public class TestLargeCluster {
     private static final Logger LOG = LoggerFactory.getLogger(TestLargeCluster.class);
 
-    public static final String TEST_CLUSTER_NAME = "largeCluster01";
+    public static final String TEST_CLUSTER_01 = "largeCluster01";
+    public static final String TEST_CLUSTER_02 = "largeCluster02";
+    public static final String TEST_CLUSTER_03 = "largeCluster03";
+
+    public static final String TEST_CLUSTER_NAME = TEST_CLUSTER_02;

Review comment:
       Please help me understand why these variables are declared.  Looks like only `TEST_CLUSTER_NAME` is used. Don't we want to test all three clusters?

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {

Review comment:
       `reducedSupervisorsPerRack` is always 0. Can we delete it ?

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {
+                tmpRackSupervisorCnt -= Math.abs(reducedSupervisorsPerRack);
             }
-            for (int rack = 12 ; rack < 14 ; rack++) {
-                double cpu = 2400; // %percent
-                double mem = 118_100; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+            final int adjustedRackSupervisorCnt = tmpRackSupervisorCnt;
+            list.forEach(x -> {
+                int supervisorCnt = x.supervisorCnt;
+                for (int i = 0; i < supervisorCnt ; i++) {
+                    int superInRack = seenRacks.computeIfAbsent(rackId, z -> new AtomicInteger(-1)).incrementAndGet();
+                    int rackNum = seenRacks.size() - 1;
+                    if (superInRack >= adjustedRackSupervisorCnt) {
+                        continue;
+                    }
+                    createAndAddOneSupervisor(rackNum, superInRack, x.cpuPercent, x.memoryMb, x.slotCnt, retList);

Review comment:
       Why recreate rackNum instead of reusing `rackId` from `SupervisorDistribution`

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);

Review comment:
       Is it possible to replace `createSupervisorsForCluster01` with some method like other clusters? i.e. to have a new method called `SupervisorDistribution.getSupervisorDistribution01();` and used it in the same way like other clusters?

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {

Review comment:
       Why does it delete `reducedSupervisorsPerRack` again?

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();

Review comment:
       Why is `AtomicInteger` necessary?

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {
+                tmpRackSupervisorCnt -= Math.abs(reducedSupervisorsPerRack);
             }
-            for (int rack = 12 ; rack < 14 ; rack++) {
-                double cpu = 2400; // %percent
-                double mem = 118_100; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+            final int adjustedRackSupervisorCnt = tmpRackSupervisorCnt;
+            list.forEach(x -> {
+                int supervisorCnt = x.supervisorCnt;
+                for (int i = 0; i < supervisorCnt ; i++) {

Review comment:
       nit: delete space after `supervisorCnt`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org