Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2020/02/21 22:10:40 UTC

[GitHub] [storm] RuiLi8080 opened a new pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

RuiLi8080 opened a new pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213
 
 
   Add GenericResourceAwareSchedulingPriorityStrategy by extending DefaultSchedulingPriorityStrategy to accommodate generic resources when sorting topologies.
   
   TestGenericResourceAwareSchedulingPriorityStrategy covers scenarios in which DefaultSchedulingPriorityStrategy wrongly preempts all other running topologies; GenericResourceAwareSchedulingPriorityStrategy prevents this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r385165779
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
 ##########
 @@ -83,6 +88,10 @@ public void addSelfTo(Map<String, Map<String, Number>> fullPool) {
         }
     }
 
+    public static TestUserResources userRes(String name, Map<String, Double> resources) {
+        return new TestUserResources(name, resources);
 
 Review comment:
   Right, but what I meant was unit tests for GenericResourceAwareSchedulingPriorityStrategy when users do have generic resource guarantees.


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406823545
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -210,4 +216,11 @@ private Cluster mkTestCluster(Topologies topologies, Config config) {
 
         return new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
     }
+
+    private Set<String> collectMapValues(Map<String, Set<String>> map) {
+        return map.values()
 
 Review comment:
   This looks overly complicated. Consider something like:
   ```
           Set<String> set = new HashSet<>();
           map.values().forEach((s) -> set.addAll(s));
           return set;
   ```
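For comparison, the two ways of flattening a `Map<String, Set<String>>` into one set can be sketched as below. The class name `CollectMapValuesSketch` and the sample topology ids are hypothetical, and the stream variant is only an assumption about what the truncated helper in the diff does, not a copy of it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch class; not part of the Storm code base.
class CollectMapValuesSketch {

    // Stream-based flattening (assumed shape of the helper under review).
    static Set<String> collectWithStreams(Map<String, Set<String>> map) {
        return map.values().stream()
            .flatMap(Set::stream)
            .collect(Collectors.toSet());
    }

    // Loop-based flattening, following the suggestion in the review comment.
    static Set<String> collectWithAddAll(Map<String, Set<String>> map) {
        Set<String> set = new HashSet<>();
        map.values().forEach(set::addAll);
        return set;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> evicted = new HashMap<>();
        evicted.put("rui-topo-1", new HashSet<>(Arrays.asList("ethan-topo-1", "ethan-topo-2")));
        evicted.put("rui-topo-2", new HashSet<>(Arrays.asList("ethan-topo-2", "ethan-topo-3")));
        // Both variants produce the same set of distinct ids.
        System.out.println(collectWithStreams(evicted).equals(collectWithAddAll(evicted)));
    }
}
```

Both variants return the same result; the choice between them is mostly a matter of readability.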


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384800460
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResource();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            Map<String, Double> availGenericResources = cluster.getClusterTotalGenericResource();   // generic resources that are offered
+            for (Map.Entry<String, Double> entry : availGenericResources.entrySet()) {
+                String resource = entry.getKey();
+                Double guaranteedAmount = other.getGenericGuaranteed().getOrDefault(resource, 0.0);
+                guaranteedGenericResources.put(resource, guaranteedAmount);
+            }
+            this.guaranteedGenericResources = guaranteedGenericResources;
+        }
+
+        @Override
+        public TopologyDetails simScheduleNextHighest() {
+            TopologyDetails td = tds.pop();
+            assignedCpu += td.getTotalRequestedCpu();
+            assignedMemory += td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap();
+            Map<String, Double> tdRequestedGenericResource = td.getTotalRequestedGenericResources();
+            for (Map.Entry<String, Double> entry : tdRequestedGenericResource.entrySet()) {
+                String resource = entry.getKey();
+                Double requestedAmount = entry.getValue();
+                assignedGenericResources.put(resource, assignedGenericResources.getOrDefault(resource, 0.0) + requestedAmount);
+            }
+            return td;
+        }
+
+        /**
+         * Get a score for the simulated user.  This is used to sort the users, by their highest priority topology.
+         * Only give user guarantees that will not exceed cluster capacity.
+         * Score of each resource type is calculated as: (Requested + Assigned - Guaranteed)/Available
 
 Review comment:
   Nice javadoc!
   
   nit: `Available` -> `clusterAvailable`
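The per-resource formula quoted in the javadoc can be illustrated with a small worked example. This is a minimal sketch, not the PR's implementation: the class `ScoreSketch`, the sample numbers, and in particular the choice to combine per-resource scores by taking the maximum are assumptions, since the quoted diff is cut off before that part.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; names and numbers are illustrative, not Storm's API.
class ScoreSketch {

    // Per-resource score from the javadoc: (requested + assigned - guaranteed) / clusterAvailable.
    static double resourceScore(double requested, double assigned, double guaranteed, double clusterAvailable) {
        return (requested + assigned - guaranteed) / clusterAvailable;
    }

    // Assumption: the user's overall score is the largest per-resource score.
    static double overallScore(Map<String, Double> perResourceScores) {
        double max = Double.NEGATIVE_INFINITY;
        for (double s : perResourceScores.values()) {
            max = Math.max(max, s);
        }
        return max;
    }

    public static void main(String[] args) {
        Map<String, Double> scores = new HashMap<>();
        // Light CPU and memory requests, heavy generic resource request.
        scores.put("cpu", resourceScore(20.0, 0.0, 0.0, 400.0));
        scores.put("memory", resourceScore(100.0, 0.0, 0.0, 2000.0));
        scores.put("generic.resource.1", resourceScore(40.0, 0.0, 0.0, 50.0));
        System.out.println(overallScore(scores));
    }
}
```

With these made-up numbers the generic resource dominates (40/50 against 20/400 and 100/2000), which is the kind of request a CPU-and-memory-only score would rank as cheap.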


[GitHub] [storm] kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r387630064
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
 ##########
 @@ -271,6 +273,13 @@ boolean wouldFit(
      */
     double getClusterTotalMemoryResource();
 
+    /**
+     * Get the total amount of generic resources (excluding CPU and memory) in cluster.
+     */
+    default Map<String, Double> getClusterTotalGenericResource() {
 
 Review comment:
   The method name should probably be `getClusterTotalGenericResources`, with a trailing `s`.


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r407854697
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
 ##########
 @@ -873,20 +897,38 @@ public NormalizedResourceOffer getNonBlacklistedClusterAvailableResources(Collec
 
     @Override
     public double getClusterTotalCpuResource() {
-        double sum = 0.0;
-        for (SupervisorDetails sup : supervisors.values()) {
-            sum += sup.getTotalCpu();
-        }
-        return sum;
+        return this.totalCpuResource;
+    }
+
+    private double computeClusterCpuResource() {
+        return supervisors.values().stream()
+            .mapToDouble(SupervisorDetails::getTotalCpu)
+            .sum();
+
     }
 
     @Override
     public double getClusterTotalMemoryResource() {
-        double sum = 0.0;
-        for (SupervisorDetails sup : supervisors.values()) {
-            sum += sup.getTotalMemory();
-        }
-        return sum;
+        return this.totalMemoryResource;
+    }
+
+
+    private double computeClusterMemoryResource() {
+        return supervisors.values().stream()
+            .mapToDouble(SupervisorDetails::getTotalMemory)
+            .sum();
+    }
+
+    @Override
+    public Map<String, Double> getClusterTotalGenericResources() {
+        return this.totalGenericResources;
+    }
+
+    private Map<String, Double> computeClusterGenericResources() {
+        return supervisors.values().stream()
+            .map(sup -> sup.getTotalGenericResources().entrySet())
+            .flatMap(Set::stream)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Double::sum));
 
 Review comment:
   Sorry, I couldn't think of a way to simplify this further. Could you elaborate? `supervisors.values().stream()` creates a `Stream<SupervisorDetails>`, and I don't see how `Stream.of` can help here.
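The merge step in the diff above, summing values that share a key across several maps, can be sketched in isolation. This is an illustrative example only: plain `Map<String, Double>` instances stand in for `SupervisorDetails`, the class name `MergeResourcesSketch` is hypothetical, and folding the `map` and `flatMap` calls into a single `flatMap` is one possible variation rather than what the reviewer necessarily had in mind.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch; plain maps stand in for SupervisorDetails.
class MergeResourcesSketch {

    // Sum the values of equal keys across all per-supervisor maps.
    static Map<String, Double> mergeTotals(List<Map<String, Double>> perSupervisor) {
        return perSupervisor.stream()
            .flatMap(m -> m.entrySet().stream())
            // The third argument resolves key collisions by adding the two values.
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Double::sum));
    }

    public static void main(String[] args) {
        Map<String, Double> sup1 = new HashMap<>();
        sup1.put("generic.resource.1", 10.0);
        Map<String, Double> sup2 = new HashMap<>();
        sup2.put("generic.resource.1", 15.0);
        sup2.put("generic.resource.2", 4.0);
        System.out.println(mergeTotals(Arrays.asList(sup1, sup2)));
    }
}
```

Without the merge function, `Collectors.toMap` throws an `IllegalStateException` on the first duplicate key, so the `Double::sum` argument is what makes the per-supervisor totals add up.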


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r386453674
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -256,7 +256,8 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
         if (forTest) {
             return this.evictedTopologies;
         } else {
-            throw new Exception("Topology eviction check is only provided for test purposes");
+            throw new UnsupportedOperationException(
 
 Review comment:
   `UnsupportedOperationException` is a runtime exception, so `throws Exception` is not needed in the signature. The same applies to unit tests such as `testDefaultSchedulingPriorityStrategyNotEvicting`.
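The point about unchecked exceptions can be shown with a small standalone sketch. The class `EvictionCheckSketch` and its method are hypothetical and only mirror the shape of the guarded getter in the diff; they are not the code under review.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; the class and method names are illustrative only.
class EvictionCheckSketch {
    private final boolean forTest;
    private final Set<String> evictedTopologies = new HashSet<>();

    EvictionCheckSketch(boolean forTest) {
        this.forTest = forTest;
    }

    // No "throws" clause: UnsupportedOperationException extends RuntimeException.
    Set<String> getEvictedTopologies() {
        if (forTest) {
            return Collections.unmodifiableSet(evictedTopologies);
        }
        throw new UnsupportedOperationException("Topology eviction check is only provided for test purposes");
    }

    public static void main(String[] args) {
        // The caller compiles without try/catch and without its own "throws" declaration.
        System.out.println(new EvictionCheckSketch(true).getEvictedTopologies().isEmpty());
    }
}
```

Because the exception is unchecked, callers such as test methods do not need to declare `throws Exception` either.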


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r395229049
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -56,6 +59,10 @@
     private int schedulingTimeoutSeconds;
     private ExecutorService backgroundScheduling;
 
+    // record evicted topologies on each scheduling round, only used in test purpose now
+    private boolean forTest = false;
+    private Set<String> evictedTopologies = new HashSet<>();
+
 
 Review comment:
   Unfortunately not. I had to add this to check whether a topology was evicted, because in some scenarios an evicted topology is put back within the same round of scheduling.


[GitHub] [storm] kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r387630206
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
 ##########
 @@ -889,6 +889,20 @@ public double getClusterTotalMemoryResource() {
         return sum;
     }
 
+    @Override
+    public Map<String, Double> getClusterTotalGenericResource() {
 
 Review comment:
   The method name should end with an `s`.


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384784106
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.scheduler.resource.strategies.eviction.TestDefaultEvictionStrategy;
+import org.apache.storm.utils.Time;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.addTopologies;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.createGrasClusterConfig;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genSupervisors;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genTopology;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userRes;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userResourcePool;
+
+public class TestGenericResourceAwareSchedulingPriorityStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestDefaultEvictionStrategy.class);
+    private static int currentTime = Time.currentTimeSecs();
+    private static IScheduler scheduler = null;
+
+    @After
+    public void cleanup() {
+        if (scheduler != null) {
+            scheduler.cleanup();
+            scheduler = null;
+        }
+    }
+
+    /*
+     * DefaultSchedulingPriorityStrategy will not evict topo as long as the resources request can be met
+     *
+     *  Ethan asks for heavy cpu and memory while Rui asks for little cpu and memory but heave generic resource
+     *  Since Rui's all types of resources request can be met, no eviction will happend
+    */
+    @Test
+    public void testDefaultSchedulingPriorityStrategyNotEvicting() {
+        Map<String, Double> requestedgenericResourcesMap = new HashMap<>();
+        requestedgenericResourcesMap.put("generic.resource.1", 40.0);
+        // Use full memory and cpu of the cluster capacity
+        Config ruiConf = createGrasClusterConfig(20, 50, 50, null, requestedgenericResourcesMap);
+        Config ethanConf = createGrasClusterConfig(80, 400, 500, null, Collections.emptyMap());
+        Topologies topologies = new Topologies(
+            genTopology("ethan-topo-1", ethanConf, 1, 0, 1, 0, currentTime - 2, 10, "ethan"),
+            genTopology("ethan-topo-2", ethanConf, 1, 0, 1, 0, currentTime - 2, 20, "ethan"),
+            genTopology("ethan-topo-3", ethanConf, 1, 0, 1, 0, currentTime - 2, 28, "ethan"),
+            genTopology("ethan-topo-4", ethanConf, 1, 0, 1, 0, currentTime - 2, 29, "ethan"));
+
+        Topologies withNewTopo = addTopologies(topologies,
+            genTopology("rui-topo-1", ruiConf, 1, 0, 4, 0, currentTime - 2, 10, "rui"));
+
+        Config config = mkClusterConfig(DefaultSchedulingPriorityStrategy.class.getName());
+        Cluster cluster = mkTestCluster(topologies, config);
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config);
+        scheduler.schedule(topologies, cluster);
+
+        assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+
+
+        cluster = new Cluster(cluster, withNewTopo);
+        scheduler.schedule(withNewTopo, cluster);
+
+        assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+        assertTopologiesNotBeenEvicted(cluster, (ResourceAwareScheduler) scheduler, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+        assertTopologiesFullyScheduled(cluster, "rui-topo-1");
+    }
+
+    /*
+     * DefaultSchedulingPriorityStrategy does not take generic resources into account when calculating score
+     * So even if a user is requesting a lot of generic resources other than CPU and memory, scheduler will still score it very low and kick out other topologies
+     *
+     *  Ethan asks for medium cpu and memory while Rui asks for little cpu and memory but heave generic resource
 
 Review comment:
   `heave` -> `heavy`


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384780599
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.scheduler.resource.strategies.eviction.TestDefaultEvictionStrategy;
+import org.apache.storm.utils.Time;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.addTopologies;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.createGrasClusterConfig;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genSupervisors;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genTopology;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userRes;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userResourcePool;
+
+public class TestGenericResourceAwareSchedulingPriorityStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestDefaultEvictionStrategy.class);
 
 Review comment:
   This should be `TestGenericResourceAwareSchedulingPriorityStrategy`


[GitHub] [storm] bipinprasad commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r394781127
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -56,6 +59,10 @@
     private int schedulingTimeoutSeconds;
     private ExecutorService backgroundScheduling;
 
+    // record evicted topologies on each scheduling round, only used in test purpose now
+    private boolean forTest = false;
+    private Set<String> evictedTopologies = new HashSet<>();
+
 
 Review comment:
   Can this be removed? Use TestUtilsForResourceAwareScheduler#assertTopologiesNotScheduled instead.


[GitHub] [storm] RuiLi8080 commented on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#issuecomment-614412309
 
 
   Only one server test failed on JDK 11; JDK 8 passed.


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406921364
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -57,6 +59,8 @@
     private int maxSchedulingAttempts;
     private int schedulingTimeoutSeconds;
     private ExecutorService backgroundScheduling;
+    private Map<String, Set<String>> evictedTopologiesMap = new HashMap<>();   // topoId : topoEvicted
+    private Map<String, Set<String>> tmpEvictedTopologiesMap = new HashMap<>();
 
 Review comment:
   This doesn't have to be an attribute, since it's only used inside the `schedule` method.


[GitHub] [storm] govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r407593702
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
 ##########
 @@ -271,6 +273,13 @@ boolean wouldFit(
      */
     double getClusterTotalMemoryResource();
 
+    /**
+     * Get the total amount of generic resources (excluding CPU and memory) in cluster.
+     */
+    default Map<String, Double> getClusterTotalGenericResources() {
+        return Collections.emptyMap();
 
 Review comment:
   Do we need to define a default?


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r407855790
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResources();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            // generic resource types that are offered
+            Set<String> availGenericResourceTypes = cluster.getClusterTotalGenericResources().keySet();
+            for (String resourceType : availGenericResourceTypes) {
+                Double guaranteedAmount = other.getGenericGuaranteed().getOrDefault(resourceType, 0.0);
+                guaranteedGenericResources.put(resourceType, guaranteedAmount);
+            }
+            this.guaranteedGenericResources = guaranteedGenericResources;
+        }
+
+        @Override
+        public TopologyDetails simScheduleNextHighest() {
+            TopologyDetails td = super.simScheduleNextHighest();
+            Map<String, Double> tdRequestedGenericResource = td.getTotalRequestedGenericResources();
+            for (Map.Entry<String, Double> entry : tdRequestedGenericResource.entrySet()) {
+                String resource = entry.getKey();
+                Double requestedAmount = entry.getValue();
+                assignedGenericResources.put(resource, assignedGenericResources.getOrDefault(resource, 0.0) + requestedAmount);
+            }
+            return td;
+        }
+
+        /**
+         * Get a score for the simulated user.  This is used to sort the users, by their highest priority topology.
+         * Only give user guarantees that will not exceed cluster capacity.
+         * Score of each resource type is calculated as: (Requested + Assigned - Guaranteed)/clusterAvailable
+         * The final score is a max over all resource types.
+         * Topology score will fall into the following intervals if:
+         *      User is under quota (guarantee):                    [(-guarantee)/available : 0]
+         *      User is over quota:                                 (0, infinity)
+         * Unfortunately, score below 0 does not guarantee that the topology will be scheduled due to resources fragmentation.
+         * @param availableCpu available CPU on the cluster.
+         * @param availableMemory available memory on the cluster.
+         * @param availableGenericResources available generic resources (other that cpu and memory) in cluster
+         * @param td the topology we are looking at.
+         * @return the score.
+         */
+        protected double getScore(double availableCpu, double availableMemory,
+                                  Map<String, Double> availableGenericResources, TopologyDetails td) {
+            // calculate scores for cpu and memory first
+            double ret = super.getScore(availableCpu, availableMemory, td);
+            if (ret == Double.MAX_VALUE) {
+                return ret;
+            }
+            Map<String, Double> tdTotalRequestedGeneric = td.getTotalRequestedGenericResources();
+            if (tdTotalRequestedGeneric == null) {
+                tdTotalRequestedGeneric = Collections.emptyMap();
+            }
+            for (Map.Entry<String, Double> entry : availableGenericResources.entrySet()) {
+                String resource = entry.getKey();
+                Double available = entry.getValue();
+                if (available <= 0) {
+                    return Double.MAX_VALUE;
+                }
+                Double wouldBeResource = assignedGenericResources.getOrDefault(resource, 0.0)
+                    + tdTotalRequestedGeneric.getOrDefault(resource, 0.0);
+                double thisScore = (wouldBeResource -  guaranteedGenericResources.getOrDefault(resource, 0.0)) / available;
+                ret = Math.max(ret, thisScore);
+            }
+
+            return ret;
+        }
+
+        protected double getScore(double availableCpu, double availableMemory, Map<String, Double> availableGenericResources) {
+            TopologyDetails td = getNextHighest();
 
 Review comment:
   Fixed the inline.
   
   As for `tmpEvictedTopologiesMap`, my concern is that the `scheduleTopology()` method has many return/break points because of its fairly complex nested if/else structure. Passing `tmpEvictedTopologiesMap` in ensures that the snapshot is handed over to `evictedTopologiesMap` once the scheduling round finishes, for whatever reason.
   
   So we create a new `tmpEvictedTopologiesMap` object before each round of `scheduleTopology()` and pass it over afterward.
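
The pattern described above can be sketched roughly as follows. This is not code from the PR: `RoundSnapshotSketch`, `scheduleRound`, the string topology IDs and the `fits`/`fails` prefixes are hypothetical stand-ins, and the real `scheduleTopology()` takes different parameters. The point is only that a fresh per-round map, filled by a method with several exits and published by the caller, ends up correct whichever exit is taken.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RoundSnapshotSketch {
    // Published result of the most recent round: topoId -> topologies evicted for it.
    private Map<String, Set<String>> evictedTopologiesMap = new HashMap<>();

    // Hypothetical stand-in for scheduleTopology(): it has several exits,
    // but every eviction is recorded in the map passed in before any of them.
    private void scheduleTopology(String topoId, Map<String, Set<String>> tmpEvicted) {
        if (topoId.startsWith("fits")) {
            return; // early exit: scheduled without evicting anything
        }
        tmpEvicted.computeIfAbsent(topoId, k -> new HashSet<>()).add("victim-of-" + topoId);
        if (topoId.startsWith("fails")) {
            return; // early exit: still could not schedule after evicting
        }
        // normal exit: scheduled after eviction
    }

    void scheduleRound(List<String> topoIds) {
        // A new temporary map per round, so stale entries never leak into the snapshot.
        Map<String, Set<String>> tmpEvicted = new HashMap<>();
        for (String id : topoIds) {
            scheduleTopology(id, tmpEvicted);
        }
        // Single hand-over point, reached regardless of which exit each call took.
        evictedTopologiesMap = tmpEvicted;
    }

    Map<String, Set<String>> getEvictedTopologiesMap() {
        return evictedTopologiesMap;
    }
}
```

Because the hand-over happens in the caller, none of the return points inside the scheduling method needs to know about the published map.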


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406865055
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -248,6 +255,11 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                     + topologySchedulingResources.getRemainingRequiredResourcesMessage());
     }
 
+    public Map<String, Set<String>> getEvictedTopologiesMap() {
 
 Review comment:
   Added.


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406945579
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -57,6 +59,8 @@
     private int maxSchedulingAttempts;
     private int schedulingTimeoutSeconds;
     private ExecutorService backgroundScheduling;
+    private Map<String, Set<String>> evictedTopologiesMap = new HashMap<>();   // topoId : topoEvicted
+    private Map<String, Set<String>> tmpEvictedTopologiesMap = new HashMap<>();
 
 Review comment:
   I was trying not to add more parameters to the private `scheduleTopology` function, but I think this makes sense, so it is fixed.


[GitHub] [storm] bipinprasad commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r394771047
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResources();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            // generic resource types that are offered
+            Set<String> availGenericResourceTypes = cluster.getClusterTotalGenericResources().keySet();
+            for (String resourceType : availGenericResourceTypes) {
+                Double guaranteedAmount = other.getGenericGuaranteed().getOrDefault(resourceType, 0.0);
+                guaranteedGenericResources.put(resourceType, guaranteedAmount);
+            }
+            this.guaranteedGenericResources = guaranteedGenericResources;
+        }
+
+        @Override
+        public TopologyDetails simScheduleNextHighest() {
+            TopologyDetails td = super.simScheduleNextHighest();
+            Map<String, Double> tdRequestedGenericResource = td.getTotalRequestedGenericResources();
+            for (Map.Entry<String, Double> entry : tdRequestedGenericResource.entrySet()) {
+                String resource = entry.getKey();
+                Double requestedAmount = entry.getValue();
+                assignedGenericResources.put(resource, assignedGenericResources.getOrDefault(resource, 0.0) + requestedAmount);
+            }
+            return td;
+        }
+
+        /**
+         * Get a score for the simulated user.  This is used to sort the users, by their highest priority topology.
+         * Only give user guarantees that will not exceed cluster capacity.
+         * Score of each resource type is calculated as: (Requested + Assigned - Guaranteed)/clusterAvailable
+         * The final score is a max over all resource types.
+         * Topology score will fall into the following intervals if:
+         *      User is under quota (guarantee):                    [(-guarantee)/available : 0]
+         *      User is over quota:                                 (0, infinity)
+         * Unfortunately, score below 0 does not guarantee that the topology will be scheduled due to resources fragmentation.
+         * @param availableCpu available CPU on the cluster.
+         * @param availableMemory available memory on the cluster.
+         * @param availableGenericResources available generic resources (other that cpu and memory) in cluster
+         * @param td the topology we are looking at.
+         * @return the score.
+         */
+        protected double getScore(double availableCpu, double availableMemory,
+                                  Map<String, Double> availableGenericResources, TopologyDetails td) {
+            // calculate scores for cpu and memory first
+            double ret = super.getScore(availableCpu, availableMemory, td);
+            if (ret == Double.MAX_VALUE) {
+                return ret;
+            }
+            Map<String, Double> tdTotalRequestedGeneric = td.getTotalRequestedGenericResources();
+            if (tdTotalRequestedGeneric == null) {
+                tdTotalRequestedGeneric = Collections.emptyMap();
+            }
+            for (Map.Entry<String, Double> entry : availableGenericResources.entrySet()) {
+                String resource = entry.getKey();
+                Double available = entry.getValue();
+                if (available <= 0) {
+                    return Double.MAX_VALUE;
+                }
+                Double wouldBeResource = assignedGenericResources.getOrDefault(resource, 0.0)
+                    + tdTotalRequestedGeneric.getOrDefault(resource, 0.0);
+                double thisScore = (wouldBeResource -  guaranteedGenericResources.getOrDefault(resource, 0.0)) / available;
+                ret = Math.max(ret, thisScore);
+            }
+
+            return ret;
+        }
+
+        protected double getScore(double availableCpu, double availableMemory, Map<String, Double> availableGenericResources) {
+            TopologyDetails td = getNextHighest();
+            return getScore(availableCpu, availableMemory, availableGenericResources, td);
+        }
+    }
+
+    private static class GrasSimulatedUserComparator implements Comparator<GrasSimulatedUser> {
 
 Review comment:
   Is this comparator supposed to compare `GrasSimulatedUser` objects for use in a sort? If so, it should compare o1 to o2, but it seems to be comparing the "distance" from some fixed point to o1 and to o2. Is this intentional?
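
For readers following the question, here is a minimal sketch of one way such a comparator can still be a valid o1-versus-o2 ordering. The comparator body is not shown in this thread, so everything here is an assumption: `ScoredUser` is a hypothetical stand-in for `GrasSimulatedUser`, the score is a precomputed number rather than a call to `getScore(cpuAvail, memAvail, genericAvail)`, and "lower score first" is inferred only from the loop taking `users.get(0)` after the sort. Scoring each user against the same fixed availability and then comparing the two scores is a direct comparison of the two users.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for GrasSimulatedUser; only the score matters here.
class ScoredUser {
    final String name;
    final double score; // assumed already computed against the same available resources

    ScoredUser(String name, double score) {
        this.name = name;
        this.score = score;
    }
}

class ScoreOrderSketch {
    // Both scores are measured against the same reference, so comparing them
    // compares o1 to o2 directly. Lower score sorts first (assumption).
    static final Comparator<ScoredUser> BY_SCORE =
        (o1, o2) -> Double.compare(o1.score, o2.score);

    static List<String> orderedNames(List<ScoredUser> users) {
        List<ScoredUser> copy = new ArrayList<>(users);
        copy.sort(BY_SCORE);
        List<String> names = new ArrayList<>();
        for (ScoredUser u : copy) {
            names.add(u.name);
        }
        return names;
    }
}
```

`Double.compare` also gives a consistent ordering for special values such as `Double.MAX_VALUE`, which the scoring code above returns when a resource is exhausted.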


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r405255219
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -239,6 +246,10 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
         markFailedTopology(topologySubmitter, cluster, td, "Failed to schedule within " + maxSchedulingAttempts + " attempts");
     }
 
+    public Set<String> getEvictedTopologies() {
 
 Review comment:
   Yes, that's right. We should return an unmodifiable collection to solve the race condition. My question is mostly about how we want to use this `evictedTopologies` information in regular code, outside of unit tests, since it is only a snapshot of all the `evictedTopologies` at the moment we call `getEvictedTopologies`. It could also contain topologies evicted while scheduling multiple topologies, so this information is not very useful.
   If we want to log what is evicted when scheduling a certain topology, we can change the above line to `LOG.info`, which works better than having an `evictedTopologies` list.
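
As a hedged illustration of the unmodifiable-collection idea (not the PR's code; `EvictionTrackerSketch` and `recordEviction` are hypothetical names), note that `Collections.unmodifiableSet` on its own only returns a read-only view of the backing set. To give callers a stable snapshot, the set is copied first, under the same lock that guards writes:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class EvictionTrackerSketch {
    private final Set<String> evictedTopologies = new HashSet<>();

    // Writes and reads share one lock, so a reader never sees a half-updated set.
    synchronized void recordEviction(String topoId) {
        evictedTopologies.add(topoId);
    }

    // Copy first, then wrap: the caller gets a read-only snapshot that does not
    // change when later evictions are recorded.
    synchronized Set<String> getEvictedTopologies() {
        return Collections.unmodifiableSet(new HashSet<>(evictedTopologies));
    }
}
```

This also makes the limitation raised above concrete: the returned set reflects only the moment of the call.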


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r387823536
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
 ##########
 @@ -271,6 +273,13 @@ boolean wouldFit(
      */
     double getClusterTotalMemoryResource();
 
+    /**
+     * Get the total amount of generic resources (excluding CPU and memory) in cluster.
+     */
+    default Map<String, Double> getClusterTotalGenericResource() {
 
 Review comment:
   Addressed


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406827001
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -196,21 +204,18 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                     } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
                         LOG.debug("Not enough resources to schedule {}", td.getName());
                         List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
-                        boolean evictedSomething = false;
-                        LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
+                        LOG.debug("Attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
                         int tdIndex = reversedList.indexOf(td);
                         topologySchedulingResources.setRemainingRequiredResources(toSchedule, td);
 
+                        Set<String> tmpEvictedTopos = new HashSet<>();
                         for (int index = 0; index < tdIndex; index++) {
                             TopologyDetails topologyEvict = reversedList.get(index);
                             SchedulerAssignment evictAssignemnt = workingState.getAssignmentById(topologyEvict.getId());
                             if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
-                                Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
                                 topologySchedulingResources.adjustResourcesForEvictedTopology(toSchedule, topologyEvict);
-
-                                LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
-                                    topologyEvict.getTopologySubmitter());
-                                evictedSomething = true;
+                                tmpEvictedTopos.add(topologyEvict.getId());
 
 Review comment:
   `tmpEvictedTopos` is the set of topologies evicted in a single attempt.
   Scheduling might evict multiple topologies over the course of `maxSchedulingAttempts` attempts.
   
   So the current `evictedTopologiesMap` holds just a subset.
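
A small sketch of the concern, under the assumption that the per-attempt set replaces whatever earlier attempts recorded (the names `AttemptEvictionSketch`, `lastAttemptOnly` and `acrossAllAttempts` are hypothetical and do not appear in the PR). Keeping only the latest attempt's set loses earlier evictions, while accumulating across attempts keeps them all:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AttemptEvictionSketch {
    // Mirrors re-creating tmpEvictedTopos on every attempt: only the last attempt survives.
    static Set<String> lastAttemptOnly(List<Set<String>> perAttempt) {
        Set<String> tmpEvictedTopos = new HashSet<>();
        for (Set<String> attempt : perAttempt) {
            tmpEvictedTopos = new HashSet<>(attempt);
        }
        return tmpEvictedTopos;
    }

    // Accumulates evictions from every attempt into one set.
    static Set<String> acrossAllAttempts(List<Set<String>> perAttempt) {
        Set<String> all = new HashSet<>();
        for (Set<String> attempt : perAttempt) {
            all.addAll(attempt);
        }
        return all;
    }
}
```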


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406831958
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -248,6 +255,11 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                     + topologySchedulingResources.getRemainingRequiredResourcesMessage());
     }
 
+    public Map<String, Set<String>> getEvictedTopologiesMap() {
 
 Review comment:
   I would add a comment like `This method is not stable. It's subject to change`, since currently we don't use it anywhere besides unit tests. It might change depending on how we want to use the evictedTopologies information. As @bipinprasad said, we might want to put the evictedTopologies information into the scheduleResult in the future. 


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r395228271
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResources();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            // generic resource types that are offered
+            Set<String> availGenericResourceTypes = cluster.getClusterTotalGenericResources().keySet();
+            for (String resourceType : availGenericResourceTypes) {
+                Double guaranteedAmount = other.getGenericGuaranteed().getOrDefault(resourceType, 0.0);
+                guaranteedGenericResources.put(resourceType, guaranteedAmount);
+            }
+            this.guaranteedGenericResources = guaranteedGenericResources;
+        }
+
+        @Override
+        public TopologyDetails simScheduleNextHighest() {
+            TopologyDetails td = super.simScheduleNextHighest();
+            Map<String, Double> tdRequestedGenericResource = td.getTotalRequestedGenericResources();
+            for (Map.Entry<String, Double> entry : tdRequestedGenericResource.entrySet()) {
+                String resource = entry.getKey();
+                Double requestedAmount = entry.getValue();
+                assignedGenericResources.put(resource, assignedGenericResources.getOrDefault(resource, 0.0) + requestedAmount);
+            }
+            return td;
+        }
+
+        /**
+         * Get a score for the simulated user.  This is used to sort the users, by their highest priority topology.
+         * Only give user guarantees that will not exceed cluster capacity.
+         * Score of each resource type is calculated as: (Requested + Assigned - Guaranteed)/clusterAvailable
+         * The final score is a max over all resource types.
+         * Topology score will fall into the following intervals if:
+         *      User is under quota (guarantee):                    [(-guarantee)/available : 0]
+         *      User is over quota:                                 (0, infinity)
+         * Unfortunately, score below 0 does not guarantee that the topology will be scheduled due to resources fragmentation.
+         * @param availableCpu available CPU on the cluster.
+         * @param availableMemory available memory on the cluster.
+         * @param availableGenericResources available generic resources (other that cpu and memory) in cluster
+         * @param td the topology we are looking at.
+         * @return the score.
+         */
+        protected double getScore(double availableCpu, double availableMemory,
+                                  Map<String, Double> availableGenericResources, TopologyDetails td) {
+            // calculate scores for cpu and memory first
+            double ret = super.getScore(availableCpu, availableMemory, td);
+            if (ret == Double.MAX_VALUE) {
+                return ret;
+            }
+            Map<String, Double> tdTotalRequestedGeneric = td.getTotalRequestedGenericResources();
+            if (tdTotalRequestedGeneric == null) {
+                tdTotalRequestedGeneric = Collections.emptyMap();
+            }
+            for (Map.Entry<String, Double> entry : availableGenericResources.entrySet()) {
+                String resource = entry.getKey();
+                Double available = entry.getValue();
+                if (available <= 0) {
+                    return Double.MAX_VALUE;
+                }
+                Double wouldBeResource = assignedGenericResources.getOrDefault(resource, 0.0)
+                    + tdTotalRequestedGeneric.getOrDefault(resource, 0.0);
+                double thisScore = (wouldBeResource -  guaranteedGenericResources.getOrDefault(resource, 0.0)) / available;
+                ret = Math.max(ret, thisScore);
+            }
+
+            return ret;
+        }
+
+        protected double getScore(double availableCpu, double availableMemory, Map<String, Double> availableGenericResources) {
+            TopologyDetails td = getNextHighest();
+            return getScore(availableCpu, availableMemory, availableGenericResources, td);
+        }
+    }
+
+    private static class GrasSimulatedUserComparator implements Comparator<GrasSimulatedUser> {
 
 Review comment:
   It compares GrasSimulatedUser objects for the sort at line 58. It compares o1 and o2 by their scores; please refer to the overriding compare method at line 175.
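
   A simplified sketch of score-based ordering, following the formula in the Javadoc quoted above (the maximum over resource types of (requested + assigned - guaranteed) / available). `ScoredUser` and `ScoredUserOrdering` are hypothetical stand-ins, not the PR's classes. The sketch covers generic resources only and assumes that lower scores sort first:

   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;
   import java.util.Map;

   // Hypothetical stand-in for a simulated user; not the PR's GrasSimulatedUser.
   class ScoredUser {
       final String name;
       final Map<String, Double> wouldBe;     // assigned + requested, per resource
       final Map<String, Double> guaranteed;  // guaranteed amount, per resource

       ScoredUser(String name, Map<String, Double> wouldBe, Map<String, Double> guaranteed) {
           this.name = name;
           this.wouldBe = wouldBe;
           this.guaranteed = guaranteed;
       }

       // Max over resource types of (requested + assigned - guaranteed) / available.
       double score(Map<String, Double> available) {
           double ret = Double.NEGATIVE_INFINITY;
           for (Map.Entry<String, Double> e : available.entrySet()) {
               if (e.getValue() <= 0) {
                   return Double.MAX_VALUE; // nothing left of this resource
               }
               double used = wouldBe.getOrDefault(e.getKey(), 0.0);
               double quota = guaranteed.getOrDefault(e.getKey(), 0.0);
               ret = Math.max(ret, (used - quota) / e.getValue());
           }
           return ret;
       }
   }

   class ScoredUserOrdering {
       // Lower score sorts first, so users furthest under their guarantee go first.
       static List<ScoredUser> sortByScore(List<ScoredUser> users, Map<String, Double> available) {
           List<ScoredUser> sorted = new ArrayList<>(users);
           sorted.sort(Comparator.comparingDouble((ScoredUser u) -> u.score(available)));
           return sorted;
       }
   }
   ```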


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406823545
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -210,4 +216,11 @@ private Cluster mkTestCluster(Topologies topologies, Config config) {
 
         return new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
     }
+
+    private Set<String> collectMapValues(Map<String, Set<String>> map) {
+        return map.values()
 
 Review comment:
   This looks overly complicated. Maybe:
   ```
           Set<String> set = new HashSet<>();
           map.values().forEach((s) -> set.addAll(s));
           return set;
   ```
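
   A self-contained version of the loop suggested above, for reference. The wrapper class `MapValueCollector` is a hypothetical name, and a method reference stands in for the lambda:

   ```java
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical wrapper class so the helper can be compiled on its own.
   class MapValueCollector {
       // Flatten all value sets of the map into one set; duplicates collapse.
       static Set<String> collectMapValues(Map<String, Set<String>> map) {
           Set<String> set = new HashSet<>();
           map.values().forEach(set::addAll);
           return set;
       }
   }
   ```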


[GitHub] [storm] RuiLi8080 edited a comment on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 edited a comment on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#issuecomment-601342635
 
 
   @kishorvpatil After having discussed with Ethan, I have made the changes as you suggested. Thanks for your review.


[GitHub] [storm] govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r409153634
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
 ##########
 @@ -873,20 +897,38 @@ public NormalizedResourceOffer getNonBlacklistedClusterAvailableResources(Collec
 
     @Override
     public double getClusterTotalCpuResource() {
-        double sum = 0.0;
-        for (SupervisorDetails sup : supervisors.values()) {
-            sum += sup.getTotalCpu();
-        }
-        return sum;
+        return this.totalCpuResource;
+    }
+
+    private double computeClusterCpuResource() {
+        return supervisors.values().stream()
+            .mapToDouble(SupervisorDetails::getTotalCpu)
+            .sum();
+
     }
 
     @Override
     public double getClusterTotalMemoryResource() {
-        double sum = 0.0;
-        for (SupervisorDetails sup : supervisors.values()) {
-            sum += sup.getTotalMemory();
-        }
-        return sum;
+        return this.totalMemoryResource;
+    }
+
+
+    private double computeClusterMemoryResource() {
+        return supervisors.values().stream()
+            .mapToDouble(SupervisorDetails::getTotalMemory)
+            .sum();
+    }
+
+    @Override
+    public Map<String, Double> getClusterTotalGenericResources() {
+        return this.totalGenericResources;
+    }
+
+    private Map<String, Double> computeClusterGenericResources() {
+        return supervisors.values().stream()
+            .map(sup -> sup.getTotalGenericResources().entrySet())
+            .flatMap(Set::stream)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Double::sum));
 
 Review comment:
   The example in the link shows a slightly different way to add the key-value pairs of a Map, but it's not an issue. You can keep this as is.
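
   For reference, a minimal sketch of the three-argument `Collectors.toMap` pattern used in the diff above, with plain maps standing in for the per-supervisor resources. `GenericResourceTotals` and its `sum` method are hypothetical names. Without the merge function, `toMap` throws `IllegalStateException` when two entries share a key:

   ```java
   import java.util.Collection;
   import java.util.Map;
   import java.util.stream.Collectors;

   // Hypothetical helper: plain maps stand in for per-supervisor generic resources.
   class GenericResourceTotals {
       // Sum per-node resource maps into one cluster-wide map.
       // The third argument to toMap resolves key collisions by adding the values.
       static Map<String, Double> sum(Collection<Map<String, Double>> perNode) {
           return perNode.stream()
               .flatMap(m -> m.entrySet().stream())
               .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Double::sum));
       }
   }
   ```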


[GitHub] [storm] kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r393183895
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResources();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            // generic resource types that are offered
+            Set<String> availGenericResourceTypes = cluster.getClusterTotalGenericResources().keySet();
+            for (String resourceType : availGenericResourceTypes) {
+                Double guaranteedAmount = other.getGenericGuaranteed().getOrDefault(resourceType, 0.0);
+                guaranteedGenericResources.put(resourceType, guaranteedAmount);
+            }
+            this.guaranteedGenericResources = guaranteedGenericResources;
+        }
+
+        @Override
+        public TopologyDetails simScheduleNextHighest() {
+            TopologyDetails td = super.simScheduleNextHighest();
+            Map<String, Double> tdRequestedGenericResource = td.getTotalRequestedGenericResources();
+            for (Map.Entry<String, Double> entry : tdRequestedGenericResource.entrySet()) {
+                String resource = entry.getKey();
+                Double requestedAmount = entry.getValue();
+                assignedGenericResources.put(resource, assignedGenericResources.getOrDefault(resource, 0.0) + requestedAmount);
+            }
 
 Review comment:
   Should we not adjust cpu and memory as well, very similar to how `SimulatedUser.simScheduleNextHighest()` does it?
   
   ```
   assignedCpu += td.getTotalRequestedCpu();
   assignedMemory += td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap();
   ```
   Even better would be to refactor this whole class to minimize the duplicate code...
   
   


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r405230149
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -239,6 +246,10 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
         markFailedTopology(topologySubmitter, cluster, td, "Failed to schedule within " + maxSchedulingAttempts + " attempts");
     }
 
+    public Set<String> getEvictedTopologies() {
 
 Review comment:
   I am seeing a possible race condition here. Since `evictedTopologies` keeps being updated during scheduling, a call to `getEvictedTopologies` is not guaranteed to return a useful result. `evictedTopologies` includes every topology evicted during the scheduling process up to the point when `getEvictedTopologies` is called. It doesn't seem useful. 
   
   To know what topologies are being evicted, we already have
   ```
   LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
                                       topologyEvict.getTopologySubmitter());
   ```
   code in place. We can change it to `warn` if we want it to show up in the regular logs.  
   
   @kishorvpatil  what do you think?
   


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384797367
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResource();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            Map<String, Double> availGenericResources = cluster.getClusterTotalGenericResource();   // generic resources that are offered
+            for (Map.Entry<String, Double> entry : availGenericResources.entrySet()) {
+                String resource = entry.getKey();
+                Double guaranteedAmount = other.getGenericGuaranteed().getOrDefault(resource, 0.0);
+                guaranteedGenericResources.put(resource, guaranteedAmount);
+            }
+            this.guaranteedGenericResources = guaranteedGenericResources;
+        }
+
+        @Override
+        public TopologyDetails simScheduleNextHighest() {
+            TopologyDetails td = tds.pop();
+            assignedCpu += td.getTotalRequestedCpu();
+            assignedMemory += td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap();
 
 Review comment:
   We could rely on `super.simScheduleNextHighest();` so everything in GrasSimulatedUser is gras specific, like what you did elsewhere
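
   A simplified sketch of that delegation. `Request`, `BaseSimUser` and `GenericSimUser` are hypothetical stand-ins, not Storm's `TopologyDetails` or `SimulatedUser`. The base class keeps the CPU bookkeeping and the subclass adds only the generic-resource part:

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical, simplified stand-in for a topology's resource request.
   class Request {
       final double cpu;
       final Map<String, Double> generic;

       Request(double cpu, Map<String, Double> generic) {
           this.cpu = cpu;
           this.generic = generic;
       }
   }

   // Hypothetical base class that owns the CPU bookkeeping.
   class BaseSimUser {
       protected final Deque<Request> pending = new ArrayDeque<>();
       protected double assignedCpu;

       Request simScheduleNextHighest() {
           Request r = pending.pop();
           assignedCpu += r.cpu;
           return r;
       }
   }

   // Hypothetical subclass that tracks generic resources on top of the base class.
   class GenericSimUser extends BaseSimUser {
       final Map<String, Double> assignedGeneric = new HashMap<>();

       @Override
       Request simScheduleNextHighest() {
           // Delegate the shared bookkeeping, then add only the generic-resource part.
           Request r = super.simScheduleNextHighest();
           r.generic.forEach((name, amount) -> assignedGeneric.merge(name, amount, Double::sum));
           return r;
       }
   }
   ```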


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r387823631
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
 ##########
 @@ -889,6 +889,20 @@ public double getClusterTotalMemoryResource() {
         return sum;
     }
 
+    @Override
+    public Map<String, Double> getClusterTotalGenericResource() {
 
 Review comment:
   Yes, addressed


[GitHub] [storm] govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r407727300
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResources();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            // generic resource types that are offered
+            Set<String> availGenericResourceTypes = cluster.getClusterTotalGenericResources().keySet();
+            for (String resourceType : availGenericResourceTypes) {
+                Double guaranteedAmount = other.getGenericGuaranteed().getOrDefault(resourceType, 0.0);
+                guaranteedGenericResources.put(resourceType, guaranteedAmount);
+            }
+            this.guaranteedGenericResources = guaranteedGenericResources;
+        }
+
+        @Override
+        public TopologyDetails simScheduleNextHighest() {
+            TopologyDetails td = super.simScheduleNextHighest();
+            Map<String, Double> tdRequestedGenericResource = td.getTotalRequestedGenericResources();
+            for (Map.Entry<String, Double> entry : tdRequestedGenericResource.entrySet()) {
+                String resource = entry.getKey();
+                Double requestedAmount = entry.getValue();
+                assignedGenericResources.put(resource, assignedGenericResources.getOrDefault(resource, 0.0) + requestedAmount);
+            }
+            return td;
+        }
+
+        /**
+         * Get a score for the simulated user.  This is used to sort the users, by their highest priority topology.
+         * Only give user guarantees that will not exceed cluster capacity.
+         * Score of each resource type is calculated as: (Requested + Assigned - Guaranteed)/clusterAvailable
+         * The final score is a max over all resource types.
+         * Topology score will fall into the following intervals if:
+         *      User is under quota (guarantee):                    [(-guarantee)/available : 0]
+         *      User is over quota:                                 (0, infinity)
+         * Unfortunately, score below 0 does not guarantee that the topology will be scheduled due to resources fragmentation.
+         * @param availableCpu available CPU on the cluster.
+         * @param availableMemory available memory on the cluster.
+         * @param availableGenericResources available generic resources (other that cpu and memory) in cluster
+         * @param td the topology we are looking at.
+         * @return the score.
+         */
+        protected double getScore(double availableCpu, double availableMemory,
+                                  Map<String, Double> availableGenericResources, TopologyDetails td) {
+            // calculate scores for cpu and memory first
+            double ret = super.getScore(availableCpu, availableMemory, td);
+            if (ret == Double.MAX_VALUE) {
+                return ret;
+            }
+            Map<String, Double> tdTotalRequestedGeneric = td.getTotalRequestedGenericResources();
+            if (tdTotalRequestedGeneric == null) {
+                tdTotalRequestedGeneric = Collections.emptyMap();
+            }
+            for (Map.Entry<String, Double> entry : availableGenericResources.entrySet()) {
+                String resource = entry.getKey();
+                Double available = entry.getValue();
+                if (available <= 0) {
+                    return Double.MAX_VALUE;
+                }
+                Double wouldBeResource = assignedGenericResources.getOrDefault(resource, 0.0)
+                    + tdTotalRequestedGeneric.getOrDefault(resource, 0.0);
+                double thisScore = (wouldBeResource -  guaranteedGenericResources.getOrDefault(resource, 0.0)) / available;
+                ret = Math.max(ret, thisScore);
+            }
+
+            return ret;
+        }
+
+        protected double getScore(double availableCpu, double availableMemory, Map<String, Double> availableGenericResources) {
+            TopologyDetails td = getNextHighest();
 
 Review comment:
   Nit: could we inline this to avoid unnecessary object creation?


[GitHub] [storm] kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r387632643
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
 ##########
 @@ -271,6 +273,13 @@ boolean wouldFit(
      */
     double getClusterTotalMemoryResource();
 
+    /**
+     * Get the total amount of generic resources (excluding CPU and memory) in cluster.
+     */
+    default Map<String, Double> getClusterTotalGenericResource() {
 
 Review comment:
   Cluster should cache this value in a private field so that it is not recalculated on every call.
   Similarly, it should keep a `Set<String> genericResourceTypes` field
   and expose it through a `getClusterGenericResourceTypes` method.
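
A minimal sketch of the caching idea, assuming a hypothetical stand-in class rather than the real `Cluster` (the class name, constructor, and field names below are illustrative only). It returns a copy of the cached totals because `getOrderedTopologies` in this PR mutates the map it receives via `genericAvail.put(...)`; handing out the cached map itself would corrupt it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Cluster; not the real Storm class.
class CachedGenericTotalsSketch {
    // Assumed input: one map of generic resources per supervisor.
    private final List<Map<String, Double>> supervisorGenericResources;
    // Computed on first use, then reused.
    private Map<String, Double> totalGenericResources;

    CachedGenericTotalsSketch(List<Map<String, Double>> supervisorGenericResources) {
        this.supervisorGenericResources = supervisorGenericResources;
    }

    private Map<String, Double> cachedTotals() {
        if (totalGenericResources == null) {
            Map<String, Double> sum = new HashMap<>();
            for (Map<String, Double> perSupervisor : supervisorGenericResources) {
                for (Map.Entry<String, Double> entry : perSupervisor.entrySet()) {
                    sum.merge(entry.getKey(), entry.getValue(), Double::sum);
                }
            }
            totalGenericResources = Collections.unmodifiableMap(sum);
        }
        return totalGenericResources;
    }

    // Returns a fresh copy so that callers may subtract from it safely.
    Map<String, Double> getClusterTotalGenericResources() {
        return new HashMap<>(cachedTotals());
    }

    // Read-only view of the resource names; no per-call summation.
    Set<String> getClusterGenericResourceTypes() {
        return cachedTotals().keySet();
    }
}
```

This sketch assumes the supervisor set does not change during the lifetime of the object; if it can, the cache would need to be invalidated.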
   


[GitHub] [storm] govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r409153932
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResources();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            // generic resource types that are offered
+            Set<String> availGenericResourceTypes = cluster.getClusterTotalGenericResources().keySet();
+            for (String resourceType : availGenericResourceTypes) {
+                Double guaranteedAmount = other.getGenericGuaranteed().getOrDefault(resourceType, 0.0);
+                guaranteedGenericResources.put(resourceType, guaranteedAmount);
+            }
+            this.guaranteedGenericResources = guaranteedGenericResources;
+        }
+
+        @Override
+        public TopologyDetails simScheduleNextHighest() {
+            TopologyDetails td = super.simScheduleNextHighest();
+            Map<String, Double> tdRequestedGenericResource = td.getTotalRequestedGenericResources();
+            for (Map.Entry<String, Double> entry : tdRequestedGenericResource.entrySet()) {
+                String resource = entry.getKey();
+                Double requestedAmount = entry.getValue();
+                assignedGenericResources.put(resource, assignedGenericResources.getOrDefault(resource, 0.0) + requestedAmount);
+            }
+            return td;
+        }
+
+        /**
+         * Get a score for the simulated user.  This is used to sort the users, by their highest priority topology.
+         * Only give user guarantees that will not exceed cluster capacity.
+         * Score of each resource type is calculated as: (Requested + Assigned - Guaranteed)/clusterAvailable
+         * The final score is a max over all resource types.
+         * Topology score will fall into the following intervals if:
+         *      User is under quota (guarantee):                    [(-guarantee)/available : 0]
+         *      User is over quota:                                 (0, infinity)
+         * Unfortunately, score below 0 does not guarantee that the topology will be scheduled due to resources fragmentation.
+         * @param availableCpu available CPU on the cluster.
+         * @param availableMemory available memory on the cluster.
+         * @param availableGenericResources available generic resources (other that cpu and memory) in cluster
+         * @param td the topology we are looking at.
+         * @return the score.
+         */
+        protected double getScore(double availableCpu, double availableMemory,
+                                  Map<String, Double> availableGenericResources, TopologyDetails td) {
+            // calculate scores for cpu and memory first
+            double ret = super.getScore(availableCpu, availableMemory, td);
+            if (ret == Double.MAX_VALUE) {
+                return ret;
+            }
+            Map<String, Double> tdTotalRequestedGeneric = td.getTotalRequestedGenericResources();
+            if (tdTotalRequestedGeneric == null) {
+                tdTotalRequestedGeneric = Collections.emptyMap();
+            }
+            for (Map.Entry<String, Double> entry : availableGenericResources.entrySet()) {
+                String resource = entry.getKey();
+                Double available = entry.getValue();
+                if (available <= 0) {
+                    return Double.MAX_VALUE;
+                }
+                Double wouldBeResource = assignedGenericResources.getOrDefault(resource, 0.0)
+                    + tdTotalRequestedGeneric.getOrDefault(resource, 0.0);
+                double thisScore = (wouldBeResource -  guaranteedGenericResources.getOrDefault(resource, 0.0)) / available;
+                ret = Math.max(ret, thisScore);
+            }
+
+            return ret;
+        }
+
+        protected double getScore(double availableCpu, double availableMemory, Map<String, Double> availableGenericResources) {
+            TopologyDetails td = getNextHighest();
 
 Review comment:
   Thank you - I figured that was why - just wanted to be sure I wasn't missing anything
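
For readers following the thread, the scoring rule described in the Javadoc above, `(Requested + Assigned - Guaranteed) / clusterAvailable` maximised over resource types, can be sketched in isolation. This is an illustrative stand-alone helper, not code from the PR: the class name, the `cpuMemScore` parameter (standing in for the value `super.getScore(...)` would return), and the sample numbers are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the generic-resource loop of getScore in the diff.
class GenericScoreSketch {

    static double score(double cpuMemScore,
                        Map<String, Double> assigned,
                        Map<String, Double> requested,
                        Map<String, Double> guaranteed,
                        Map<String, Double> available) {
        if (cpuMemScore == Double.MAX_VALUE) {
            return cpuMemScore;
        }
        double ret = cpuMemScore;
        for (Map.Entry<String, Double> entry : available.entrySet()) {
            String resource = entry.getKey();
            double avail = entry.getValue();
            if (avail <= 0) {
                // The resource is exhausted, so the score saturates.
                return Double.MAX_VALUE;
            }
            double wouldBe = assigned.getOrDefault(resource, 0.0)
                + requested.getOrDefault(resource, 0.0);
            double thisScore = (wouldBe - guaranteed.getOrDefault(resource, 0.0)) / avail;
            ret = Math.max(ret, thisScore);
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<String, Double> assigned = new HashMap<>();
        assigned.put("gpu", 1.0);
        Map<String, Double> requested = new HashMap<>();
        requested.put("gpu", 4.0);
        Map<String, Double> guaranteed = new HashMap<>();
        guaranteed.put("gpu", 2.0);
        Map<String, Double> available = new HashMap<>();
        available.put("gpu", 10.0);
        // gpu term: (1 + 4 - 2) / 10 = 0.3, which exceeds the assumed CPU/memory score of -0.5.
        System.out.println(score(-0.5, assigned, requested, guaranteed, available));
    }
}
```

With these sample numbers the user is under quota for CPU and memory but over quota for `gpu`, so the generic resource determines the final score.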


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r386471285
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -256,7 +256,8 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
         if (forTest) {
             return this.evictedTopologies;
         } else {
-            throw new Exception("Topology eviction check is only provided for test purposes");
+            throw new UnsupportedOperationException(
 
 Review comment:
   Thanks for catching this.


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406831958
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -248,6 +255,11 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                     + topologySchedulingResources.getRemainingRequiredResourcesMessage());
     }
 
+    public Map<String, Set<String>> getEvictedTopologiesMap() {
 
 Review comment:
   I would add a comment like `This method is not stable. It's subject to change`, since we currently don't use it anywhere besides unit tests. It might change depending on how we want to use the evictedTopologies information. As @bipinprasad said, we might want to put it into the scheduleResult in the future.
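
A rough sketch of what such a documented accessor could look like, combined with the build-then-publish pattern visible elsewhere in this PR (`evictedTopologiesMap = tmpEvictedTopologiesMap;`). The class below is a hypothetical stand-in, not `ResourceAwareScheduler`; the `startRound`, `recordEviction`, and `finishRound` names and the meaning of the map key are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in; only the eviction bookkeeping is sketched.
class EvictionBookkeepingSketch {
    private Map<String, Set<String>> evictedTopologiesMap = Collections.emptyMap();
    private Map<String, Set<String>> tmpEvictedTopologiesMap = new HashMap<>();

    // Start collecting evictions for a new scheduling round.
    void startRound() {
        tmpEvictedTopologiesMap = new HashMap<>();
    }

    // Assumed semantics: key is the topology being scheduled, value is an evicted topology id.
    void recordEviction(String scheduledTopologyId, String evictedTopologyId) {
        tmpEvictedTopologiesMap
            .computeIfAbsent(scheduledTopologyId, k -> new HashSet<>())
            .add(evictedTopologyId);
    }

    // Publish the finished round in one assignment, so readers never see a partial map.
    void finishRound() {
        evictedTopologiesMap = tmpEvictedTopologiesMap;
    }

    /**
     * Return the evictions recorded during the last completed scheduling round.
     * This method is not stable. It's subject to change.
     */
    public Map<String, Set<String>> getEvictedTopologiesMap() {
        return Collections.unmodifiableMap(evictedTopologiesMap);
    }
}
```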


[GitHub] [storm] kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r387631007
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResource();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            Map<String, Double> availGenericResources = cluster.getClusterTotalGenericResource();   // generic resources that are offered
 
 Review comment:
   Instead of using `getClusterTotalGenericResource` here, adding a `getClusterGenericResourceTypes` method like the one below to Cluster would probably help avoid the calculation.
   It would look something like:
   ```
       @Override
       public Set<String> getClusterGenericResourceTypes() {
           Set<String> resourceTypes =
               supervisors.values().parallelStream().map(sup -> sup.getTotalGenericResources().keySet()).flatMap(Set::stream).collect(Collectors.toSet());
           return resourceTypes;
       }
   ```
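
The suggestion above depends on Storm's supervisor objects, so here is a self-contained variant of the same stream pipeline that can be run on its own. It is only a sketch: the class name is hypothetical, and plain maps stand in for whatever `sup.getTotalGenericResources()` returns. It uses a sequential stream, on the assumption that the number of supervisors is small enough for parallelism not to matter.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; plain maps stand in for per-supervisor generic resources.
class GenericResourceTypesSketch {

    // Collect the union of resource names offered by any supervisor.
    static Set<String> resourceTypes(Collection<Map<String, Double>> perSupervisorGenericResources) {
        return perSupervisorGenericResources.stream()
            .flatMap(resources -> resources.keySet().stream())
            .collect(Collectors.toSet());
    }
}
```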
   


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384780999
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.scheduler.resource.strategies.eviction.TestDefaultEvictionStrategy;
+import org.apache.storm.utils.Time;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.addTopologies;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.createGrasClusterConfig;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genSupervisors;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genTopology;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userRes;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userResourcePool;
+
+public class TestGenericResourceAwareSchedulingPriorityStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestDefaultEvictionStrategy.class);
+    private static int currentTime = Time.currentTimeSecs();
+    private static IScheduler scheduler = null;
+
+    @After
+    public void cleanup() {
+        if (scheduler != null) {
+            scheduler.cleanup();
+            scheduler = null;
+        }
+    }
+
+    /*
+     * DefaultSchedulingPriorityStrategy will not evict topo as long as the resources request can be met
+     *
+     *  Ethan asks for heavy cpu and memory while Rui asks for little cpu and memory but heave generic resource
 
 Review comment:
   `heave` -> `heavy` 


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406862799
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -115,6 +122,7 @@ public void schedule(Topologies topologies, Cluster cluster) {
                 scheduleTopology(td, cluster, submitter, orderedTopologies);
             }
         }
+        evictedTopologiesMap = tmpEvictedTopologiesMap;
 
 Review comment:
   Fixed as described in the comments above.


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406862614
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -196,21 +204,18 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                     } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
                         LOG.debug("Not enough resources to schedule {}", td.getName());
                         List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
-                        boolean evictedSomething = false;
-                        LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
+                        LOG.debug("Attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
                         int tdIndex = reversedList.indexOf(td);
                         topologySchedulingResources.setRemainingRequiredResources(toSchedule, td);
 
+                        Set<String> tmpEvictedTopos = new HashSet<>();
                         for (int index = 0; index < tdIndex; index++) {
                             TopologyDetails topologyEvict = reversedList.get(index);
                             SchedulerAssignment evictAssignemnt = workingState.getAssignmentById(topologyEvict.getId());
                             if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
-                                Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
                                 topologySchedulingResources.adjustResourcesForEvictedTopology(toSchedule, topologyEvict);
-
-                                LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
-                                    topologyEvict.getTopologySubmitter());
-                                evictedSomething = true;
+                                tmpEvictedTopos.add(topologyEvict.getId());
 
 Review comment:
   Thanks for catching this; it was my mistake. I made two changes:
   1. Create a new hashmap for tmpEvictedTopos at every round of scheduling.
   2. Combine the topologies evicted at each attempt into tmpEvictedTopologiesMap.
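
For readers following the thread, the two changes listed above can be sketched roughly as follows. This is a minimal illustration, not the code in the PR: the class name, the beginRound/recordAttempt/endRound methods, and the assumed map layout (scheduled topology id to the ids it evicted) are all hypothetical. Only the names tmpEvictedTopos, tmpEvictedTopologiesMap and evictedTopologiesMap come from the diff.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class EvictionTrackingSketch {
    // Result of the last completed round.
    // Assumed layout: scheduled topology id -> ids of the topologies it evicted.
    private Map<String, Set<String>> evictedTopologiesMap = new HashMap<>();
    // Scratch map, recreated at the start of every scheduling round.
    private Map<String, Set<String>> tmpEvictedTopologiesMap = new HashMap<>();

    /** Start a scheduling round with a fresh scratch map so nothing leaks from the previous round. */
    public void beginRound() {
        tmpEvictedTopologiesMap = new HashMap<>();
    }

    /** Record one scheduling attempt for topoId and merge its evictions into the scratch map. */
    public void recordAttempt(String topoId, Collection<String> evictedInThisAttempt) {
        // A new per-attempt collection, as in change 1.
        Set<String> tmpEvictedTopos = new HashSet<>(evictedInThisAttempt);
        if (!tmpEvictedTopos.isEmpty()) {
            // Combine with earlier attempts for the same topology, as in change 2.
            tmpEvictedTopologiesMap.computeIfAbsent(topoId, k -> new HashSet<>()).addAll(tmpEvictedTopos);
        }
    }

    /** Publish the scratch map once the round is complete. */
    public void endRound() {
        evictedTopologiesMap = tmpEvictedTopologiesMap;
    }

    public Map<String, Set<String>> getEvictedTopologiesMap() {
        return evictedTopologiesMap;
    }
}
```

Publishing the scratch map only at the end of the round mirrors the `evictedTopologiesMap = tmpEvictedTopologiesMap;` assignment at the end of schedule() in the diff, so a reader of the map never sees a half-built round.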


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406921115
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -196,21 +204,18 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                     } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
                         LOG.debug("Not enough resources to schedule {}", td.getName());
                         List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
-                        boolean evictedSomething = false;
-                        LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
+                        LOG.debug("Attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
                         int tdIndex = reversedList.indexOf(td);
                         topologySchedulingResources.setRemainingRequiredResources(toSchedule, td);
 
+                        Set<String> tmpEvictedTopos = new HashSet<>();
                         for (int index = 0; index < tdIndex; index++) {
                             TopologyDetails topologyEvict = reversedList.get(index);
                             SchedulerAssignment evictAssignemnt = workingState.getAssignmentById(topologyEvict.getId());
                             if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
-                                Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
                                 topologySchedulingResources.adjustResourcesForEvictedTopology(toSchedule, topologyEvict);
-
-                                LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
 
 Review comment:
   Makes sense


[GitHub] [storm] bipinprasad commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r394769759
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResources();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            // generic resource types that are offered
+            Set<String> availGenericResourceTypes = cluster.getClusterTotalGenericResources().keySet();
+            for (String resourceType : availGenericResourceTypes) {
+                Double guaranteedAmount = other.getGenericGuaranteed().getOrDefault(resourceType, 0.0);
+                guaranteedGenericResources.put(resourceType, guaranteedAmount);
+            }
+            this.guaranteedGenericResources = guaranteedGenericResources;
+        }
+
+        @Override
+        public TopologyDetails simScheduleNextHighest() {
+            TopologyDetails td = super.simScheduleNextHighest();
+            Map<String, Double> tdRequestedGenericResource = td.getTotalRequestedGenericResources();
+            for (Map.Entry<String, Double> entry : tdRequestedGenericResource.entrySet()) {
+                String resource = entry.getKey();
+                Double requestedAmount = entry.getValue();
+                assignedGenericResources.put(resource, assignedGenericResources.getOrDefault(resource, 0.0) + requestedAmount);
+            }
+            return td;
+        }
+
+        /**
+         * Get a score for the simulated user.  This is used to sort the users, by their highest priority topology.
+         * Only give user guarantees that will not exceed cluster capacity.
+         * Score of each resource type is calculated as: (Requested + Assigned - Guaranteed)/clusterAvailable
+         * The final score is a max over all resource types.
+         * Topology score will fall into the following intervals if:
+         *      User is under quota (guarantee):                    [(-guarantee)/available : 0]
+         *      User is over quota:                                 (0, infinity)
+         * Unfortunately, score below 0 does not guarantee that the topology will be scheduled due to resources fragmentation.
+         * @param availableCpu available CPU on the cluster.
+         * @param availableMemory available memory on the cluster.
+         * @param availableGenericResources available generic resources (other that cpu and memory) in cluster
+         * @param td the topology we are looking at.
+         * @return the score.
+         */
+        protected double getScore(double availableCpu, double availableMemory,
+                                  Map<String, Double> availableGenericResources, TopologyDetails td) {
+            // calculate scores for cpu and memory first
+            double ret = super.getScore(availableCpu, availableMemory, td);
+            if (ret == Double.MAX_VALUE) {
+                return ret;
+            }
+            Map<String, Double> tdTotalRequestedGeneric = td.getTotalRequestedGenericResources();
+            if (tdTotalRequestedGeneric == null) {
+                tdTotalRequestedGeneric = Collections.emptyMap();
+            }
+            for (Map.Entry<String, Double> entry : availableGenericResources.entrySet()) {
+                String resource = entry.getKey();
+                Double available = entry.getValue();
+                if (available <= 0) {
+                    return Double.MAX_VALUE;
+                }
+                Double wouldBeResource = assignedGenericResources.getOrDefault(resource, 0.0)
+                    + tdTotalRequestedGeneric.getOrDefault(resource, 0.0);
+                double thisScore = (wouldBeResource -  guaranteedGenericResources.getOrDefault(resource, 0.0)) / available;
+                ret = Math.max(ret, thisScore);
+            }
+
+            return ret;
+        }
+
+        protected double getScore(double availableCpu, double availableMemory, Map<String, Double> availableGenericResources) {
+            TopologyDetails td = getNextHighest();
+            return getScore(availableCpu, availableMemory, availableGenericResources, td);
+        }
+    }
+
+    private static class GrasSimulatedUserComparator implements Comparator<GrasSimulatedUser> {
+        private final double cpuAvail;
+        private final double memAvail;
+        private final Map<String, Double> genericAvail;
+
+        private GrasSimulatedUserComparator(double cpuAvail, double memAvail, Map<String, Double> genericAvail) {
+            this.cpuAvail = cpuAvail;
+            this.memAvail = memAvail;
+            this.genericAvail = genericAvail;
+        }
 
 Review comment:
   I think this class can be removed. It is equivalent to: Comparator<GrasSimulatedUser> c = (o1, o2) -> Double.compare(o1.getScore(cpuAvail, memAvail, genericAvail), o2.getScore(cpuAvail, memAvail, genericAvail));
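
To make the suggestion concrete, here is a small self-contained sketch comparing a dedicated comparator class with the lambda form. SimUser, its single-resource score rule, and the sample values are hypothetical stand-ins for GrasSimulatedUser; the sketch also assumes that the compare method of GrasSimulatedUserComparator (not shown in the quoted hunk) simply applies Double.compare to the two scores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ComparatorSketch {
    /** Hypothetical stand-in for GrasSimulatedUser with a one-resource score. */
    static final class SimUser {
        final String name;
        final double requested;
        final double guaranteed;

        SimUser(String name, double requested, double guaranteed) {
            this.name = name;
            this.requested = requested;
            this.guaranteed = guaranteed;
        }

        double getScore(double available) {
            return (requested - guaranteed) / available;
        }
    }

    /** Dedicated comparator class holding the captured state in a field. */
    static final class SimUserComparator implements Comparator<SimUser> {
        private final double available;

        SimUserComparator(double available) {
            this.available = available;
        }

        @Override
        public int compare(SimUser o1, SimUser o2) {
            return Double.compare(o1.getScore(available), o2.getScore(available));
        }
    }

    /** Lambda form; the parameter is effectively final, so it can be captured. */
    static Comparator<SimUser> lambdaComparator(double available) {
        return (o1, o2) -> Double.compare(o1.getScore(available), o2.getScore(available));
    }

    static List<SimUser> sampleUsers() {
        return Arrays.asList(
            new SimUser("user-a", 80.0, 50.0),
            new SimUser("user-b", 10.0, 50.0),
            new SimUser("user-c", 60.0, 50.0));
    }

    /** Sort a copy of the users with the given comparator and return their names in order. */
    static List<String> sortedNames(List<SimUser> users, Comparator<SimUser> c) {
        List<SimUser> copy = new ArrayList<>(users);
        copy.sort(c);
        List<String> names = new ArrayList<>();
        for (SimUser u : copy) {
            names.add(u.name);
        }
        return names;
    }
}
```

One caveat if the lambda is written inline in getOrderedTopologies: cpuAvail and memAvail are reassigned inside the while loop, so they are not effectively final and cannot be captured directly. They would need to be copied into final locals first, or passed through a small factory method as in lambdaComparator above.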


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384803453
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
 ##########
 @@ -83,6 +88,10 @@ public void addSelfTo(Map<String, Map<String, Number>> fullPool) {
         }
     }
 
+    public static TestUserResources userRes(String name, Map<String, Double> resources) {
+        return new TestUserResources(name, resources);
 
 Review comment:
   This reminds me that we don't have a unit test covering the case where users have a generic resource guarantee. That can be done as follow-up work, though.
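
As a rough illustration of what such a follow-up test would exercise, the sketch below applies the per-resource rule from the getScore Javadoc in this PR, (Requested + Assigned - Guaranteed) / clusterAvailable with a max over resource types, to generic resources only. The class and method names are hypothetical, and unlike the PR code it does not start from the CPU/memory score.

```java
import java.util.HashMap;
import java.util.Map;

public class GenericScoreSketch {
    /**
     * Score over generic resources only: max over resource types of
     * (assigned + requested - guaranteed) / available.
     * Returns Double.MAX_VALUE if any resource type is exhausted, and
     * negative infinity when the cluster offers no generic resources.
     */
    static double genericScore(Map<String, Double> assigned, Map<String, Double> requested,
                               Map<String, Double> guaranteed, Map<String, Double> available) {
        double ret = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, Double> entry : available.entrySet()) {
            String resource = entry.getKey();
            double avail = entry.getValue();
            if (avail <= 0) {
                return Double.MAX_VALUE;
            }
            double wouldBe = assigned.getOrDefault(resource, 0.0) + requested.getOrDefault(resource, 0.0);
            double thisScore = (wouldBe - guaranteed.getOrDefault(resource, 0.0)) / avail;
            ret = Math.max(ret, thisScore);
        }
        return ret;
    }

    /** Convenience helper for building a one-entry resource map. */
    static Map<String, Double> single(String resource, double amount) {
        Map<String, Double> m = new HashMap<>();
        m.put(resource, amount);
        return m;
    }
}
```

With 10 units of a hypothetical "gpu" resource available and a request of 4, a user with a guarantee of 6 scores (4 - 6) / 10, which is below zero (under quota), while a user with no guarantee scores 4 / 10, which is above zero (over quota). A unit test for the guarantee case would check that the first user's topology is ordered ahead of the second's.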


[GitHub] [storm] RuiLi8080 commented on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#issuecomment-601342635
 
 
   @kishorvpatil I think your point makes sense. However, I was worried that tracking evicted topologies would add unnecessary computation and flood the logs with evicted topology IDs, since this is calculated at every round of scheduling. I made a small change so that we only do this when testing or debugging; I'm not sure whether that addresses your point.


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r393988865
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
 ##########
 @@ -80,6 +80,13 @@
     private final Map<String, Map<String, Double>> nodeToScheduledOffHeapNodeMemoryCache;   // node -> topologyId -> double
     private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
     private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
+    /**
+     * Snapshot of cluster total resources (cpu, memory, generic).
+     */
+    private final double totalCpuResource;
+    private final double totalMemoryResource;
+    private final Map<String, Double> totalGenericResources;
 
 Review comment:
   Fixed


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r387823684
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResource();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            Map<String, Double> availGenericResources = cluster.getClusterTotalGenericResource();   // generic resources that are offered
 
 Review comment:
   Added

----------------------------------------------------------------

[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r395229049
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -56,6 +59,10 @@
     private int schedulingTimeoutSeconds;
     private ExecutorService backgroundScheduling;
 
+    // record evicted topologies on each scheduling round, only used in test purpose now
+    private boolean forTest = false;
+    private Set<String> evictedTopologies = new HashSet<>();
+
 
 Review comment:
   Unfortunately not. I had to add this to check whether a topology has been evicted, because in some scenarios an evicted topology is put back within the same scheduling round.

----------------------------------------------------------------

[GitHub] [storm] bipinprasad commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r405438903
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -239,6 +246,10 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
         markFailedTopology(topologySubmitter, cluster, td, "Failed to schedule within " + maxSchedulingAttempts + " attempts");
     }
 
+    public Set<String> getEvictedTopologies() {
 
 Review comment:
   This information would be more useful as a member of SchedulingResult.
   But it looks like the intent is to keep track of the topologies evicted for each topology scheduled, in which case we need a map, i.e. topoId -> toposEvicted.
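
   A minimal sketch of the topoId -> toposEvicted map described above. The names `EvictionRecord`, `record`, and `evictedFor` are hypothetical and not part of Storm; the sketch only illustrates the shape of the data, not how the scheduler would populate it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Storm: records which topologies were
// evicted to make room for each topology scheduled in one round.
class EvictionRecord {
    private final Map<String, Set<String>> evictedBy = new HashMap<>();

    // Note that scheduling `scheduledId` evicted `evictedId`.
    void record(String scheduledId, String evictedId) {
        evictedBy.computeIfAbsent(scheduledId, k -> new LinkedHashSet<>()).add(evictedId);
    }

    // Read-only view of the topologies evicted on behalf of `scheduledId`; empty if none.
    Set<String> evictedFor(String scheduledId) {
        return Collections.unmodifiableSet(
            evictedBy.getOrDefault(scheduledId, Collections.emptySet()));
    }

    public static void main(String[] args) {
        EvictionRecord record = new EvictionRecord();
        record.record("rui-topo-1", "ethan-topo-1");
        record.record("rui-topo-1", "ethan-topo-2");
        System.out.println(record.evictedFor("rui-topo-1"));
    }
}
```

   Keying by the scheduled topology lets a test assert on the evictions caused by one specific topology rather than on a flat set for the whole round.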

----------------------------------------------------------------

[GitHub] [storm] govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r407590612
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
 ##########
 @@ -873,20 +897,38 @@ public NormalizedResourceOffer getNonBlacklistedClusterAvailableResources(Collec
 
     @Override
     public double getClusterTotalCpuResource() {
-        double sum = 0.0;
-        for (SupervisorDetails sup : supervisors.values()) {
-            sum += sup.getTotalCpu();
-        }
-        return sum;
+        return this.totalCpuResource;
+    }
+
+    private double computeClusterCpuResource() {
+        return supervisors.values().stream()
+            .mapToDouble(SupervisorDetails::getTotalCpu)
+            .sum();
+
     }
 
     @Override
     public double getClusterTotalMemoryResource() {
-        double sum = 0.0;
-        for (SupervisorDetails sup : supervisors.values()) {
-            sum += sup.getTotalMemory();
-        }
-        return sum;
+        return this.totalMemoryResource;
+    }
+
+
+    private double computeClusterMemoryResource() {
+        return supervisors.values().stream()
+            .mapToDouble(SupervisorDetails::getTotalMemory)
+            .sum();
+    }
+
+    @Override
+    public Map<String, Double> getClusterTotalGenericResources() {
+        return this.totalGenericResources;
+    }
+
+    private Map<String, Double> computeClusterGenericResources() {
+        return supervisors.values().stream()
+            .map(sup -> sup.getTotalGenericResources().entrySet())
+            .flatMap(Set::stream)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Double::sum));
 
 Review comment:
   Nit: Possibly use https://www.baeldung.com/java-merge-maps#of to simplify this a little.
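
   One possible simplification, sketched with `Map.merge` from the JDK. It may not be the exact variant the linked section describes, and `GenericResourceTotals` and `sum` are hypothetical names. Plain maps stand in for what `SupervisorDetails::getTotalGenericResources` returns.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Storm.
class GenericResourceTotals {
    // Sums per-supervisor generic resources by resource name.
    static Map<String, Double> sum(List<Map<String, Double>> perSupervisor) {
        Map<String, Double> total = new HashMap<>();
        for (Map<String, Double> resources : perSupervisor) {
            // merge() inserts the amount if the name is new, otherwise adds to the running sum.
            resources.forEach((name, amount) -> total.merge(name, amount, Double::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Double> sup1 = new HashMap<>();
        sup1.put("generic.resource.1", 10.0);
        Map<String, Double> sup2 = new HashMap<>();
        sup2.put("generic.resource.1", 30.0);
        sup2.put("generic.resource.2", 5.0);
        System.out.println(sum(Arrays.asList(sup1, sup2)));
    }
}
```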

----------------------------------------------------------------

[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r386041992
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -248,8 +252,17 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
     }
 
     @VisibleForTesting
-    public Set<String> getEvictedTopologies() {
-        return this.evictedTopologies;
+    public Set<String> getEvictedTopologies() throws Exception {
+        if (forTest) {
+            return this.evictedTopologies;
+        } else {
+            throw new Exception("Topology eviction check is only provided for test purposes");
 
 Review comment:
   This can be `UnsupportedOperationException`
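
   A rough sketch of the getter with the unchecked exception, using a hypothetical stand-in class (`EvictionTracker`) rather than the real `ResourceAwareScheduler`. Because `UnsupportedOperationException` is a `RuntimeException`, the `throws Exception` clause is no longer needed.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the scheduler; only the guarded getter is modeled.
class EvictionTracker {
    private final boolean forTest;
    private final Set<String> evictedTopologies = new HashSet<>();

    EvictionTracker(boolean forTest) {
        this.forTest = forTest;
    }

    // Unchecked exception, so callers do not need a throws clause or a try/catch.
    Set<String> getEvictedTopologies() {
        if (!forTest) {
            throw new UnsupportedOperationException(
                "Topology eviction check is only provided for test purposes");
        }
        return evictedTopologies;
    }

    public static void main(String[] args) {
        System.out.println(new EvictionTracker(true).getEvictedTopologies());
        try {
            new EvictionTracker(false).getEvictedTopologies();
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```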

----------------------------------------------------------------

[GitHub] [storm] govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r407601346
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -248,6 +255,11 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                     + topologySchedulingResources.getRemainingRequiredResourcesMessage());
     }
 
+    public Map<String, Set<String>> getEvictedTopologiesMap() {
 
 Review comment:
   Regarding my previous question: in this case it makes sense, because there are multiple scheduling attempts and you need the final result to keep it consistent. But that is not the case with `evictedTopologiesMap` and `tmpEvictedTopologiesMap`, unless I'm missing something about my previous question?

----------------------------------------------------------------

[GitHub] [storm] kishorvpatil commented on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#issuecomment-601254408
 
 
   @RuiLi8080 
   I would like us to test the behavior we ship, with tests that check production behavior. We should expose any evictions as a standard result of scheduling, available to both production and tests, instead of a forTest-style special code path executed only during tests.

----------------------------------------------------------------

[GitHub] [storm] bipinprasad commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r405247393
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -239,6 +246,10 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
         markFailedTopology(topologySubmitter, cluster, td, "Failed to schedule within " + maxSchedulingAttempts + " attempts");
     }
 
+    public Set<String> getEvictedTopologies() {
 
 Review comment:
   In that case this method should return a copy: `return new HashSet<>(this.evictedTopologies);`
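
   A small sketch of why the copy matters, using a hypothetical holder class (`EvictedTopologiesHolder`, `markEvicted`) instead of the real scheduler. With a defensive copy, a caller that mutates the returned set cannot change the scheduler's own bookkeeping.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the scheduler's eviction bookkeeping.
class EvictedTopologiesHolder {
    private final Set<String> evictedTopologies = new HashSet<>();

    void markEvicted(String topoId) {
        evictedTopologies.add(topoId);
    }

    // Returns a snapshot; changes to the returned set do not affect internal state.
    Set<String> getEvictedTopologies() {
        return new HashSet<>(evictedTopologies);
    }

    public static void main(String[] args) {
        EvictedTopologiesHolder holder = new EvictedTopologiesHolder();
        holder.markEvicted("ethan-topo-1");
        Set<String> snapshot = holder.getEvictedTopologies();
        snapshot.clear(); // only the caller's copy is cleared
        System.out.println(holder.getEvictedTopologies().size());
    }
}
```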

----------------------------------------------------------------

[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384785673
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.scheduler.resource.strategies.eviction.TestDefaultEvictionStrategy;
+import org.apache.storm.utils.Time;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.addTopologies;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.createGrasClusterConfig;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genSupervisors;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genTopology;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userRes;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userResourcePool;
+
+public class TestGenericResourceAwareSchedulingPriorityStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestDefaultEvictionStrategy.class);
+    private static int currentTime = Time.currentTimeSecs();
+    private static IScheduler scheduler = null;
+
+    @After
+    public void cleanup() {
+        if (scheduler != null) {
+            scheduler.cleanup();
+            scheduler = null;
+        }
+    }
+
+    /*
+     * DefaultSchedulingPriorityStrategy will not evict topo as long as the resources request can be met
+     *
+     *  Ethan asks for heavy cpu and memory while Rui asks for little cpu and memory but heavy generic resource
+     *  Since all types of Rui's resource requests can be met, no eviction will happen
+    */
+    @Test
+    public void testDefaultSchedulingPriorityStrategyNotEvicting() {
+        Map<String, Double> requestedgenericResourcesMap = new HashMap<>();
+        requestedgenericResourcesMap.put("generic.resource.1", 40.0);
+        // Use full memory and cpu of the cluster capacity
+        Config ruiConf = createGrasClusterConfig(20, 50, 50, null, requestedgenericResourcesMap);
+        Config ethanConf = createGrasClusterConfig(80, 400, 500, null, Collections.emptyMap());
+        Topologies topologies = new Topologies(
+            genTopology("ethan-topo-1", ethanConf, 1, 0, 1, 0, currentTime - 2, 10, "ethan"),
+            genTopology("ethan-topo-2", ethanConf, 1, 0, 1, 0, currentTime - 2, 20, "ethan"),
+            genTopology("ethan-topo-3", ethanConf, 1, 0, 1, 0, currentTime - 2, 28, "ethan"),
+            genTopology("ethan-topo-4", ethanConf, 1, 0, 1, 0, currentTime - 2, 29, "ethan"));
+
+        Topologies withNewTopo = addTopologies(topologies,
+            genTopology("rui-topo-1", ruiConf, 1, 0, 4, 0, currentTime - 2, 10, "rui"));
+
+        Config config = mkClusterConfig(DefaultSchedulingPriorityStrategy.class.getName());
+        Cluster cluster = mkTestCluster(topologies, config);
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config);
+        scheduler.schedule(topologies, cluster);
+
+        assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+
+
+        cluster = new Cluster(cluster, withNewTopo);
+        scheduler.schedule(withNewTopo, cluster);
+
+        assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+        assertTopologiesNotBeenEvicted(cluster, (ResourceAwareScheduler) scheduler, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+        assertTopologiesFullyScheduled(cluster, "rui-topo-1");
+    }
+
+    /*
+     * DefaultSchedulingPriorityStrategy does not take generic resources into account when calculating score
+     * So even if a user is requesting a lot of generic resources other than CPU and memory, scheduler will still score it very low and kick out other topologies
+     *
+     *  Ethan asks for medium cpu and memory while Rui asks for little cpu and memory but heavy generic resource
+     *  However, Rui's generic request cannot be met and the default scoring system does not take generic resources into account,
+     *  so the score of Rui's new topo will be much lower than those of all of Ethan's topos.
+     *  Then all of Ethan's topos will be evicted in trying to make room for Rui.
+     */
+    @Test
+    public void testDefaultSchedulingPriorityStrategyEvicting() {
+        Map<String, Double> requestedgenericResourcesMap = new HashMap<>();
+        requestedgenericResourcesMap.put("generic.resource.1", 40.0);
+        Config ruiConf = createGrasClusterConfig(10, 10, 10, null, requestedgenericResourcesMap);
+        Config ethanConf = createGrasClusterConfig(60, 200, 300, null, Collections.emptyMap());
+        Topologies topologies = new Topologies(
+            genTopology("ethan-topo-1", ethanConf, 1, 0, 1, 0, currentTime - 2, 10, "ethan"),
+            genTopology("ethan-topo-2", ethanConf, 1, 0, 1, 0, currentTime - 2, 20, "ethan"),
+            genTopology("ethan-topo-3", ethanConf, 1, 0, 1, 0, currentTime - 2, 28, "ethan"),
+            genTopology("ethan-topo-4", ethanConf, 1, 0, 1, 0, currentTime - 2, 29, "ethan"));
+
+        Topologies withNewTopo = addTopologies(topologies,
+            genTopology("rui-topo-1", ruiConf, 1, 0, 5, 0, currentTime - 2, 10, "rui"));
+
+        Config config = mkClusterConfig(DefaultSchedulingPriorityStrategy.class.getName());
+        Cluster cluster = mkTestCluster(topologies, config);
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config);
+        scheduler.schedule(topologies, cluster);
+
+        assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+
+
+        cluster = new Cluster(cluster, withNewTopo);
+        scheduler.schedule(withNewTopo, cluster);
+
+        assertTopologiesFullyScheduled(cluster, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+        assertTopologiesBeenEvicted(cluster, (ResourceAwareScheduler) scheduler, "ethan-topo-1", "ethan-topo-2", "ethan-topo-3", "ethan-topo-4");
+        assertTopologiesNotScheduled(cluster, "rui-topo-1");
+    }
+
+    /*
+     * GenericResourceAwareSchedulingPriorityStrategy extend scoring formula to accommodate generic resources
+     *
+     *   Same setting as the above test, but this time, new scoring system is taking generic resources into account,
 
 Review comment:
   Instead of `as the above test`, it may be better to refer to the test case by name. If anyone reorders these unit tests, this comment could become confusing.

----------------------------------------------------------------

[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r386442504
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -248,8 +252,17 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
     }
 
     @VisibleForTesting
-    public Set<String> getEvictedTopologies() {
-        return this.evictedTopologies;
+    public Set<String> getEvictedTopologies() throws Exception {
+        if (forTest) {
+            return this.evictedTopologies;
+        } else {
+            throw new Exception("Topology eviction check is only provided for test purposes");
 
 Review comment:
   Fixed. Thank you.

----------------------------------------------------------------

[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r387816618
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
 ##########
 @@ -889,6 +889,20 @@ public double getClusterTotalMemoryResource() {
         return sum;
     }
 
+    @Override
+    public Map<String, Double> getClusterTotalGenericResource() {
+        Map<String, Double> ret = new HashMap<>();
+        for (SupervisorDetails sup : supervisors.values()) {
+            Map<String, Double> supGenericResources = sup.getTotalGenericResources();
+            for (Map.Entry<String, Double> entry : supGenericResources.entrySet()) {
+                String resourceName = entry.getKey();
+                Double amount = entry.getValue();
+                ret.put(resourceName, ret.getOrDefault(resourceName, 0.0) + amount);
 
 Review comment:
   Very good suggestions. Thank you and addressed.

----------------------------------------------------------------

[GitHub] [storm] RuiLi8080 commented on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#issuecomment-594812990
 
 
   @kishorvpatil Thanks for your review. While addressing your comment, I noticed that we can actually cache all types of total cluster resources, since the supervisors snapshot won't change within a Cluster object.
   So we don't need to add another getClusterGenericResourceTypes() function. We can just cache the generic resources map and fetch its keySet only when we need it.
   I also added caching for CPU and memory in the latest commit. Any further comments would be appreciated.
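
   A minimal sketch of the caching idea, assuming a hypothetical stand-in class (`ClusterTotalsSnapshot`) in place of `Cluster` and plain lists in place of `SupervisorDetails`. The totals are computed once in the constructor and the getters return the stored values. The sketch wraps the generic map in an unmodifiable view, which is a choice made here rather than something stated in the PR, so a caller that wants to decrement amounts has to copy it first.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Cluster: totals are computed once at construction.
class ClusterTotalsSnapshot {
    private final double totalCpu;
    private final Map<String, Double> totalGenericResources;

    ClusterTotalsSnapshot(List<Double> supervisorCpus,
                          List<Map<String, Double>> supervisorGenerics) {
        this.totalCpu = supervisorCpus.stream().mapToDouble(Double::doubleValue).sum();
        Map<String, Double> total = new HashMap<>();
        for (Map<String, Double> generics : supervisorGenerics) {
            generics.forEach((name, amount) -> total.merge(name, amount, Double::sum));
        }
        // Read-only view so callers cannot alter the cached totals.
        this.totalGenericResources = Collections.unmodifiableMap(total);
    }

    double getClusterTotalCpuResource() {
        return totalCpu;
    }

    Map<String, Double> getClusterTotalGenericResources() {
        return totalGenericResources;
    }

    // The resource types are just the key set of the cached map.
    Set<String> genericResourceTypes() {
        return totalGenericResources.keySet();
    }

    public static void main(String[] args) {
        Map<String, Double> generics = new HashMap<>();
        generics.put("generic.resource.1", 40.0);
        ClusterTotalsSnapshot snapshot =
            new ClusterTotalsSnapshot(Arrays.asList(100.0, 300.0), Arrays.asList(generics));
        System.out.println(snapshot.getClusterTotalCpuResource());
        System.out.println(snapshot.genericResourceTypes());
    }
}
```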

----------------------------------------------------------------

[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r385888781
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -202,7 +210,7 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                             if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
                                 Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
                                 topologySchedulingResources.adjustResourcesForEvictedTopology(toSchedule, topologyEvict);
-
+                                evictedTopologies.add(topologyEvict.getId());
 
 Review comment:
   Addressed.

----------------------------------------------------------------

[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406827570
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -115,6 +122,7 @@ public void schedule(Topologies topologies, Cluster cluster) {
                 scheduleTopology(td, cluster, submitter, orderedTopologies);
             }
         }
+        evictedTopologiesMap = tmpEvictedTopologiesMap;
 
 Review comment:
   Making `evictedTopologiesMap` the result of a complete scheduling round is a good idea.
   
   But the code here makes `evictedTopologiesMap` and `tmpEvictedTopologiesMap` the same object,
   so calling `tmpEvictedTopologiesMap.clear()` also clears `evictedTopologiesMap`. This won't achieve the goal.
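
   To illustrate the aliasing concern, here is a minimal sketch. `EvictionSnapshotDemo` and its methods are hypothetical stand-ins, not the actual `ResourceAwareScheduler` code; only the two field names come from the diff. Publishing by reference shares one map, while publishing a copy keeps the result of the finished round intact.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical stand-in for the scheduler's bookkeeping; not the Storm class.
   final class EvictionSnapshotDemo {
       // Working map, filled while a scheduling round is in progress.
       private final Map<String, Set<String>> tmpEvictedTopologiesMap = new HashMap<>();
       // Published result of the last completed round.
       private Map<String, Set<String>> evictedTopologiesMap = new HashMap<>();

       void recordEviction(String scheduledTopoId, String evictedTopoId) {
           tmpEvictedTopologiesMap
               .computeIfAbsent(scheduledTopoId, k -> new HashSet<>())
               .add(evictedTopoId);
       }

       // Aliasing: both names now refer to one map, so a later clear() empties both.
       void publishByReference() {
           evictedTopologiesMap = tmpEvictedTopologiesMap;
       }

       // Copying: the published map is detached from the working map.
       void publishByCopy() {
           Map<String, Set<String>> copy = new HashMap<>();
           tmpEvictedTopologiesMap.forEach((k, v) -> copy.put(k, new HashSet<>(v)));
           evictedTopologiesMap = copy;
       }

       // Start of the next round: the working map is reset.
       void startNextRound() {
           tmpEvictedTopologiesMap.clear();
       }

       Map<String, Set<String>> getEvictedTopologiesMap() {
           return evictedTopologiesMap;
       }
   }
   ```

   Another option, under the same assumptions, is to allocate a fresh working map at the start of each round instead of clearing the old one.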
   


[GitHub] [storm] kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r393152100
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
 ##########
 @@ -80,6 +80,13 @@
     private final Map<String, Map<String, Double>> nodeToScheduledOffHeapNodeMemoryCache;   // node -> topologyId -> double
     private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
     private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
+    /**
+     * Snapshot of cluster total resources (cpu, memory, generic).
+     */
+    private final double totalCpuResource;
+    private final double totalMemoryResource;
+    private final Map<String, Double> totalGenericResources;
 
 Review comment:
   These three variables need to be copied from `src` in the copy constructor and in `public Cluster(Cluster src, Topologies topologies)`.
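
   A rough sketch of what that could look like, using a hypothetical, much-reduced class (`ClusterTotalsDemo`) rather than the real `Cluster`. Only the three field names are taken from the diff; the constructors and getters are assumptions for illustration.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical, reduced stand-in for Cluster; only the snapshot fields are modeled.
   final class ClusterTotalsDemo {
       private final double totalCpuResource;
       private final double totalMemoryResource;
       private final Map<String, Double> totalGenericResources;

       ClusterTotalsDemo(double cpu, double memory, Map<String, Double> generic) {
           this.totalCpuResource = cpu;
           this.totalMemoryResource = memory;
           this.totalGenericResources = Collections.unmodifiableMap(new HashMap<>(generic));
       }

       // Copy constructor: every final field must be assigned from src here as well.
       ClusterTotalsDemo(ClusterTotalsDemo src) {
           this.totalCpuResource = src.totalCpuResource;
           this.totalMemoryResource = src.totalMemoryResource;
           // Defensive copy so the two instances never share a mutable map.
           this.totalGenericResources =
               Collections.unmodifiableMap(new HashMap<>(src.totalGenericResources));
       }

       double getTotalCpuResource() {
           return totalCpuResource;
       }

       double getTotalMemoryResource() {
           return totalMemoryResource;
       }

       Map<String, Double> getTotalGenericResources() {
           return totalGenericResources;
       }
   }
   ```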
     


[GitHub] [storm] Ethanlm merged pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm merged pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213
 
 
   


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406861527
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -210,4 +216,11 @@ private Cluster mkTestCluster(Topologies topologies, Config config) {
 
         return new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
     }
+
+    private Set<String> collectMapValues(Map<String, Set<String>> map) {
+        return map.values()
 
 Review comment:
   Fixed.


[GitHub] [storm] Ethanlm commented on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#issuecomment-613205078
 
 
   ```
   [ERROR] Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.0.0:check (validate) on project storm-server: You have 1 Checkstyle violation. -> [Help 1]
   ```
   The build fails with a Checkstyle violation.


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406831396
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -196,21 +204,18 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                     } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
                         LOG.debug("Not enough resources to schedule {}", td.getName());
                         List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
-                        boolean evictedSomething = false;
-                        LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
+                        LOG.debug("Attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
                         int tdIndex = reversedList.indexOf(td);
                         topologySchedulingResources.setRemainingRequiredResources(toSchedule, td);
 
+                        Set<String> tmpEvictedTopos = new HashSet<>();
                         for (int index = 0; index < tdIndex; index++) {
                             TopologyDetails topologyEvict = reversedList.get(index);
                             SchedulerAssignment evictAssignemnt = workingState.getAssignmentById(topologyEvict.getId());
                             if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
-                                Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
                                 topologySchedulingResources.adjustResourcesForEvictedTopology(toSchedule, topologyEvict);
-
-                                LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
 
 Review comment:
   We can change this to `WARN` and add the current topology ID, since this message also includes the workers being evicted.
   
   Then delete line 228.


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r385166333
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -202,7 +210,7 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                             if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
                                 Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
                                 topologySchedulingResources.adjustResourcesForEvictedTopology(toSchedule, topologyEvict);
-
+                                evictedTopologies.add(topologyEvict.getId());
 
 Review comment:
   One last thing: we can add a boolean like `forTests`, which defaults to false and can be set to true from unit tests. This avoids unnecessary computation and makes it clear that this is only for tests.
   
   As for `getEvictedTopologies`, we can throw an exception if `forTests` is false.
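
   A minimal sketch of the suggested guard, assuming a hypothetical class `EvictionTrackingDemo` with a `setForTests` setter. The names `forTests` and `getEvictedTopologies` come from the comment above; everything else, including the choice of `IllegalStateException`, is illustrative.

   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical sketch; not the actual ResourceAwareScheduler.
   final class EvictionTrackingDemo {
       // Off by default, so production scheduling does no extra bookkeeping.
       private boolean forTests = false;
       private final Set<String> evictedTopologies = new HashSet<>();

       void setForTests(boolean forTests) {
           this.forTests = forTests;
       }

       // Called wherever a topology is evicted; records only when tests asked for it.
       void onEvict(String topologyId) {
           if (forTests) {
               evictedTopologies.add(topologyId);
           }
       }

       Set<String> getEvictedTopologies() {
           if (!forTests) {
               throw new IllegalStateException("Eviction tracking is only enabled for tests");
           }
           return Collections.unmodifiableSet(evictedTopologies);
       }
   }
   ```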


[GitHub] [storm] kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r387629493
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
 ##########
 @@ -889,6 +889,20 @@ public double getClusterTotalMemoryResource() {
         return sum;
     }
 
+    @Override
+    public Map<String, Double> getClusterTotalGenericResource() {
+        Map<String, Double> ret = new HashMap<>();
+        for (SupervisorDetails sup : supervisors.values()) {
+            Map<String, Double> supGenericResources = sup.getTotalGenericResources();
+            for (Map.Entry<String, Double> entry : supGenericResources.entrySet()) {
+                String resourceName = entry.getKey();
+                Double amount = entry.getValue();
+                ret.put(resourceName, ret.getOrDefault(resourceName, 0.0) + amount);
 
 Review comment:
   The Stream API could express this aggregation of maps more simply:
   `return supervisors.values().stream().flatMap(m -> m.getTotalGenericResources().entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Double::sum));`
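
   As a self-contained illustration of the same idea, the sketch below sums plain maps instead of `SupervisorDetails` objects. The class name, method name, and resource names are made up for the example. Note that `Collectors.toMap` rejects null values, so this assumes every amount is non-null.

   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   // Hypothetical demo class; the maps stand in for per-supervisor generic resources.
   final class GenericResourceSumDemo {
       // Merges all entries into one map, adding amounts that share a resource name.
       static Map<String, Double> sumGenericResources(List<Map<String, Double>> perSupervisor) {
           return perSupervisor.stream()
               .flatMap(m -> m.entrySet().stream())
               .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Double::sum));
       }

       public static void main(String[] args) {
           Map<String, Double> sup1 = new HashMap<>();
           sup1.put("gpu.count", 2.0);
           Map<String, Double> sup2 = new HashMap<>();
           sup2.put("gpu.count", 1.0);
           sup2.put("fpga.count", 4.0);
           // gpu.count is summed across both maps; fpga.count appears once.
           System.out.println(sumGenericResources(Arrays.asList(sup1, sup2)));
       }
   }
   ```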


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r405255219
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -239,6 +246,10 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
         markFailedTopology(topologySubmitter, cluster, td, "Failed to schedule within " + maxSchedulingAttempts + " attempts");
     }
 
+    public Set<String> getEvictedTopologies() {
 
 Review comment:
   Yes, that's right. We should return an unmodifiable collection to address the race condition. My question is mostly about how we want to use this `evictedTopologies` information in regular code, outside of unit tests.
   If we want to log what is evicted when scheduling a certain topology, we can change the above line to `LOG.info`, which works better than keeping an `evictedTopologies` list.
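
   For reference, a small sketch of two ways to hand out a read-only result, using a hypothetical class `EvictedViewDemo`. `Collections.unmodifiableSet` returns a view: callers cannot modify it, but later changes to the backing set still show through. Wrapping a copy gives a snapshot that is detached from later changes. Which of the two fits the scheduler is not settled in this thread.

   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical sketch; not the actual ResourceAwareScheduler.
   final class EvictedViewDemo {
       private final Set<String> evictedTopologies = new HashSet<>();

       void add(String topologyId) {
           evictedTopologies.add(topologyId);
       }

       // Read-only view: callers cannot mutate it, but internal changes remain visible.
       Set<String> getEvictedTopologiesView() {
           return Collections.unmodifiableSet(evictedTopologies);
       }

       // Read-only snapshot: unaffected by later internal changes.
       Set<String> getEvictedTopologiesSnapshot() {
           return Collections.unmodifiableSet(new HashSet<>(evictedTopologies));
       }
   }
   ```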


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r407854045
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
 ##########
 @@ -271,6 +273,13 @@ boolean wouldFit(
      */
     double getClusterTotalMemoryResource();
 
+    /**
+     * Get the total amount of generic resources (excluding CPU and memory) in cluster.
+     */
+    default Map<String, Double> getClusterTotalGenericResources() {
+        return Collections.emptyMap();
 
 Review comment:
   No, we don't. Only the `Cluster` class implements this interface, and `Cluster` will have the implementation in this PR. Good catch.


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406831958
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -248,6 +255,11 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                     + topologySchedulingResources.getRemainingRequiredResourcesMessage());
     }
 
+    public Map<String, Set<String>> getEvictedTopologiesMap() {
 
 Review comment:
   I would add a comment like `This is not stable. It's subject to change`, since we currently don't use it anywhere besides unit tests. It might change depending on how we want to use the evictedTopologies information. As @bipinprasad said, we might want to put it into the scheduleResult in the future.


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384791734
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResource();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+
 
 Review comment:
   nit: too many new lines


[GitHub] [storm] RuiLi8080 commented on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on issue #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#issuecomment-591044221
 
 
   Added an evictedTopologies set to ResourceAwareScheduler, currently only for tests, since we did not find a better way to check eviction history. We added it here because eviction is specific to RAS and this new field could be useful in the future.


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384781153
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestGenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.scheduler.resource.strategies.eviction.TestDefaultEvictionStrategy;
+import org.apache.storm.utils.Time;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.addTopologies;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotBeenEvicted;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotScheduled;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.createGrasClusterConfig;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genSupervisors;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genTopology;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userRes;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userResourcePool;
+
+public class TestGenericResourceAwareSchedulingPriorityStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestDefaultEvictionStrategy.class);
+    private static int currentTime = Time.currentTimeSecs();
+    private static IScheduler scheduler = null;
+
+    @After
+    public void cleanup() {
+        if (scheduler != null) {
+            scheduler.cleanup();
+            scheduler = null;
+        }
+    }
+
+    /*
+     * DefaultSchedulingPriorityStrategy will not evict topo as long as the resources request can be met
+     *
+     *  Ethan asks for heavy cpu and memory while Rui asks for little cpu and memory but heave generic resource
+     *  Since Rui's all types of resources request can be met, no eviction will happend
 
 Review comment:
   `happend` -> `happen`


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384793382
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
 ##########
 @@ -140,7 +140,7 @@ public int compare(SimulatedUser o1, SimulatedUser o2) {
      * Comparator that sorts topologies by priority and then by submission time.
      * First sort by Topology Priority, if there is a tie for topology priority, topology uptime is used to sort.
      */
-    private static class TopologyByPriorityAndSubmissionTimeComparator implements Comparator<TopologyDetails> {
+    protected static class TopologyByPriorityAndSubmissionTimeComparator implements Comparator<TopologyDetails> {
 
 Review comment:
   This can be `private` since it's not used elsewhere?


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r404830638
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -113,6 +119,9 @@ public void schedule(Topologies topologies, Cluster cluster) {
             } else {
                 User submitter = userMap.get(td.getTopologySubmitter());
                 scheduleTopology(td, cluster, submitter, orderedTopologies);
+                if (!evictedTopologies.isEmpty()) {
+                    LOG.warn("Evicted Topologies {} when scheduling topology: {}", evictedTopologies, td.getId());
 
 Review comment:
   `evictedTopologies` accumulates while all the topologies are being scheduled, since it is only cleared at line 111. So `evictedTopologies` here includes every evicted topology, not only those evicted by scheduling this topology `td.getId()`.
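
   One way to attribute evictions to the topology that caused them is to key the record by the topology being scheduled. The sketch below uses a hypothetical class `PerTopologyEvictionDemo`; the method names are illustrative and not taken from the PR.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical sketch; not the actual ResourceAwareScheduler.
   final class PerTopologyEvictionDemo {
       // scheduled topology id -> ids of topologies evicted to make room for it
       private final Map<String, Set<String>> evictedBy = new HashMap<>();

       // Records only the evictions caused by scheduling this one topology.
       void recordEvictions(String scheduledTopoId, Set<String> evictedNow) {
           if (evictedNow.isEmpty()) {
               return;
           }
           evictedBy.computeIfAbsent(scheduledTopoId, k -> new HashSet<>()).addAll(evictedNow);
       }

       // Returns an empty set for topologies that caused no evictions.
       Set<String> evictedWhileScheduling(String scheduledTopoId) {
           return evictedBy.getOrDefault(scheduledTopoId, Collections.emptySet());
       }
   }
   ```

   With this shape, a log line for one topology can report only its own evictions instead of the running total.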


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406945614
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -219,8 +224,14 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                                 }
                             }
                         }
-
-                        if (!evictedSomething) {
+                        if (!tmpEvictedTopos.isEmpty()) {
+                            LOG.warn("Evicted Topologies {} when scheduling topology: {}", tmpEvictedTopos, td.getId());
+                            String topoId = td.getId();
+                            if (!tmpEvictedTopologiesMap.containsKey(topoId)) {
+                                tmpEvictedTopologiesMap.put(topoId, new HashSet<>());
+                            }
+                            tmpEvictedTopologiesMap.get(topoId).addAll(tmpEvictedTopos);
 
 Review comment:
   Fixed


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r393988777
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareSchedulingPriorityStrategy.class);
+
+    @Override
+    protected GrasSimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new GrasSimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+        Map<String, Double> genericAvail = cluster.getClusterTotalGenericResources();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<GrasSimulatedUser> users = new ArrayList<>();
+
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+
+        while (!users.isEmpty()) {
+            Collections.sort(users, new GrasSimulatedUserComparator(cpuAvail, memAvail, genericAvail));
+            GrasSimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail, genericAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("GRAS SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                for (Map.Entry<String, Double> entry : td.getTotalRequestedGenericResources().entrySet()) {
+                    String resource = entry.getKey();
+                    Double requestedAmount = entry.getValue();
+                    if (!genericAvail.containsKey(resource)) {
+                        LOG.warn("Resource: {} is not supported in this cluster. Ignoring this request.", resource);
+                    } else {
+                        genericAvail.put(resource, genericAvail.get(resource) - requestedAmount);
+                    }
+                }
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
+    protected static class GrasSimulatedUser extends SimulatedUser {
+
+        // Extend support for Generic Resources in addition to CPU and Memory
+        public final Map<String, Double> guaranteedGenericResources;                // resource name : guaranteed amount
+        private Map<String, Double> assignedGenericResources = new HashMap<>();     // resource name : assigned amount
+
+        public GrasSimulatedUser(User other, ISchedulingState cluster) {
+            super(other, cluster);
+
+            Map<String, Double> guaranteedGenericResources = new HashMap<>();
+            // generic resource types that are offered
+            Set<String> availGenericResourceTypes = cluster.getClusterTotalGenericResources().keySet();
+            for (String resourceType : availGenericResourceTypes) {
+                Double guaranteedAmount = other.getGenericGuaranteed().getOrDefault(resourceType, 0.0);
+                guaranteedGenericResources.put(resourceType, guaranteedAmount);
+            }
+            this.guaranteedGenericResources = guaranteedGenericResources;
+        }
+
+        @Override
+        public TopologyDetails simScheduleNextHighest() {
+            TopologyDetails td = super.simScheduleNextHighest();
+            Map<String, Double> tdRequestedGenericResource = td.getTotalRequestedGenericResources();
+            for (Map.Entry<String, Double> entry : tdRequestedGenericResource.entrySet()) {
+                String resource = entry.getKey();
+                Double requestedAmount = entry.getValue();
+                assignedGenericResources.put(resource, assignedGenericResources.getOrDefault(resource, 0.0) + requestedAmount);
+            }
 
 Review comment:
   CPU and memory are adjusted by the overridden superclass method, which is called on line 105:
   ```TopologyDetails td = super.simScheduleNextHighest();```
   
   Not sure if this is what you are looking for.
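
A minimal sketch of the delegation pattern referred to above. `BaseSimUser` and `GenericSimUser` are hypothetical stand-ins for `SimulatedUser` and `GrasSimulatedUser`, and the method signature is simplified for illustration: the subclass lets the superclass account for CPU and memory, then adds only the generic-resource bookkeeping.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the base class: tracks CPU and memory only.
class BaseSimUser {
    double assignedCpu;
    double assignedMemory;

    void simSchedule(double cpu, double memory, Map<String, Double> generic) {
        assignedCpu += cpu;
        assignedMemory += memory;
    }
}

// Hypothetical stand-in for the subclass: adds generic resources on top.
class GenericSimUser extends BaseSimUser {
    final Map<String, Double> assignedGeneric = new HashMap<>();

    @Override
    void simSchedule(double cpu, double memory, Map<String, Double> generic) {
        // CPU and memory are adjusted by the superclass implementation.
        super.simSchedule(cpu, memory, generic);
        // Only the generic resources need to be handled here.
        for (Map.Entry<String, Double> e : generic.entrySet()) {
            assignedGeneric.merge(e.getKey(), e.getValue(), Double::sum);
        }
    }
}
```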


[GitHub] [storm] bipinprasad commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r394764507
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
 ##########
 @@ -80,6 +80,13 @@
     private final Map<String, Map<String, Double>> nodeToScheduledOffHeapNodeMemoryCache;   // node -> topologyId -> double
     private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
     private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
+    /**
+     * Snapshot of cluster total resources (cpu, memory, generic).
+     */
+    private final double totalCpuResource;
+    private final double totalMemoryResource;
+    private final Map<String, Double> totalGenericResources;
 
 Review comment:
   If these variables are being used to keep running totals, should they be called "cacheTotalCpuResource" and so on, with the totals incrementally updated in assign and unassign? If they are not a running cache, then maybe just getTotalResouceCpu() == computeClusterCpuResource()?
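
A minimal sketch contrasting the two designs raised in this comment. All names here (`ClusterTotalsSketch`, `cachedUsedCpu`, and so on) are hypothetical and not part of `Cluster`: a snapshot is computed once at construction and never changes, whereas a running cache must be updated on every assign and unassign.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; names are illustrative, not Cluster's real API.
class ClusterTotalsSketch {
    private final Map<String, Double> nodeCpu;  // node id -> total CPU on that node
    private final double totalCpuSnapshot;      // option 1: computed once, immutable
    private double cachedUsedCpu = 0.0;         // option 2: running total

    ClusterTotalsSketch(Map<String, Double> nodeCpu) {
        this.nodeCpu = new HashMap<>(nodeCpu);
        this.totalCpuSnapshot = computeTotalCpu();
    }

    // Sums CPU over all nodes; called once for the snapshot.
    double computeTotalCpu() {
        double sum = 0.0;
        for (double cpu : nodeCpu.values()) {
            sum += cpu;
        }
        return sum;
    }

    double getTotalCpuSnapshot() {
        return totalCpuSnapshot;
    }

    // A running cache has to be kept in sync on every change.
    void assign(double cpu) {
        cachedUsedCpu += cpu;
    }

    void unassign(double cpu) {
        cachedUsedCpu -= cpu;
    }

    double getCachedUsedCpu() {
        return cachedUsedCpu;
    }
}
```

A snapshot fits values that do not change during a scheduling round, such as the cluster's total capacity; a running cache fits values that do change, at the cost of keeping it consistent.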


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406923688
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -248,6 +255,11 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                     + topologySchedulingResources.getRemainingRequiredResourcesMessage());
     }
 
+    public Map<String, Set<String>> getEvictedTopologiesMap() {
 
 Review comment:
   Good comment


[GitHub] [storm] govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
govind-menon commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r407599592
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -115,6 +122,7 @@ public void schedule(Topologies topologies, Cluster cluster) {
                 scheduleTopology(td, cluster, submitter, orderedTopologies);
             }
         }
+        evictedTopologiesMap = tmpEvictedTopologiesMap;
 
 Review comment:
   I'm trying to clarify why we don't pass in `evictedTopologiesMap` - is it because there could be an exception in scheduleTopology and you want to be able to get the previously evicted topologies?
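
A minimal sketch of the build-then-publish pattern in question. The thread does not state the author's reason for it, so this only illustrates one possible motivation under assumed, hypothetical names (`PublishAfterRoundSketch`, `failMidway`): if a round fails partway through, the previously published map is left untouched.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not the ResourceAwareScheduler code.
class PublishAfterRoundSketch {
    // Result of the last round that completed successfully.
    private volatile Map<String, Set<String>> published = Collections.emptyMap();

    // failMidway simulates an exception thrown while scheduling a topology.
    void scheduleRound(Map<String, Set<String>> evictionsThisRound, boolean failMidway) {
        Map<String, Set<String>> tmp = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : evictionsThisRound.entrySet()) {
            tmp.computeIfAbsent(e.getKey(), k -> new HashSet<>()).addAll(e.getValue());
            if (failMidway) {
                throw new IllegalStateException("simulated scheduling failure");
            }
        }
        // Published with a single reference assignment, only after the round completes.
        published = tmp;
    }

    Map<String, Set<String>> getPublished() {
        return published;
    }
}
```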


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384900371
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
 ##########
 @@ -83,6 +88,10 @@ public void addSelfTo(Map<String, Map<String, Number>> fullPool) {
         }
     }
 
+    public static TestUserResources userRes(String name, Map<String, Double> resources) {
+        return new TestUserResources(name, resources);
 
 Review comment:
   Addressed all comments; thanks for the prompt review. As for this one: yes, we should have some tests on the User class. Currently we only have testResourcePoolUtilization. While coding, I also extended the pool-utilization-related functions to include generic resources, but I later realized that they are not used anywhere. I will file another PR to remove them.


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406923154
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -219,8 +224,14 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                                 }
                             }
                         }
-
-                        if (!evictedSomething) {
+                        if (!tmpEvictedTopos.isEmpty()) {
+                            LOG.warn("Evicted Topologies {} when scheduling topology: {}", tmpEvictedTopos, td.getId());
+                            String topoId = td.getId();
+                            if (!tmpEvictedTopologiesMap.containsKey(topoId)) {
+                                tmpEvictedTopologiesMap.put(topoId, new HashSet<>());
+                            }
+                            tmpEvictedTopologiesMap.get(topoId).addAll(tmpEvictedTopos);
 
 Review comment:
   Can simplify this with 
   
   ```
   tmpEvictedTopologiesMap.computeIfAbsent(topoId, k -> new HashSet<>()).addAll(tmpEvictedTopos);
   ```
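
A short sketch showing that the two forms behave the same. The class and method names (`EvictionMapSketch`, `addVerbose`, `addConcise`) are made up for illustration; only `Map.computeIfAbsent` is standard Java.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper class comparing the two idioms.
class EvictionMapSketch {
    // Original form: explicit containsKey check, then put, then get.
    static void addVerbose(Map<String, Set<String>> map, String topoId, Set<String> evicted) {
        if (!map.containsKey(topoId)) {
            map.put(topoId, new HashSet<>());
        }
        map.get(topoId).addAll(evicted);
    }

    // Suggested form: computeIfAbsent creates the set on first use and returns it.
    static void addConcise(Map<String, Set<String>> map, String topoId, Set<String> evicted) {
        map.computeIfAbsent(topoId, k -> new HashSet<>()).addAll(evicted);
    }
}
```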


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384712536
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -239,6 +246,10 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
         markFailedTopology(topologySubmitter, cluster, td, "Failed to schedule within " + maxSchedulingAttempts + " attempts");
     }
 
+    public Set<String> getEvictedTopologies() {
 
 Review comment:
   Better to add `VisibleForTesting` and a comment to make it clear that this is only used by unit tests.


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r405255219
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -239,6 +246,10 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
         markFailedTopology(topologySubmitter, cluster, td, "Failed to schedule within " + maxSchedulingAttempts + " attempts");
     }
 
+    public Set<String> getEvictedTopologies() {
 
 Review comment:
   Yes, that's right. We should return an unmodifiableCollection to solve the race condition. My question is mostly about how we want to use this `evictedTopologies` information in regular code, other than in unit tests.
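
A minimal sketch of two ways a getter could hand out the evicted set, assuming a hypothetical holder class (`EvictedViewSketch`). Note the difference: an unmodifiable view stops callers from mutating the set but still reflects later changes, while an unmodifiable copy is a fixed snapshot. Whether either is sufficient for the scheduler's threading model is not settled in this thread.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration; not the ResourceAwareScheduler code.
class EvictedViewSketch {
    private final Set<String> evicted = new HashSet<>();

    void recordEviction(String topoId) {
        evicted.add(topoId);
    }

    // Read-only view: callers cannot modify it, but it tracks later evictions.
    Set<String> getEvictedView() {
        return Collections.unmodifiableSet(evicted);
    }

    // Read-only copy: a snapshot that later evictions do not affect.
    Set<String> getEvictedSnapshot() {
        return Collections.unmodifiableSet(new HashSet<>(evicted));
    }
}
```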


[GitHub] [storm] RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
RuiLi8080 commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r406864149
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ##########
 @@ -196,21 +204,18 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to
                     } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
                         LOG.debug("Not enough resources to schedule {}", td.getName());
                         List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
-                        boolean evictedSomething = false;
-                        LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
+                        LOG.debug("Attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
                         int tdIndex = reversedList.indexOf(td);
                         topologySchedulingResources.setRemainingRequiredResources(toSchedule, td);
 
+                        Set<String> tmpEvictedTopos = new HashSet<>();
                         for (int index = 0; index < tdIndex; index++) {
                             TopologyDetails topologyEvict = reversedList.get(index);
                             SchedulerAssignment evictAssignemnt = workingState.getAssignmentById(topologyEvict.getId());
                             if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) {
-                                Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
                                 topologySchedulingResources.adjustResourcesForEvictedTopology(toSchedule, topologyEvict);
-
-                                LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
 
 Review comment:
   As for the log message: I feel the worker and user information is not useful here, since all of a topology's workers are evicted when the topology is evicted. We are more concerned with which topology the evictions happened for, and the user info can easily be derived from the topology info.


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384776794
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
 ##########
 @@ -451,6 +460,32 @@ public static void assertTopologiesFullyScheduled(Cluster cluster, String... top
         }
     }
 
+    public static void assertTopologiesBeenEvicted(Cluster cluster, ResourceAwareScheduler scheduler, String... topoNames) {
 
 Review comment:
   We could pass `Set<String> evictedTopologies` instead of a scheduler since that's the only thing we really need from the scheduler
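
A minimal sketch of what the narrower signature could look like. It is simplified on purpose: the class name `EvictionAssertSketch` is hypothetical, the helper compares ids directly instead of resolving topology names through a `Cluster`, and it throws a plain `AssertionError` rather than using a test framework.

```java
import java.util.Set;

// Hypothetical test helper; depends only on the set it actually needs.
class EvictionAssertSketch {
    // Fails if any of the given topology ids is missing from the evicted set.
    static void assertTopologiesBeenEvicted(Set<String> evictedTopologies, String... topoIds) {
        for (String id : topoIds) {
            if (!evictedTopologies.contains(id)) {
                throw new AssertionError("Topology " + id + " was not evicted");
            }
        }
    }
}
```

Taking the set instead of the scheduler keeps the helper usable with any source of evicted ids.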


[GitHub] [storm] Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3213: [STORM-3588] add GenericResourceAwareSchedulingPriorityStrategy to accommodate generic resource in grading topologies
URL: https://github.com/apache/storm/pull/3213#discussion_r384792338
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/GenericResourceAwareSchedulingPriorityStrategy.java
 ##########
 @@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.priority;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.User;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultSchedulingPriorityStrategy.class);
 
 Review comment:
   should be `GenericResourceAwareSchedulingPriorityStrategy`
