Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2021/02/17 14:41:42 UTC

[GitHub] [storm] bipinprasad opened a new pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

bipinprasad opened a new pull request #3378:
URL: https://github.com/apache/storm/pull/3378


   ## What is the purpose of the change
   
   *Add a new, larger set of topologies and create a new, more uneven distribution of supervisors*
   
   ## How was the change tested
   
   *Ran TestLargeCluster in storm-server with `mvn test -Dtest="TestLargeCluster"`*


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [storm] Ethanlm commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r583371646



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();

Review comment:
       I still have my opinion on the use of AtomicInteger. But that's fine. I won't block this PR because of it.







[GitHub] [storm] Ethanlm commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r583371212



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {

Review comment:
       No, my question is why we have
   ```
   int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
               if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {
                   tmpRackSupervisorCnt -= Math.abs(reducedSupervisorsPerRack);
               }
   ```
   which essentially subtracts `reducedSupervisorsPerRack` from `tmpRackSupervisorCnt` twice?
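
A minimal standalone sketch of the arithmetic in question. The class name `RackCountSketch`, both method names, and the sample numbers (82 supervisors in a rack, a reduction of 5) are assumptions for illustration; only the expression in `countAsWritten` mirrors the quoted diff. The single-subtraction variant is one possible reading of the intent, not necessarily what the PR ended up with.

```java
public class RackCountSketch {
    // Mirrors the logic quoted from the diff: subtract once unconditionally,
    // then subtract again if the remainder is still larger than the reduction.
    static int countAsWritten(int rackTotal, int reducedSupervisorsPerRack) {
        int reduction = Math.abs(reducedSupervisorsPerRack);
        int tmpRackSupervisorCnt = rackTotal - reduction;
        if (tmpRackSupervisorCnt > reduction) {
            tmpRackSupervisorCnt -= reduction;
        }
        return tmpRackSupervisorCnt;
    }

    // Hypothetical variant (an assumption, not from the PR): reduce once,
    // and never let the count drop below zero.
    static int countReducedOnce(int rackTotal, int reducedSupervisorsPerRack) {
        int reduction = Math.abs(reducedSupervisorsPerRack);
        return Math.max(0, rackTotal - reduction);
    }

    public static void main(String[] args) {
        System.out.println(countAsWritten(82, 5));   // 82 - 5 = 77; 77 > 5, so 77 - 5 = 72
        System.out.println(countReducedOnce(82, 5)); // 82 - 5 = 77
        System.out.println(countAsWritten(82, 0));   // 82, the reduction is a no-op
    }
}
```

With a reduction of 0 both versions agree, which may be why the double subtraction does not show up when `reducedSupervisorsPerRack` is always 0.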







[GitHub] [storm] bipinprasad merged pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad merged pull request #3378:
URL: https://github.com/apache/storm/pull/3378


   





[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r580377547



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {
+                tmpRackSupervisorCnt -= Math.abs(reducedSupervisorsPerRack);
             }
-            for (int rack = 12 ; rack < 14 ; rack++) {
-                double cpu = 2400; // %percent
-                double mem = 118_100; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+            final int adjustedRackSupervisorCnt = tmpRackSupervisorCnt;
+            list.forEach(x -> {
+                int supervisorCnt = x.supervisorCnt;
+                for (int i = 0; i < supervisorCnt ; i++) {

Review comment:
       fixed







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r580274143



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);

Review comment:
       Yes. I will change this.







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r580274143



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);

Review comment:
       The supervisor distribution can have repeated entries for the same rack number.
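
A rough sketch of what repeated entries for the same rack imply: per-rack totals have to be accumulated across entries rather than read from a single one. The `Entry` class and the `supervisorsPerRack` helper below are hypothetical stand-ins, not the PR's `SupervisorDistribution` API; only the field name `supervisorCnt` is taken from the diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class RackGroupingSketch {
    // Hypothetical stand-in for one distribution entry: a rack id plus a supervisor count.
    static final class Entry {
        final String rackId;
        final int supervisorCnt;

        Entry(String rackId, int supervisorCnt) {
            this.rackId = rackId;
            this.supervisorCnt = supervisorCnt;
        }
    }

    // Sum supervisor counts per rack so that repeated entries for a rack are combined.
    // A TreeMap keeps the racks in sorted order.
    static Map<String, Integer> supervisorsPerRack(List<Entry> entries) {
        return entries.stream().collect(Collectors.groupingBy(
            (Entry e) -> e.rackId,
            TreeMap::new,
            Collectors.summingInt((Entry e) -> e.supervisorCnt)));
    }

    public static void main(String[] args) {
        // rack-0 appears twice, as the comment above says can happen.
        List<Entry> entries = Arrays.asList(
            new Entry("rack-0", 40), new Entry("rack-1", 82), new Entry("rack-0", 42));
        System.out.println(supervisorsPerRack(entries)); // {rack-0=82, rack-1=82}
    }
}
```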







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r580271619



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {

Review comment:
       That variable was needed when testing with cluster01 to force defragmentation. I will review that test.







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r580272295



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();

Review comment:
       Using a mutable integer simplifies incrementing the count stored in the hash map.
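
For comparison, a small sketch of the two counting styles under discussion. The quoted hunk does not show how `seenRacks` is actually used, so the class name, both helper methods, and the rack ids here are assumptions. In the first style `AtomicInteger` serves only as a mutable box, not for thread safety; the second keeps plain `Integer` values and replaces them through `Map.merge`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class SeenRacksSketch {
    // Mutable-counter style: create the counter on first sight of a rack,
    // then bump it in place. Returns the count before the increment.
    static int nextIndexWithAtomic(Map<String, AtomicInteger> seenRacks, String rackId) {
        return seenRacks.computeIfAbsent(rackId, k -> new AtomicInteger(0)).getAndIncrement();
    }

    // Immutable-value style: Map.merge stores 1 on first sight and otherwise
    // replaces the old value with old + 1. Returns the updated count.
    static int countWithMerge(Map<String, Integer> seenRacks, String rackId) {
        return seenRacks.merge(rackId, 1, Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, AtomicInteger> atomicCounts = new HashMap<>();
        nextIndexWithAtomic(atomicCounts, "rack-0");
        nextIndexWithAtomic(atomicCounts, "rack-0");
        nextIndexWithAtomic(atomicCounts, "rack-1");

        Map<String, Integer> mergedCounts = new HashMap<>();
        countWithMerge(mergedCounts, "rack-0");
        countWithMerge(mergedCounts, "rack-0");
        countWithMerge(mergedCounts, "rack-1");

        // Both maps end up with rack-0 counted twice and rack-1 once.
        System.out.println(atomicCounts);
        System.out.println(mergedCounts);
    }
}
```

Either form works inside a lambda, since neither reassigns a captured local variable.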







[GitHub] [storm] Ethanlm commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r579992649



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
##########
@@ -66,8 +66,10 @@
 public class TestTopologyAnonymizerUtils {
     private static final Logger LOG = LoggerFactory.getLogger(TestTopologyAnonymizerUtils.class);
 
-    private static final String DEFAULT_ORIGINAL_RESOURCES_PATH = "clusterconf/iridiumblue";
-    private static final String DEFAULT_ANONYMIZED_RESOURCES_OUTDIR = "src/test/resources/clusterconf/largeCluster01";
+    // iridiumblue -> largeCluster02 (prior largeCluster01)
+    // ebonyred -> largeCluster03

Review comment:
       These comments don't mean much to other people and can be confusing. Can we delete them?

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -62,12 +62,17 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @ExtendWith({NormalizedResourcesExtension.class})
 public class TestLargeCluster {
     private static final Logger LOG = LoggerFactory.getLogger(TestLargeCluster.class);
 
-    public static final String TEST_CLUSTER_NAME = "largeCluster01";
+    public static final String TEST_CLUSTER_01 = "largeCluster01";
+    public static final String TEST_CLUSTER_02 = "largeCluster02";
+    public static final String TEST_CLUSTER_03 = "largeCluster03";
+
+    public static final String TEST_CLUSTER_NAME = TEST_CLUSTER_02;

Review comment:
       Please help me understand why these variables are declared. It looks like only `TEST_CLUSTER_NAME` is used. Don't we want to test all three clusters?
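
One way to exercise all three clusters, sketched in plain Java: pass the cluster name as a parameter instead of fixing it in a static constant. The three constants come from the diff; `ALL_TEST_CLUSTERS`, `resourceDirFor`, and the `clusterconf/<name>` layout are assumptions for illustration, and the method body is a placeholder for the real load-and-schedule checks.

```java
import java.util.Arrays;
import java.util.List;

public class ClusterSelectionSketch {
    static final String TEST_CLUSTER_01 = "largeCluster01";
    static final String TEST_CLUSTER_02 = "largeCluster02";
    static final String TEST_CLUSTER_03 = "largeCluster03";

    // Hypothetical list that a test could iterate over.
    static final List<String> ALL_TEST_CLUSTERS =
        Arrays.asList(TEST_CLUSTER_01, TEST_CLUSTER_02, TEST_CLUSTER_03);

    // Placeholder for "load this cluster and run the scheduling checks".
    // Here it only reports which resource directory would be read (assumed layout).
    static String resourceDirFor(String clusterName) {
        if (!ALL_TEST_CLUSTERS.contains(clusterName)) {
            throw new IllegalArgumentException("Unknown test cluster: " + clusterName);
        }
        return "clusterconf/" + clusterName;
    }

    public static void main(String[] args) {
        // Each cluster is handled by the same code path, selected by name.
        for (String clusterName : ALL_TEST_CLUSTERS) {
            System.out.println(resourceDirFor(clusterName));
        }
    }
}
```

In a JUnit 5 test class the same loop could be driven by `@ParameterizedTest` with `@ValueSource(strings = {...})`, at the cost of a longer total run time.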

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {

Review comment:
       `reducedSupervisorsPerRack` is always 0. Can we delete it?

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {
+                tmpRackSupervisorCnt -= Math.abs(reducedSupervisorsPerRack);
             }
-            for (int rack = 12 ; rack < 14 ; rack++) {
-                double cpu = 2400; // %percent
-                double mem = 118_100; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+            final int adjustedRackSupervisorCnt = tmpRackSupervisorCnt;
+            list.forEach(x -> {
+                int supervisorCnt = x.supervisorCnt;
+                for (int i = 0; i < supervisorCnt ; i++) {
+                    int superInRack = seenRacks.computeIfAbsent(rackId, z -> new AtomicInteger(-1)).incrementAndGet();
+                    int rackNum = seenRacks.size() - 1;
+                    if (superInRack >= adjustedRackSupervisorCnt) {
+                        continue;
+                    }
+                    createAndAddOneSupervisor(rackNum, superInRack, x.cpuPercent, x.memoryMb, x.slotCnt, retList);

Review comment:
       Why recreate `rackNum` instead of reusing `rackId` from `SupervisorDistribution`?

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);

Review comment:
       Is it possible to replace `createSupervisorsForCluster01` with a method like the ones used for the other clusters, i.e. add a new method `SupervisorDistribution.getSupervisorDistribution01()` and use it the same way?
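
       One possible shape, as a rough sketch only. `Dist` is a hypothetical stand-in for `SupervisorDistribution` (its real constructor and the rack id format are not shown in this diff, so both are assumptions), and only racks 0 to 13, which are visible in the removed code above, are covered.

```java
import java.util.ArrayList;
import java.util.List;

final class Cluster01DistributionSketch {
    /** Hypothetical stand-in for SupervisorDistribution; field names follow the ones used in the diff. */
    static final class Dist {
        final String rackId;
        final int supervisorCnt;
        final int slotCnt;
        final double cpuPercent;
        final double memoryMb;

        Dist(String rackId, int supervisorCnt, int slotCnt, double cpuPercent, double memoryMb) {
            this.rackId = rackId;
            this.supervisorCnt = supervisorCnt;
            this.slotCnt = slotCnt;
            this.cpuPercent = cpuPercent;
            this.memoryMb = memoryMb;
        }
    }

    /** Expresses the old hand-written loops for racks 0..13 as distribution data. */
    static List<Dist> getSupervisorDistribution01() {
        int numSupersPerRack = 82;
        int numPorts = 50;
        List<Dist> ret = new ArrayList<>();
        for (int rack = 0; rack < 14; rack++) {
            double cpu = rack < 12 ? 3600 : 2400;       // percent
            double mem = rack < 12 ? 178_000 : 118_100; // MB
            String rackId = "rack-" + rack;             // assumed id format
            // The old code used cpu - 100 * (superInRack % 2): even indexes keep cpu, odd ones lose 100.
            int evenCnt = (numSupersPerRack + 1) / 2;
            int oddCnt = numSupersPerRack / 2;
            ret.add(new Dist(rackId, evenCnt, numPorts, cpu, mem));
            ret.add(new Dist(rackId, oddCnt, numPorts, cpu - 100, mem));
        }
        return ret;
    }

    public static void main(String[] args) {
        int total = getSupervisorDistribution01().stream().mapToInt(d -> d.supervisorCnt).sum();
        System.out.println("supervisors in racks 0..13: " + total);
    }
}
```

       Note that grouping the two CPU sizes per rack changes the order of supervisors within a rack compared with the old interleaved loop; whether that matters for the scheduler test would need checking.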

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {

Review comment:
       Why does it subtract `reducedSupervisorsPerRack` again?
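
       To make the question concrete, here is a minimal sketch of the arithmetic. `asWritten` mirrors the quoted logic together with the `tmpRackSupervisorCnt -= ...` line that follows the `if` in the full hunk; `subtractOnce` is what the Javadoc seems to describe. The class and method names are hypothetical, and clamping at zero is an assumption, not something the PR does.

```java
final class ReducedCountSketch {
    /** Mirrors the hunk: subtract once up front, then again if enough supervisors remain. */
    static int asWritten(int rackTotal, int reducedSupervisorsPerRack) {
        int reduced = Math.abs(reducedSupervisorsPerRack);
        int cnt = rackTotal - reduced;
        if (cnt > reduced) {
            cnt -= reduced;
        }
        return cnt;
    }

    /** Single subtraction, clamped at zero (the clamp is an assumption). */
    static int subtractOnce(int rackTotal, int reducedSupervisorsPerRack) {
        return Math.max(0, rackTotal - Math.abs(reducedSupervisorsPerRack));
    }

    public static void main(String[] args) {
        System.out.println(asWritten(10, 2));    // prints 6: the reduction is applied twice
        System.out.println(subtractOnce(10, 2)); // prints 8
    }
}
```

       With the current callers always passing 0 both versions agree, so the difference would only show up once a non-zero reduction is used.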

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();

Review comment:
       Why is `AtomicInteger` necessary?
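
       For context, a minimal sketch of what the counter does and one alternative. A lambda can only capture effectively final locals, so a counter updated inside `forEach` needs some mutable holder; `AtomicInteger` works as one, but nothing in this code is concurrent. The class and method names below are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

final class RackCounterSketch {
    /** As in the hunk: AtomicInteger used as a mutable box, starting at -1 so the first index is 0. */
    static Map<String, Integer> lastIndexWithAtomic(List<String> rackIds) {
        Map<String, AtomicInteger> seenRacks = new HashMap<>();
        rackIds.forEach(id -> seenRacks.computeIfAbsent(id, z -> new AtomicInteger(-1)).incrementAndGet());
        Map<String, Integer> last = new HashMap<>();
        seenRacks.forEach((id, counter) -> last.put(id, counter.get()));
        return last;
    }

    /** Alternative: plain Integer values updated with Map.merge, no atomics involved. */
    static Map<String, Integer> lastIndexWithMerge(List<String> rackIds) {
        Map<String, Integer> last = new HashMap<>();
        for (String id : rackIds) {
            // First sighting stores 0; later sightings add one to the stored index.
            last.merge(id, 0, (old, ignored) -> old + 1);
        }
        return last;
    }
}
```

       Both methods return the last per-rack supervisor index, so either would support the `superInRack` numbering used in the loop.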

##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {
+                tmpRackSupervisorCnt -= Math.abs(reducedSupervisorsPerRack);
             }
-            for (int rack = 12 ; rack < 14 ; rack++) {
-                double cpu = 2400; // %percent
-                double mem = 118_100; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+            final int adjustedRackSupervisorCnt = tmpRackSupervisorCnt;
+            list.forEach(x -> {
+                int supervisorCnt = x.supervisorCnt;
+                for (int i = 0; i < supervisorCnt ; i++) {

Review comment:
       nit: delete space after `supervisorCnt`







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r581927316



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +320,87 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            return createSupervisorsForCluster02(reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**

Review comment:
       Created new Jira for documentation: https://issues.apache.org/jira/browse/STORM-3746 







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r580737489



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {

Review comment:
       A defrag test will follow shortly, but it is not in this commit. It will use `reducedSupervisorsPerRack` to force a reduction in cluster size.







[GitHub] [storm] agresch commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r578623531



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +320,87 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            return createSupervisorsForCluster02(reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**

Review comment:
       Any documentation that would allow other naive users like myself to grab and anonymize these files and set up a test cluster would be very helpful. This could be a follow-on JIRA.







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r580274718



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {
+                tmpRackSupervisorCnt -= Math.abs(reducedSupervisorsPerRack);
             }
-            for (int rack = 12 ; rack < 14 ; rack++) {
-                double cpu = 2400; // %percent
-                double mem = 118_100; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+            final int adjustedRackSupervisorCnt = tmpRackSupervisorCnt;
+            list.forEach(x -> {
+                int supervisorCnt = x.supervisorCnt;
+                for (int i = 0; i < supervisorCnt ; i++) {
+                    int superInRack = seenRacks.computeIfAbsent(rackId, z -> new AtomicInteger(-1)).incrementAndGet();
+                    int rackNum = seenRacks.size() - 1;
+                    if (superInRack >= adjustedRackSupervisorCnt) {
+                        continue;
+                    }
+                    createAndAddOneSupervisor(rackNum, superInRack, x.cpuPercent, x.memoryMb, x.slotCnt, retList);

Review comment:
       The supervisor distribution can have repeated entries for the same rack id (string). The supervisor id is generated from the rack number (int) and must follow a fixed, predefined pattern that can be reliably parsed to generate the host -> rack mapping (used in the Cluster DNS mapping).
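
   To illustrate the point above, here is a minimal sketch of how a rack id (string) can be mapped to a stable rack number (int), and how a fixed-format supervisor id can be parsed back to its rack. The class name, the method names, and the "r%03ds%03d" id format are assumptions made for this example; they are not taken from TestLargeCluster.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; names and id format are assumptions. */
final class SupervisorIdSketch {
    // Assumed fixed pattern: "r" + 3-digit rack number + "s" + 3-digit supervisor index.
    private static final Pattern ID_PATTERN = Pattern.compile("r(\\d{3})s(\\d{3})");

    // Rack ids in first-seen order; repeated entries for a rack reuse its number.
    private final Map<String, Integer> rackNumbers = new LinkedHashMap<>();

    /** Returns the rack number for a rack id, assigning the next free number on first sight. */
    int rackNumFor(String rackId) {
        Integer existing = rackNumbers.get(rackId);
        if (existing != null) {
            return existing;
        }
        int next = rackNumbers.size();
        rackNumbers.put(rackId, next);
        return next;
    }

    /** Builds a supervisor id in the assumed fixed format. */
    static String supervisorId(int rackNum, int superInRack) {
        return String.format("r%03ds%03d", rackNum, superInRack);
    }

    /** Parses the rack number back out of a supervisor id; rejects ids that do not match. */
    static int rackNumOf(String supervisorId) {
        Matcher m = ID_PATTERN.matcher(supervisorId);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected supervisor id: " + supervisorId);
        }
        return Integer.parseInt(m.group(1));
    }
}
```

   Because the rack number is assigned once per rack id, two distribution entries that share a rack id produce supervisor ids with the same rack prefix, so the host -> rack mapping stays consistent.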







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r580274718



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {
+                tmpRackSupervisorCnt -= Math.abs(reducedSupervisorsPerRack);
             }
-            for (int rack = 12 ; rack < 14 ; rack++) {
-                double cpu = 2400; // %percent
-                double mem = 118_100; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
+            final int adjustedRackSupervisorCnt = tmpRackSupervisorCnt;
+            list.forEach(x -> {
+                int supervisorCnt = x.supervisorCnt;
+                for (int i = 0; i < supervisorCnt ; i++) {
+                    int superInRack = seenRacks.computeIfAbsent(rackId, z -> new AtomicInteger(-1)).incrementAndGet();
+                    int rackNum = seenRacks.size() - 1;
+                    if (superInRack >= adjustedRackSupervisorCnt) {
+                        continue;
+                    }
+                    createAndAddOneSupervisor(rackNum, superInRack, x.cpuPercent, x.memoryMb, x.slotCnt, retList);

Review comment:
       The supervisor distribution can have repeated entries for the same rack number.







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r580269417



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -62,12 +62,17 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @ExtendWith({NormalizedResourcesExtension.class})
 public class TestLargeCluster {
     private static final Logger LOG = LoggerFactory.getLogger(TestLargeCluster.class);
 
-    public static final String TEST_CLUSTER_NAME = "largeCluster01";
+    public static final String TEST_CLUSTER_01 = "largeCluster01";
+    public static final String TEST_CLUSTER_02 = "largeCluster02";
+    public static final String TEST_CLUSTER_03 = "largeCluster03";
+
+    public static final String TEST_CLUSTER_NAME = TEST_CLUSTER_02;

Review comment:
       The earlier test was structured to test only one large cluster, since cluster01 and cluster02 take about two minutes each. Testing all of them will make this code easier to understand, especially with the supervisors being created in a uniform manner as per your suggestion below.







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r578757783



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +320,87 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            return createSupervisorsForCluster02(reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**

Review comment:
       Yes. There is another class, TestTopologyAnonymizerUtils, and the documentation on how to do it is in that class:
      - grab the serialized code and conf files (using storm blobstore cat <x> -f <x>)
      - save them in a new directory storm-server/src/test/resources/clusterconf/<newdir>
      - change the source and target (DEFAULT_ORIGINAL_RESOURCES_PATH and DEFAULT_ANONYMIZED_RESOURCES_OUTDIR)
      - run the test "testAnonymizer" (after uncommenting the @Test: mvn test -Dtest=TestTopologyAnonymizerUtils)
   
   Alternatively:
       After saving the topology files, run TestTopologyAnonymizerUtils with two arguments.
   
   I find the first option easier, since I can do everything within IntelliJ.







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r583374023



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {

Review comment:
       Thanks for pointing this out. This is an error. Will fix.
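
   For reference, a minimal sketch of one possible correction, assuming the error in question is that the quoted hunk subtracts Math.abs(reducedSupervisorsPerRack) twice (once in the initializer and again inside the if). The class and method names are hypothetical, and the fix that actually lands in the PR may differ.

```java
/** Hypothetical helper for illustration only; not the code committed in the PR. */
final class RackCountSketch {
    /**
     * Number of supervisors to keep in a rack after removing the requested amount.
     * The reduction is applied exactly once and the result never drops below zero.
     */
    static int adjustedRackSupervisorCnt(int designedCnt, int reducedSupervisorsPerRack) {
        int reduce = Math.abs(reducedSupervisorsPerRack);
        return Math.max(0, designedCnt - reduce);
    }
}
```

   With a designed count of 82 and a reduction of 2, the quoted hunk yields 78 (82 - 2 = 80, then 80 > 2, so 80 - 2), whereas this sketch yields 80.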







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r581638775



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {

Review comment:
       The defrag method uses this parameter to create a slightly smaller (i.e. more resource-constrained) cluster. A fragmented cluster is then created reliably by a randomized kill of 10% of the supervisors, and then defragmentation proceeds. At each step, the test code checks that it has reached the expected stage (i.e. fragmentation, then defragmentation). So it is convenient to have this parameter.
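
   The randomized kill step could be sketched roughly as follows. This only illustrates picking a fixed percentage of supervisors at random; the class name, method name, and the seed parameter are assumptions for the example, not the code in the defrag test.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical helper for illustration only; not the code in TestLargeCluster. */
final class FragmentSketch {
    /**
     * Picks roughly `percent` percent of the supervisor ids at random.
     * A fixed seed makes the selection repeatable between test runs.
     */
    static List<String> pickSupervisorsToKill(List<String> supervisorIds, int percent, long seed) {
        List<String> shuffled = new ArrayList<>(supervisorIds); // leave the input untouched
        Collections.shuffle(shuffled, new Random(seed));
        int killCnt = shuffled.size() * percent / 100; // integer division rounds down
        return new ArrayList<>(shuffled.subList(0, killCnt));
    }
}
```

   Calling pickSupervisorsToKill(ids, 10, seed) on a list of 50 ids selects 5 of them; removing those from the cluster leaves the fragmented state that the test then checks before defragmenting.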







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r581928292



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);

Review comment:
       Changed







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r580265153



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
##########
@@ -66,8 +66,10 @@
 public class TestTopologyAnonymizerUtils {
     private static final Logger LOG = LoggerFactory.getLogger(TestTopologyAnonymizerUtils.class);
 
-    private static final String DEFAULT_ORIGINAL_RESOURCES_PATH = "clusterconf/iridiumblue";
-    private static final String DEFAULT_ANONYMIZED_RESOURCES_OUTDIR = "src/test/resources/clusterconf/largeCluster01";
+    // iridiumblue -> largeCluster02 (prior largeCluster01)
+    // ebonyred -> largeCluster03

Review comment:
       removed







[GitHub] [storm] bipinprasad commented on a change in pull request #3378: [STORM-3743] Add new topologies and change TestLargeCluster

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r580274143



##########
File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);

Review comment:
       The supervisor distribution can have repeated entries for the same rack number.
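
       A minimal sketch of one way code consuming such a list could tolerate repeated rack numbers: sum the counts per rack rather than letting a later entry overwrite an earlier one. The `Entry` class and its `rackNum` and `supervisorCount` fields are hypothetical stand-ins; the actual fields of `SupervisorDistribution` are not shown in this thread.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RackEntryMerge {
    /** Hypothetical stand-in for one distribution entry; NOT the PR's SupervisorDistribution. */
    static final class Entry {
        final int rackNum;
        final int supervisorCount;

        Entry(int rackNum, int supervisorCount) {
            this.rackNum = rackNum;
            this.supervisorCount = supervisorCount;
        }
    }

    /** Sums supervisor counts per rack, so repeated rack numbers accumulate instead of overwriting. */
    static Map<Integer, Integer> supervisorsPerRack(List<Entry> entries) {
        Map<Integer, Integer> totals = new LinkedHashMap<>();
        for (Entry e : entries) {
            // merge() inserts the count for a new rack, or adds it to the existing total
            totals.merge(e.rackNum, e.supervisorCount, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Rack 0 appears twice in the input; the illustrative counts are made up
        List<Entry> entries = List.of(new Entry(0, 40), new Entry(1, 82), new Entry(0, 42));
        System.out.println(supervisorsPerRack(entries));
    }
}
```

       Using `Map.merge` keeps the accumulation in one call and makes the handling of duplicate racks explicit.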



