Posted to dev@storm.apache.org by govind-menon <gi...@git.apache.org> on 2017/10/25 03:46:37 UTC

[GitHub] storm pull request #2385: YSTORM-2727: Generic Resource Aware Scheduling

GitHub user govind-menon opened a pull request:

    https://github.com/apache/storm/pull/2385

    YSTORM-2727: Generic Resource Aware Scheduling

    Remaining
    
    1. Add Tests
    2. Do more manual testing

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/govind-menon/storm YSTORM-2727

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2385.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2385
    
----
commit facf515b121c80eed6c02d74104e323cfb9e4a1d
Author: Govind Menon <go...@gmail.com>
Date:   2017-09-07T18:50:05Z

    YSTORM-2725: Generic Resource Scheduling - initial config changes and TopologyBuilder API

commit 1f5265bfd57386a026caa02f26b2f86c09a1ad9d
Author: Govind Menon <go...@gmail.com>
Date:   2017-10-23T15:31:08Z

    YSTORM-2727: Generic Resource Aware Scheduling(GRAS) - metadata, scheduling strategies and configuration enhancements

----


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149164286
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java ---
    @@ -189,13 +223,18 @@ private static void checkInitCpu(Map<String, Double> topologyResources, String c
                             null);
                         topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
                     }
    -                LOG.debug("Topology Resources {}", topologyResources);
    +
    +                // If resource is also present in resources map will overwrite the above
    +                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
    +                    topologyResources.putAll((Map<String, Double>) jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP));
    +                }
    +                    LOG.info("Topology Resources {}", topologyResources);
    --- End diff --
    
    Is this needed? If it is, should it be at debug level? And can we fix the indentation?


---

[GitHub] storm pull request #2385: YSTORM-2727: Generic Resource Aware Scheduling

Posted by govind-menon <gi...@git.apache.org>.
GitHub user govind-menon reopened a pull request:

    https://github.com/apache/storm/pull/2385

    YSTORM-2727: Generic Resource Aware Scheduling

    Remaining
    
    1. Add Tests
    2. Do more manual testing

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/govind-menon/storm YSTORM-2727

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2385.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2385
    
----
commit facf515b121c80eed6c02d74104e323cfb9e4a1d
Author: Govind Menon <go...@gmail.com>
Date:   2017-09-07T18:50:05Z

    YSTORM-2725: Generic Resource Scheduling - initial config changes and TopologyBuilder API

commit 4337d44b4f95da34f2b1d0a4160353ab81c0fab0
Author: Govind Menon <go...@gmail.com>
Date:   2017-10-23T15:31:08Z

    YSTORM-2727: Generic Resource Aware Scheduling(GRAS) - metadata, scheduling strategies and configuration enhancements

----


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147237298
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java ---
    @@ -18,22 +18,24 @@
     
     package org.apache.storm.scheduler.resource;
     
    -import java.util.Collection;
    -import java.util.HashMap;
    -import java.util.Map;
    +import java.util.*;
    --- End diff --
    
    Again, let's avoid `.*` imports.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149143264
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -1281,6 +1287,12 @@
         public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity";
     
         /**
    +     * A map of resources the Supervisor has e.g {"cpu" : 200.0. "memory.capacity.mb": 256.0, "gpu" : 0.5 }
    --- End diff --
    
    Same here for `cpu` and `gpu` resource examples.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147755614
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TreeSet;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.Component;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.resource.ResourceUtils;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy {
    +    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareStrategy.class);
    +
    +    @Override
    +    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
    +        prepare(cluster);
    +        if (nodes.getNodes().size() <= 0) {
    +            LOG.warn("No available nodes to schedule tasks on!");
    +            return SchedulingResult.failure(
    +                    SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
    +        }
    +        Collection<ExecutorDetails> unassignedExecutors =
    +                new HashSet<>(this.cluster.getUnassignedExecutors(td));
    +        LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors);
    +        Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
    +        List<Component> spouts = this.getSpouts(td);
    +
    +        if (spouts.size() == 0) {
    +            LOG.error("Cannot find a Spout!");
    +            return SchedulingResult.failure(
    +                    SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
    +        }
    +
    +        //order executors to be scheduled
    +        List<ExecutorDetails> orderedExecutors = this.orderExecutors(td, unassignedExecutors);
    +        LOG.info("orderedExecutors");
    +        LOG.info(orderedExecutors.  toString());
    +        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
    +        List<String> favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
    +        List<String> unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
    +
    +        for (ExecutorDetails exec : orderedExecutors) {
    +            LOG.debug(
    +                    "Attempting to schedule: {} of component {}[ REQ {} ]",
    +                    exec,
    +                    td.getExecutorToComponent().get(exec),
    +                    td.getTaskResourceReqList(exec));
    +            final List<ObjectResources> sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes);
    +            LOG.info("sortedNodes");
    +            LOG.info(sortedNodes.toString());
    +
    +            scheduleExecutor(exec, td, scheduledTasks, sortedNodes);
    +        }
    +
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        LOG.error("/* Scheduling left over task (most likely sys tasks) */");
    +        // schedule left over system tasks
    +        for (ExecutorDetails exec : executorsNotScheduled) {
    +            final List<ObjectResources> sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes);
    +            LOG.info("sortedNodes");
    +            LOG.info(sortedNodes.toString());
    --- End diff --
    
    same here


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149511408
  
    --- Diff: storm-client/src/storm.thrift ---
    @@ -496,6 +496,8 @@ struct WorkerResources {
         3: optional double cpu;
         4: optional double shared_mem_on_heap; //This is just for accounting mem_on_heap should be used for enforcement
         5: optional double shared_mem_off_heap; //This is just for accounting mem_off_heap should be used for enforcement
    +    6: optional map<string, double> resources; // Generic resources Map
    +    7: optional map<string, double> shared_resources; // Shared Generic resources Map
    --- End diff --
    
    We're definitely using it.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by govind-menon <gi...@git.apache.org>.
GitHub user govind-menon reopened a pull request:

    https://github.com/apache/storm/pull/2385

    STORM-2727: Generic Resource Aware Scheduling

    Remaining
    
    1. Add Tests
    2. Do more manual testing

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/govind-menon/storm YSTORM-2727

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2385.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2385
    
----
commit facf515b121c80eed6c02d74104e323cfb9e4a1d
Author: Govind Menon <go...@gmail.com>
Date:   2017-09-07T18:50:05Z

    YSTORM-2725: Generic Resource Scheduling - initial config changes and TopologyBuilder API

commit 4337d44b4f95da34f2b1d0a4160353ab81c0fab0
Author: Govind Menon <go...@gmail.com>
Date:   2017-10-23T15:31:08Z

    YSTORM-2727: Generic Resource Aware Scheduling(GRAS) - metadata, scheduling strategies and configuration enhancements

commit 8e867fcf9183e8476658bff7119d9de8545e9aed
Author: Govind Menon <go...@gmail.com>
Date:   2017-10-23T15:31:08Z

    YSTORM-2727: Generic Resource Aware Scheduling(GRAS) - metadata, scheduling strategies and configuration enhancements

----


---

[GitHub] storm issue #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on the issue:

    https://github.com/apache/storm/pull/2385
  
    Thanks @revans2 ! @HeartSaVioR could you take a look?


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149164720
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java ---
    @@ -238,4 +277,57 @@ public static double sum(Collection<Double> list) {
         public static double avg(Collection<Double> list) {
             return sum(list) / list.size();
         }
    +
    +    /**
    +     * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
    +     * @param resourceMap resource map of either Supervisor or Topology
    +     * @return the resource map with common resource names
    +     */
    +    public static Map<String, Double> normalizedResourceMap(Map<String, Double> resourceMap) {
    +        Map<String, Double> result = new HashMap();
    +
    +        result.putAll(resourceMap);
    +        for(Map.Entry entry: resourceMap.entrySet()) {
    +            if (resourceNameMapping.containsKey(entry.getKey())) {
    +                result.put(resourceNameMapping.get(entry.getKey()), ObjectReader.getDouble(entry.getValue(), 0.0));
    +                result.remove(entry.getKey());
    +            }
    +        }
    +        return result;
    +    }
    +
    +    public static Map<String, Double> addResources(Map<String, Double> resourceMap1, Map<String, Double> resourceMap2) {
    +        Map<String, Double> result = new HashMap();
    +
    +        result.putAll(resourceMap1);
    +
    +        for(Map.Entry<String, Double> entry: resourceMap2.entrySet()) {
    --- End diff --
    
    nit: add a space after the `for`.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149159593
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
    @@ -576,10 +577,30 @@ public T addConfigurations(Map<String, Object> conf) {
                     throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
                 }
                 String currConf = _commons.get(_id).get_json_conf();
    -            _commons.get(_id).set_json_conf(mergeIntoJson(Utils.parseJson(currConf), conf));
    +            _commons.get(_id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
                 return (T) this;
             }
    -        
    +
    +        @SuppressWarnings("unchecked")
    +        @Override
    +        public T addResource(String resourceName, Number resourceValue) {
    +            Map<String, Double> resourcesMap = (Map<String, Double>) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
    +
    +            if (resourcesMap == null) {
    +                resourcesMap = new HashMap<>();
    +            }
    +            resourcesMap.put(resourceName, resourceValue.doubleValue());
    +
    +            getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
    --- End diff --
    
    Like with the others, this is not going to work. The map you are changing is never put back into the serialized JSON config.
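
A minimal sketch of the failure mode described above, under the assumption that the component's configuration is stored only in serialized form and that every read parses a fresh copy. The class, its method names, and the toy `key=value;` encoding (used instead of JSON to avoid a library dependency) are all hypothetical and are not Storm code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a component whose config lives only as a serialized string.
final class SerializedConfSketch {
    // Toy "key=value;" encoding, an assumption made to keep the sketch dependency-free.
    private String serialized = "";

    // Each call parses a fresh map, so callers never share state with the stored string.
    Map<String, Double> parse() {
        Map<String, Double> out = new LinkedHashMap<>();
        for (String pair : serialized.split(";")) {
            if (pair.isEmpty()) {
                continue;
            }
            String[] kv = pair.split("=", 2);
            out.put(kv[0], Double.parseDouble(kv[1]));
        }
        return out;
    }

    // Writes the map back into the serialized form.
    void store(Map<String, Double> conf) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Double> e : conf.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append(';');
        }
        serialized = sb.toString();
    }

    // Broken variant: mutates a parsed copy and then discards it.
    void addResourceLost(String name, Number value) {
        parse().put(name, value.doubleValue());
    }

    // Working variant: parse, mutate, then serialize the result back.
    void addResourceStored(String name, Number value) {
        Map<String, Double> conf = parse();
        conf.put(name, value.doubleValue());
        store(conf);
    }

    public static void main(String[] args) {
        SerializedConfSketch component = new SerializedConfSketch();
        component.addResourceLost("gpu.count", 1);
        System.out.println(component.parse().containsKey("gpu.count"));
        component.addResourceStored("gpu.count", 1);
        System.out.println(component.parse().get("gpu.count"));
    }
}
```

Whether the real builder behaves this way depends on how its configuration getter is implemented; the sketch only illustrates why a change made to a parsed copy has to be written back explicitly.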


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149163756
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java ---
    @@ -129,41 +137,65 @@ public static String getJsonWithUpdatedResources(String jsonConf, Map<String, Do
             }
         }
     
    -    public static void checkIntialization(Map<String, Double> topologyResources, String com,
    -                                          Map<String, Object> topologyConf) {
    -        checkInitMem(topologyResources, com, topologyConf);
    -        checkInitCpu(topologyResources, com, topologyConf);
    -    }
    +    public static void checkInitialization(Map<String, Double> topologyResources, String componentId, Map topologyConf) {
    --- End diff --
    
    Why are we losing the type hints on topologyConf?


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147233694
  
    --- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java ---
    @@ -430,37 +427,37 @@ public static void main(String[] args) throws Exception {
             return topologyResources;
         }
     
    -    static void checkIntialization(Map<String, Double> topologyResources, String com,
    -                                          Map<String, Object> topologyConf) {
    -        checkInitMem(topologyResources, com, topologyConf);
    -        checkInitCpu(topologyResources, com, topologyConf);
    -    }
    +    /**
    +     * Checks if the topology's resource requirements are initialized.
    +     * @param topologyResources map of resouces requirements
    +     * @param componentId component for which initialization is being conducted
    +     * @param topologyConf topology configuration
    +     * @throws Exception on any error
    +     */
    +    public static void checkInitialization(Map<String, Double> topologyResources, String componentId, Map topologyConf) {
    +        StringBuilder msgBuilder = new StringBuilder();
     
    -    static void checkInitMem(Map<String, Double> topologyResources, String com,
    -                                     Map<String, Object> topologyConf) {
    -        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
    -            Double onHeap = ObjectReader.getDouble(
    -                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
    -            if (onHeap != null) {
    -                topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
    -            }
    +        for (String resourceName : topologyResources.keySet()) {
    +            msgBuilder.append(checkInitResource(topologyResources, topologyConf, resourceName));
             }
    -        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
    -            Double offHeap = ObjectReader.getDouble(
    -                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
    -            if (offHeap != null) {
    -                topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
    -            }
    +
    +        if (msgBuilder.length() > 0) {
    +            String resourceDefaults = msgBuilder.toString();
    +            LOG.debug(
    +                    "Unable to extract resource requirement for Component {} \n Resources : {}",
    +                    componentId, resourceDefaults);
             }
         }
     
    -    static void checkInitCpu(Map<String, Double> topologyResources, String com,
    -                                     Map<String, Object> topologyConf) {
    -        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
    -            Double cpu = ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
    -            if (cpu != null) {
    -                topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu);
    -            }
    +    private static String checkInitResource(Map<String, Double> topologyResources, Map topologyConf, String resourceName) {
    +        StringBuilder msgBuilder = new StringBuilder();
    +        if (topologyResources.containsKey(resourceName)) {
    +            Double resourceValue = (Double) topologyConf.getOrDefault(resourceName, null);
    --- End diff --
    
    I think you missed something in the translation. It should be more like:
    
    ```
    if (topologyResources.containsKey(resourceName)) {
        Double resourceValue = ObjectReader.getDouble(topologyConf.get(resourceName), null);
        if (resourceValue != null) {
            topologyResources.put(resourceName, resourceValue);
        }
    ...
    ```
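
One reason to prefer a conversion helper over a direct `(Double)` cast is that parsed configuration values are not necessarily `Double` instances. The sketch below illustrates this with a hypothetical `toDouble` helper; it is only loosely modeled on a `getDouble(value, default)` style utility and does not claim to reproduce `ObjectReader.getDouble`. The configuration key is illustrative.

```java
import java.util.HashMap;
import java.util.Map;

final class NumberCoercionSketch {
    // Hypothetical helper: accepts any Number and falls back to a default for null.
    static Double toDouble(Object value, Double defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).doubleValue();
        }
        throw new IllegalArgumentException("Not a number: " + value);
    }

    public static void main(String[] args) {
        Map<String, Object> topologyConf = new HashMap<>();
        // A config parser may well hand back an Integer for a whole-number value.
        topologyConf.put("example.cpu.percent", 50);

        Object raw = topologyConf.get("example.cpu.percent");
        try {
            Double viaCast = (Double) raw; // throws: an Integer is not a Double
            System.out.println(viaCast);
        } catch (ClassCastException e) {
            System.out.println("cast failed");
        }

        // The helper converts the Integer and passes the default through for a missing key.
        System.out.println(toDouble(raw, null));
        System.out.println(toDouble(topologyConf.get("missing"), null));
    }
}
```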


---

[GitHub] storm issue #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on the issue:

    https://github.com/apache/storm/pull/2385
  
    @HeartSaVioR Thank you - I will change the commit and PR titles.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149144246
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Constants.java ---
    @@ -56,5 +58,21 @@
         public static final String STORM_ACTIVE_ATOM = "storm-active-atom";
         public static final String COMPONENT_TO_DEBUG_ATOM = "storm-component->debug-atom";
         public static final Object LOAD_MAPPING = "load-mapping";
    +
    +    public static final String COMMON_CPU_RESOURCE_NAME = "cpu.pcore.percent";
    +    public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb";
    +    public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb";
    +    public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
    +
    +    public static final Map<String, String> resourceNameMapping;
    +
    +    static {
    +        resourceNameMapping = new HashMap();
    +        resourceNameMapping.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, COMMON_CPU_RESOURCE_NAME);
    +        resourceNameMapping.put(Config.SUPERVISOR_CPU_CAPACITY, COMMON_CPU_RESOURCE_NAME);
    +        resourceNameMapping.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
    +        resourceNameMapping.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
    +        resourceNameMapping.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, COMMON_TOTAL_MEMORY_RESOURCE_NAME);
    --- End diff --
    
    nit: Can we make this a read-only map when we save it?  I don't think we want to modify it after it is set up.
    
    ```
    static {
            Map<String, String> tmp = new HashMap<>();
            tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, COMMON_CPU_RESOURCE_NAME);
            tmp.put(Config.SUPERVISOR_CPU_CAPACITY, COMMON_CPU_RESOURCE_NAME);
            tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
            tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
            tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, COMMON_TOTAL_MEMORY_RESOURCE_NAME);
            resourceNameMapping = Collections.unmodifiableMap(tmp);
    }
    ```
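
As a small self-contained illustration of what the read-only wrapper buys, the sketch below builds a map the same way and shows that later writes are rejected. The class and field names are hypothetical; only `Collections.unmodifiableMap` is the real API under discussion.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class ReadOnlyMappingSketch {
    // Illustrative mapping; not the constant from the pull request.
    static final Map<String, String> NAME_MAPPING;

    static {
        Map<String, String> tmp = new HashMap<>();
        tmp.put("supervisor.cpu.capacity", "cpu.pcore.percent");
        // The wrapper is a view that throws on any mutating call.
        NAME_MAPPING = Collections.unmodifiableMap(tmp);
    }

    // Returns true when the map refuses a write attempt.
    static boolean rejectsWrites() {
        try {
            NAME_MAPPING.put("other", "value");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(NAME_MAPPING.get("supervisor.cpu.capacity"));
        System.out.println(rejectsWrites());
    }
}
```

Because `tmp` never escapes the static initializer, nothing can reach the backing map to modify it through the back door.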


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147237442
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java ---
    @@ -189,13 +201,20 @@ private static void checkInitCpu(Map<String, Double> topologyResources, String c
                             null);
                         topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
                     }
    -                LOG.debug("Topology Resources {}", topologyResources);
    +
    +                // If resource is also present in resources map will overwrite the above
    +                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
    +                    topologyResources.putAll((Map<String, Double>) jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP));
    +                }
    +                    LOG.info("Topology Resources {}", topologyResources);
                 }
             } catch (ParseException e) {
                 LOG.error("Failed to parse component resources is:" + e.toString(), e);
                 return null;
             }
    -        return topologyResources;
    +        LOG.info("Topology Resources {}", normalizedResourceMap(topologyResources));
    +        LOG.info("Topology Resources {}", normalizedResourceMap(topologyResources));
    --- End diff --
    
    Why do we need to log this twice?  or at all?


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149160853
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java ---
    @@ -73,11 +75,23 @@ private SupervisorInfo buildSupervisorInfo(Map<String, Object> conf, Supervisor
     
         private Map<String, Double> mkSupervisorCapacities(Map<String, Object> conf) {
             Map<String, Double> ret = new HashMap<String, Double>();
    +        // Put in legacy values
             Double mem = ObjectReader.getDouble(conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB), 4096.0);
             ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
             Double cpu = ObjectReader.getDouble(conf.get(Config.SUPERVISOR_CPU_CAPACITY), 400.0);
             ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
    -        return ret;
    +
    +
    +        // If configs are present in Generic map and legacy - the legacy values will be overwritten
    +        Map<String, Double> resourcesMap = (Map<String,Double>) conf.get(Config.SUPERVISOR_RESOURCES_MAP);
    --- End diff --
    
    This is not guaranteed to be a Double.  It is only guaranteed to be a Number, so we need to convert it with something like:
    
    ```
            if (resourcesMap != null) {
                for (Map.Entry<String, Number> stringNumberEntry : resourcesMap.entrySet()) {
                    ret.put(stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
                }
            }
    ```
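
The sketch below shows why the unchecked cast to `Map<String, Double>` can appear to work and still fail later: generics are erased, so the cast itself succeeds and the `ClassCastException` only surfaces when a non-`Double` value is read. The class name, the `toDoubleMap` helper, and the `"resources"` key are hypothetical and chosen purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class ResourceMapConversionSketch {
    // Copies values out as doubles, accepting whatever Number subtype the parser produced.
    static Map<String, Double> toDoubleMap(Map<String, ? extends Number> raw) {
        Map<String, Double> ret = new HashMap<>();
        if (raw != null) {
            for (Map.Entry<String, ? extends Number> e : raw.entrySet()) {
                ret.put(e.getKey(), e.getValue().doubleValue());
            }
        }
        return ret;
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        Map<String, Number> parsed = new HashMap<>();
        parsed.put("gpu.count", 2); // stored as an Integer
        conf.put("resources", parsed); // made-up key

        // The unchecked cast succeeds because the type arguments are erased at runtime.
        Map<String, Double> unsafe = (Map<String, Double>) conf.get("resources");
        try {
            double v = unsafe.get("gpu.count"); // the failure shows up here, on read
            System.out.println(v);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException on read");
        }

        // Converting value by value avoids the problem.
        Map<String, Double> safe = toDoubleMap((Map<String, Number>) conf.get("resources"));
        System.out.println(safe.get("gpu.count"));
    }
}
```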


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147234354
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -1272,6 +1278,12 @@
         public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity";
     
         /**
    +     * A map of resources the Supervisor has e.g {"cpu" : 200.0. "memory.capacity.mb": 256.0, "gpu" : 0.5 }
    +     */
    +    @isMapEntryType(keyType = String.class, valueType = Double.class)
    --- End diff --
    
    Same comment here.  Trying to force the type to be a double is going to be difficult with how we use the configs.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149161816
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -503,43 +553,42 @@ public boolean wouldFit(
             WorkerSlot ws,
             ExecutorDetails exec,
             TopologyDetails td,
    -        double maxHeap,
    -        double memoryAvailable,
    -        double cpuAvailable) {
    -        //NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot.
    -        //CPU is simplest because it does not have odd interactions.
    -        double cpuNeeded = td.getTotalCpuReqTask(exec);
    -        if (cpuNeeded > cpuAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
    -                    td.getName(),
    -                    exec,
    -                    ws,
    -                    cpuNeeded,
    -                    cpuAvailable);
    -            }
    -            //Not enough CPU no need to try any more
    -            return false;
    -        }
    +        Map<String, Double> resourcesAvailable,
    +        double maxHeap) {
     
    -        //Lets see if we can make the Memory one fast too, at least in the failure case.
    -        //The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know
    -        // Even with shared it will not work
    -        double minMemNeeded = td.getTotalMemReqTask(exec);
    -        if (minMemNeeded > memoryAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
    +        Map<String, Double> requestedResources = td.getTotalResources(exec);
    +
    +        for (Entry resourceNeededEntry : requestedResources.entrySet()) {
    +            String resourceName = resourceNeededEntry.getKey().toString();
    +            if (resourceName == Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) {
    +                continue;
    +            }
    +            Double resourceNeeded = ObjectReader.getDouble(resourceNeededEntry.getValue());
    +            Double resourceAvailable = ObjectReader.getDouble(
    +                    resourcesAvailable.getOrDefault(resourceName, null), 0.0);
    --- End diff --
    
    Calling `getOrDefault` with `null` as the second argument is the same as calling `get`.
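
A short sketch of the point above, using a plain `HashMap`: for an absent key, `get` and `getOrDefault(key, null)` both return `null`, so the default has to be applied some other way. The `available` helper and the resource names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class GetOrDefaultSketch {
    // Returns the available amount, treating a missing or null entry as 0.0.
    static double available(Map<String, Double> resourcesAvailable, String name) {
        Double value = resourcesAvailable.get(name);
        return value != null ? value : 0.0;
    }

    public static void main(String[] args) {
        Map<String, Double> resourcesAvailable = new HashMap<>();
        resourcesAvailable.put("cpu.pcore.percent", 100.0);

        // Both lookups return null for a key that is not present.
        System.out.println(resourcesAvailable.get("gpu.count"));
        System.out.println(resourcesAvailable.getOrDefault("gpu.count", null));

        // The helper applies the 0.0 default explicitly.
        System.out.println(available(resourcesAvailable, "gpu.count"));
        System.out.println(available(resourcesAvailable, "cpu.pcore.percent"));
    }
}
```

If the map is known never to hold `null` values, `getOrDefault(name, 0.0)` is a reasonable one-line alternative to the helper.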


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon closed the pull request at:

    https://github.com/apache/storm/pull/2385


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147235680
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java ---
    @@ -79,8 +81,18 @@ public T setMemoryLoad(Number onHeap, Number offHeap) {
         @Override
         public T setCPULoad(Number amount) {
             if(amount != null) {
    +            addResource(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount);
                 return addConfiguration(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount);
             }
             return (T) this;
         }
    +
    +    @SuppressWarnings("unchecked")
    +    @Override
    +    public T addResources(Map<String, Double> resources) {
    +        if(resources != null) {
    --- End diff --
    
    nit: you need a space after the if.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147235287
  
    --- Diff: storm-client/src/jvm/org/apache/storm/generated/WorkerResources.java ---
    @@ -1,21 +1,4 @@
     /**
    - * Licensed to the Apache Software Foundation (ASF) under one
    - * or more contributor license agreements.  See the NOTICE file
    - * distributed with this work for additional information
    - * regarding copyright ownership.  The ASF licenses this file
    - * to you under the Apache License, Version 2.0 (the
    - * "License"); you may not use this file except in compliance
    - * with the License.  You may obtain a copy of the License at
    - *
    - * http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -/**
      * Autogenerated by Thrift Compiler (0.9.3)
    --- End diff --
    
    What happened to the license header?  Please run the shell script to generate the thrift code instead of doing it on your own.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147234270
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -240,6 +240,12 @@
         public static final String TOPOLOGY_TASKS = "topology.tasks";
     
         /**
    +     * A map of resources used by each component e.g {"cpu" : 200.0. "onheap.memory.mb": 256.0, "gpu" : 0.5 }
    +     */
    +    @isMapEntryType(keyType = String.class, valueType = Double.class)
    --- End diff --
    
    It is really hard to enforce a double in the conf.  This is because neither JSON nor YAML knows about the declared type; each puts the parsed number in the smallest type that matches it.  For example, 1 becomes an integer, not a double.  1.0 might be a double, but it might also be a float; I don't know.  If the number is really big it could be a long.
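
One way to live with this, sketched below under stated assumptions, is to accept any `Number` when reading the map and convert explicitly, rather than relying on the values already being `Double`. The helper `toDoubleMap` and the class around it are hypothetical and not part of Storm; the hand-built map only imitates what a parser might hand back.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a hypothetical helper, not Storm code.
public class ResourceMapCoercion {

    // Converts a parsed resource map whose values may be Integer, Long,
    // Float or Double into a map of Double values.
    static Map<String, Double> toDoubleMap(Map<String, ?> raw) {
        Map<String, Double> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : raw.entrySet()) {
            Object value = entry.getValue();
            if (!(value instanceof Number)) {
                throw new IllegalArgumentException(
                    "Resource " + entry.getKey() + " is not numeric: " + value);
            }
            result.put(entry.getKey(), ((Number) value).doubleValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Imitates a parsed conf: 200 arrives as an Integer, 0.5 as a Double,
        // and a very large value as a Long.
        Map<String, Object> parsed = new HashMap<>();
        parsed.put("cpu", 200);
        parsed.put("gpu", 0.5);
        parsed.put("onheap.memory.mb", 4_000_000_000L);

        Map<String, Double> resources = toDoubleMap(parsed);
        System.out.println(resources);
    }
}
```

A direct cast such as `(Double) parsed.get("cpu")` would throw a `ClassCastException` for the `Integer` entry, which is the failure mode the comment warns about.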


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149164076
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java ---
    @@ -129,41 +137,65 @@ public static String getJsonWithUpdatedResources(String jsonConf, Map<String, Do
             }
         }
     
    -    public static void checkIntialization(Map<String, Double> topologyResources, String com,
    -                                          Map<String, Object> topologyConf) {
    -        checkInitMem(topologyResources, com, topologyConf);
    -        checkInitCpu(topologyResources, com, topologyConf);
    -    }
    +    public static void checkInitialization(Map<String, Double> topologyResources, String componentId, Map topologyConf) {
    +        StringBuilder msgBuilder = new StringBuilder();
     
    -    private static void checkInitMem(Map<String, Double> topologyResources, String com,
    -                                    Map<String, Object> topologyConf) {
    -        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
    -            Double onHeap = ObjectReader.getDouble(
    -                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
    -            if (onHeap != null) {
    -                topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
    -                debugMessage("ONHEAP", com, topologyConf);
    -            }
    +        Set<String> resourceNameSet = new HashSet<>();
    +
    +        resourceNameSet.add(
    +                Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT
    +        );
    +        resourceNameSet.add(
    +                Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB
    +        );
    +        resourceNameSet.add(
    +                Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB
    +        );
    +
    +        Map<String, Double> topologyComponentResourcesMap =
    +                (Map<String, Double>) topologyConf.getOrDefault(
    +                        Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, Collections.emptyMap());
    +
    +        resourceNameSet.addAll(topologyResources.keySet());
    +        resourceNameSet.addAll(topologyComponentResourcesMap.keySet());
    +
    +        for (String resourceName : resourceNameSet) {
    +            msgBuilder.append(checkInitResource(topologyResources, topologyConf, topologyComponentResourcesMap, resourceName));
             }
    -        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
    -            Double offHeap = ObjectReader.getDouble(
    -                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
    -            if (offHeap != null) {
    -                topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
    -                debugMessage("OFFHEAP", com, topologyConf);
    -            }
    +
    +        Map<String, Double> normalizedTopologyResources = normalizedResourceMap(topologyResources);
    +        topologyResources.clear();
    +        topologyResources.putAll(normalizedTopologyResources);
    +
    +        if (msgBuilder.length() > 0) {
    +            String resourceDefaults = msgBuilder.toString();
    +            LOG.debug(
    +                    "Unable to extract resource requirement for Component {} \n Resources : {}",
    +                    componentId, resourceDefaults);
             }
         }
     
    -    private static void checkInitCpu(Map<String, Double> topologyResources, String com,
    -                                     Map<String, Object> topologyConf) {
    -        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
    -            Double cpu = ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
    -            if (cpu != null) {
    -                topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu);
    -                debugMessage("CPU", com, topologyConf);
    +    private static String checkInitResource(Map<String, Double> topologyResources, Map topologyConf,
    +                                            Map<String, Double> topologyComponentResourcesMap, String resourceName) {
    +        StringBuilder msgBuilder = new StringBuilder();
    +        String normalizedResourceName = resourceNameMapping.getOrDefault(resourceName, resourceName);
    +        if (!topologyResources.containsKey(normalizedResourceName)) {
    +            if (topologyConf.containsKey(resourceName)) {
    +                Double resourceValue = ObjectReader.getDouble(topologyConf.get(resourceName));
    +                if(resourceValue!=null) {
    +                    topologyResources.put(normalizedResourceName, resourceValue);
    +                }
    +            }
    +
    +            if (topologyComponentResourcesMap.containsKey(normalizedResourceName)) {
    +                Double resourceValue = ObjectReader.getDouble(topologyComponentResourcesMap.get(resourceName));
    +                if(resourceValue!=null) {
    --- End diff --
    
    nit: space after the if.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149523415
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java ---
    @@ -18,93 +18,58 @@
     
     package org.apache.storm.scheduler.resource.strategies.scheduling;
     
    -import com.google.common.annotations.VisibleForTesting;
    -
    -import java.util.ArrayList;
    -import java.util.Collection;
    -import java.util.Collections;
    -import java.util.HashMap;
    -import java.util.HashSet;
    -import java.util.LinkedList;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.Queue;
    -import java.util.Set;
    -import java.util.TreeSet;
    -
     import org.apache.storm.Config;
    -import org.apache.storm.generated.ComponentType;
    -import org.apache.storm.scheduler.Cluster;
    -import org.apache.storm.scheduler.Component;
    -import org.apache.storm.scheduler.ExecutorDetails;
    -import org.apache.storm.scheduler.TopologyDetails;
    -import org.apache.storm.scheduler.WorkerSlot;
    -import org.apache.storm.scheduler.resource.RAS_Node;
    -import org.apache.storm.scheduler.resource.RAS_Nodes;
    +import org.apache.storm.scheduler.*;
     import org.apache.storm.scheduler.resource.ResourceUtils;
     import org.apache.storm.scheduler.resource.SchedulingResult;
     import org.apache.storm.scheduler.resource.SchedulingStatus;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -public class DefaultResourceAwareStrategy implements IStrategy {
    -    private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
    -    private Cluster cluster;
    -    private Map<String, List<String>> networkTopography;
    -    private RAS_Nodes nodes;
    +import java.util.*;
     
    -    @VisibleForTesting
    -    void prepare(Cluster cluster) {
    -        this.cluster = cluster;
    -        nodes = new RAS_Nodes(cluster);
    -        networkTopography = cluster.getNetworkTopography();
    -        logClusterInfo();
    -    }
    +public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy {
    --- End diff --
    
    I think that should come in later. Ideally there should be a release without GRAS on by default, so people can try it out themselves.


---

[GitHub] storm issue #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2385
  
    @govind-menon 
    I'm not familiar with RAS and I think +1 from @revans2 is sufficient.
    
    Two suggestions:
    * Please remove the leading 'Y' from the commit title, as you're submitting a patch to the Apache Storm project.
    * Please change both the commit title and the title of the PR to STORM-2725 if the PR covers STORM-2725 and STORM-2725 can be closed after the patch is merged.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149161951
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -556,6 +605,9 @@ public boolean wouldFit(
             }
     
             double memoryAdded = afterTotal - currentTotal;
    +        double memoryAvailable = ObjectReader.getDouble(
    +                resourcesAvailable.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, null), 0.0);
    --- End diff --
    
    Same as above, just make it a get.


---

[GitHub] storm pull request #2385: YSTORM-2727: Generic Resource Aware Scheduling

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon closed the pull request at:

    https://github.com/apache/storm/pull/2385


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149148502
  
    --- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
    @@ -450,9 +456,33 @@ public BoltDeclarer addConfigurations(Map<String, Object> conf) {
             }
     
             @Override
    +        public Map getRASConfiguration() {
    +            for (Map<String, Object> conf : _component.componentConfs) {
    +                if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
    +                    return conf;
    +                }
    +            }
    +            return new HashMap<>();
    +        }
    +
    +        @Override
             public BoltDeclarer addSharedMemory(SharedMemory request) {
                 _component.sharedMemory.add(request);
                 return this;
             }
    +
    +        @SuppressWarnings("unchecked")
    +        @Override
    +        public BoltDeclarer addResource(String resourceName, Number resourceValue) {
    +            Map<String, Double> resourcesMap = (Map<String, Double>) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
    +
    +            if (resourcesMap == null) {
    +                resourcesMap = new HashMap<>();
    +            }
    +            resourcesMap.put(resourceName, resourceValue.doubleValue());
    +
    +            getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
    --- End diff --
    
    This does not work, see my comment above inside `getRASConfiguration`


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147235417
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java ---
    @@ -17,12 +17,10 @@
      */
     package org.apache.storm.scheduler;
     
    -import java.util.Collection;
    -import java.util.HashSet;
    -import java.util.Set;
    -import java.util.Map;
    +import java.util.*;
    --- End diff --
    
    I think .* is against the coding standard.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147236577
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -503,62 +553,70 @@ public boolean wouldFit(
             WorkerSlot ws,
             ExecutorDetails exec,
             TopologyDetails td,
    -        double maxHeap,
    -        double memoryAvailable,
    -        double cpuAvailable) {
    -        //NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot.
    -        //CPU is simplest because it does not have odd interactions.
    -        double cpuNeeded = td.getTotalCpuReqTask(exec);
    -        if (cpuNeeded > cpuAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
    -                    td.getName(),
    -                    exec,
    -                    ws,
    -                    cpuNeeded,
    -                    cpuAvailable);
    +        Map<String, Double> resourcesAvailable,
    +        double maxHeap) {
    +
    +        Map<String, Double> requestedResources = td.getTotalResources(exec);
    +
    +        LOG.info(td.getName());
    +        LOG.info("requested");
    +        LOG.info(requestedResources.toString());
    +        LOG.info("available");
    +        LOG.info(resourcesAvailable.toString());
    +        LOG.info(ws.toString());
    +        for (Entry resourceNeededEntry : requestedResources.entrySet()) {
    +            String resourceName = resourceNeededEntry.getKey().toString();
    +            if (resourceName == Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) {
    +                continue;
                 }
    -            //Not enough CPU no need to try any more
    -            return false;
    -        }
    -
    -        //Lets see if we can make the Memory one fast too, at least in the failure case.
    -        //The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know
    -        // Even with shared it will not work
    -        double minMemNeeded = td.getTotalMemReqTask(exec);
    -        if (minMemNeeded > memoryAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
    +            Double resourceNeeded = ObjectReader.getDouble(resourceNeededEntry.getValue());
    +            Double resourceAvailable = ObjectReader.getDouble(
    +                    resourcesAvailable.getOrDefault(resourceName, null), 0.0);
    +            if (resourceNeeded > resourceAvailable) {
    +                if (true) {
    +                    LOG.info("Could not schedule {}:{} on {} not enough {} {} > {}",
    +                            td.getName(),
    +                            exec,
    +                            ws,
    +                            resourceName,
    +                            resourceNeeded,
    +                            resourceAvailable);
    +                }
    +                //Not enough resources - stop trying
    +                return false;
                 }
    -            //Not enough minimum MEM no need to try any more
    -            return false;
             }
     
             double currentTotal = 0.0;
             double afterTotal = 0.0;
             double afterOnHeap = 0.0;
    +
             Set<ExecutorDetails> wouldBeAssigned = new HashSet<>();
             wouldBeAssigned.add(exec);
             SchedulerAssignmentImpl assignment = assignments.get(td.getId());
    +
             if (assignment != null) {
                 Collection<ExecutorDetails> currentlyAssigned = assignment.getSlotToExecutors().get(ws);
                 if (currentlyAssigned != null) {
                     wouldBeAssigned.addAll(currentlyAssigned);
                     WorkerResources wrCurrent = calculateWorkerResources(td, currentlyAssigned);
                     currentTotal = wrCurrent.get_mem_off_heap() + wrCurrent.get_mem_on_heap();
                 }
    -            WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
    -            afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
    -            afterOnHeap = wrAfter.get_mem_on_heap();
    -
    -            currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment);
    -            afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec);
             }
    +        WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
    +        afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
    +        afterOnHeap = wrAfter.get_mem_on_heap();
    +
    +        currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment);
    +        afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec);
     
             double memoryAdded = afterTotal - currentTotal;
    +        double memoryAvailable = ObjectReader.getDouble(
    +                resourcesAvailable.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, null), 0.0);
    +
             if (memoryAdded > memoryAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}",
    +            if (true) {
    +                LOG.info("Could not schedule {}:{} on {} not enough Mem {} > {}",
    --- End diff --
    
    There are several places like this where I think the logs need to go back to how they were before.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147236780
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -664,6 +722,9 @@ private double calculateSharedOffHeapMemory(String nodeId, SchedulerAssignmentIm
     
         private double calculateSharedOffHeapMemory(
             String nodeId, SchedulerAssignmentImpl assignment, ExecutorDetails extra) {
    +        if (assignment == null) {
    --- End diff --
    
    When is the assignment null???  Why would we try to calculate the shared off heap memory for no assignment???


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149144724
  
    --- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
    @@ -450,9 +456,33 @@ public BoltDeclarer addConfigurations(Map<String, Object> conf) {
             }
     
             @Override
    +        public Map getRASConfiguration() {
    +            for (Map<String, Object> conf : _component.componentConfs) {
    +                if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
    +                    return conf;
    +                }
    +            }
    +            return new HashMap<>();
    +        }
    +
    +        @Override
             public BoltDeclarer addSharedMemory(SharedMemory request) {
                 _component.sharedMemory.add(request);
                 return this;
             }
    +
    +        @SuppressWarnings("unchecked")
    +        @Override
    +        public BoltDeclarer addResource(String resourceName, Number resourceValue) {
    +            Map<String, Double> resourcesMap = (Map<String, Double>) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
    +
    +            if (resourcesMap == null) {
    --- End diff --
    
    The above code never returns a null.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149164027
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java ---
    @@ -129,41 +137,65 @@ public static String getJsonWithUpdatedResources(String jsonConf, Map<String, Do
             }
         }
     
    -    public static void checkIntialization(Map<String, Double> topologyResources, String com,
    -                                          Map<String, Object> topologyConf) {
    -        checkInitMem(topologyResources, com, topologyConf);
    -        checkInitCpu(topologyResources, com, topologyConf);
    -    }
    +    public static void checkInitialization(Map<String, Double> topologyResources, String componentId, Map topologyConf) {
    +        StringBuilder msgBuilder = new StringBuilder();
     
    -    private static void checkInitMem(Map<String, Double> topologyResources, String com,
    -                                    Map<String, Object> topologyConf) {
    -        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
    -            Double onHeap = ObjectReader.getDouble(
    -                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
    -            if (onHeap != null) {
    -                topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
    -                debugMessage("ONHEAP", com, topologyConf);
    -            }
    +        Set<String> resourceNameSet = new HashSet<>();
    +
    +        resourceNameSet.add(
    +                Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT
    +        );
    +        resourceNameSet.add(
    +                Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB
    +        );
    +        resourceNameSet.add(
    +                Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB
    +        );
    +
    +        Map<String, Double> topologyComponentResourcesMap =
    +                (Map<String, Double>) topologyConf.getOrDefault(
    +                        Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, Collections.emptyMap());
    +
    +        resourceNameSet.addAll(topologyResources.keySet());
    +        resourceNameSet.addAll(topologyComponentResourcesMap.keySet());
    +
    +        for (String resourceName : resourceNameSet) {
    +            msgBuilder.append(checkInitResource(topologyResources, topologyConf, topologyComponentResourcesMap, resourceName));
             }
    -        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
    -            Double offHeap = ObjectReader.getDouble(
    -                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
    -            if (offHeap != null) {
    -                topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
    -                debugMessage("OFFHEAP", com, topologyConf);
    -            }
    +
    +        Map<String, Double> normalizedTopologyResources = normalizedResourceMap(topologyResources);
    +        topologyResources.clear();
    +        topologyResources.putAll(normalizedTopologyResources);
    +
    +        if (msgBuilder.length() > 0) {
    +            String resourceDefaults = msgBuilder.toString();
    +            LOG.debug(
    +                    "Unable to extract resource requirement for Component {} \n Resources : {}",
    +                    componentId, resourceDefaults);
             }
         }
     
    -    private static void checkInitCpu(Map<String, Double> topologyResources, String com,
    -                                     Map<String, Object> topologyConf) {
    -        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
    -            Double cpu = ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
    -            if (cpu != null) {
    -                topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu);
    -                debugMessage("CPU", com, topologyConf);
    +    private static String checkInitResource(Map<String, Double> topologyResources, Map topologyConf,
    +                                            Map<String, Double> topologyComponentResourcesMap, String resourceName) {
    +        StringBuilder msgBuilder = new StringBuilder();
    +        String normalizedResourceName = resourceNameMapping.getOrDefault(resourceName, resourceName);
    +        if (!topologyResources.containsKey(normalizedResourceName)) {
    +            if (topologyConf.containsKey(resourceName)) {
    +                Double resourceValue = ObjectReader.getDouble(topologyConf.get(resourceName));
    +                if(resourceValue!=null) {
    --- End diff --
    
    nit: space after the if.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147234853
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Constants.java ---
    @@ -56,5 +58,20 @@
         public static final String STORM_ACTIVE_ATOM = "storm-active-atom";
         public static final String COMPONENT_TO_DEBUG_ATOM = "storm-component->debug-atom";
         public static final Object LOAD_MAPPING = "load-mapping";
    +
    +    public static final String COMMON_CPU_RESOURCE_NAME = "cpu.pcore.percent";
    +    public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb";
    +    public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb";
    +    public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
    --- End diff --
    
    Can we document these somewhere?  For example, the configs show some examples, but they are not exhaustive, and the rebalance command is really bad at this, so a user has no idea what to put in as the key.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149155733
  
    --- Diff: storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java ---
    @@ -398,6 +405,30 @@ public LinearDRPCInputDeclarer addConfigurations(Map<String, Object> conf) {
                 return this;
             }
     
    +        @SuppressWarnings("unchecked")
    +        @Override
    +        public LinearDRPCInputDeclarer addResource(String resourceName, Number resourceValue) {
    +            Map<String, Double> resourcesMap = (Map<String, Double>) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
    +
    +            if (resourcesMap == null) {
    +                resourcesMap = new HashMap<>();
    +            }
    +            resourcesMap.put(resourceName, resourceValue.doubleValue());
    +
    +            getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
    --- End diff --
    
    This has the same problem as the above code.  It is not going to work to add in a resource for the first time.
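
A minimal model of the problem, assuming the lookup behaves like the `getRASConfiguration` quoted in the diffs above: when no existing conf holds the resources key, the lookup returns a brand-new map that is never attached to the component, so the first resource added is silently lost. All names below are hypothetical stand-ins, and the key string is a placeholder rather than a confirmed value of `Config.TOPOLOGY_COMPONENT_RESOURCES_MAP`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: not the Storm classes from the diff.
public class DetachedConfDemo {
    // Placeholder key, assumed for the example.
    static final String RESOURCES_KEY = "resources.map";

    private final List<Map<String, Object>> componentConfs = new ArrayList<>();

    // Mirrors the quoted lookup: the fallback map is not stored anywhere.
    Map<String, Object> lookupDetached() {
        for (Map<String, Object> conf : componentConfs) {
            if (conf.containsKey(RESOURCES_KEY)) {
                return conf;
            }
        }
        return new HashMap<>();
    }

    // One possible fix: attach the fallback map before returning it.
    Map<String, Object> lookupAttached() {
        for (Map<String, Object> conf : componentConfs) {
            if (conf.containsKey(RESOURCES_KEY)) {
                return conf;
            }
        }
        Map<String, Object> conf = new HashMap<>();
        componentConfs.add(conf);
        return conf;
    }

    // Adds one resource to the resources map inside the given conf.
    @SuppressWarnings("unchecked")
    static void addResource(Map<String, Object> conf, String name, Number value) {
        Map<String, Double> resources = (Map<String, Double>)
            conf.computeIfAbsent(RESOURCES_KEY, k -> new HashMap<String, Double>());
        resources.put(name, value.doubleValue());
    }

    // Counts the resources actually reachable from the component.
    int storedResourceCount() {
        int count = 0;
        for (Map<String, Object> conf : componentConfs) {
            Object resources = conf.get(RESOURCES_KEY);
            if (resources instanceof Map) {
                count += ((Map<?, ?>) resources).size();
            }
        }
        return count;
    }

    public static void main(String[] args) {
        DetachedConfDemo component = new DetachedConfDemo();

        addResource(component.lookupDetached(), "gpu", 1);
        System.out.println("after detached add: " + component.storedResourceCount());

        addResource(component.lookupAttached(), "gpu", 1);
        System.out.println("after attached add: " + component.storedResourceCount());
    }
}
```

Attaching the fallback map is only one option; routing the update through the existing `addConfiguration` path would be another, and the choice is the author's.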


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149164648
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java ---
    @@ -238,4 +277,57 @@ public static double sum(Collection<Double> list) {
         public static double avg(Collection<Double> list) {
             return sum(list) / list.size();
         }
    +
    +    /**
    +     * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
    +     * @param resourceMap resource map of either Supervisor or Topology
    +     * @return the resource map with common resource names
    +     */
    +    public static Map<String, Double> normalizedResourceMap(Map<String, Double> resourceMap) {
    +        Map<String, Double> result = new HashMap();
    +
    +        result.putAll(resourceMap);
    +        for(Map.Entry entry: resourceMap.entrySet()) {
    --- End diff --
    
    nit: space after the for.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149160149
  
    --- Diff: storm-client/src/storm.thrift ---
    @@ -496,6 +496,8 @@ struct WorkerResources {
         3: optional double cpu;
         4: optional double shared_mem_on_heap; //This is just for accounting mem_on_heap should be used for enforcement
         5: optional double shared_mem_off_heap; //This is just for accounting mem_off_heap should be used for enforcement
    +    6: optional map<string, double> resources; // Generic resources Map
    +    7: optional map<string, double> shared_resources; // Shared Generic resources Map
    --- End diff --
    
    Are we using this?  If not, let's hold off on adding it until we support it.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149161456
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -503,43 +553,42 @@ public boolean wouldFit(
             WorkerSlot ws,
             ExecutorDetails exec,
             TopologyDetails td,
    -        double maxHeap,
    -        double memoryAvailable,
    -        double cpuAvailable) {
    -        //NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot.
    -        //CPU is simplest because it does not have odd interactions.
    -        double cpuNeeded = td.getTotalCpuReqTask(exec);
    -        if (cpuNeeded > cpuAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
    -                    td.getName(),
    -                    exec,
    -                    ws,
    -                    cpuNeeded,
    -                    cpuAvailable);
    -            }
    -            //Not enough CPU no need to try any more
    -            return false;
    -        }
    +        Map<String, Double> resourcesAvailable,
    +        double maxHeap) {
     
    -        //Lets see if we can make the Memory one fast too, at least in the failure case.
    -        //The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know
    -        // Even with shared it will not work
    -        double minMemNeeded = td.getTotalMemReqTask(exec);
    -        if (minMemNeeded > memoryAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
    +        Map<String, Double> requestedResources = td.getTotalResources(exec);
    +
    +        for (Entry resourceNeededEntry : requestedResources.entrySet()) {
    +            String resourceName = resourceNeededEntry.getKey().toString();
    +            if (resourceName == Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) {
    --- End diff --
    
    These need to use .equals instead of ==.  A lot of the time == might work, but there are cases with strings where it will not.
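
    A minimal sketch of the difference, not taken from the PR. The constant value below is a placeholder rather than the real value in Constants, and the class and method names are hypothetical. It shows that two strings with the same characters can still be different objects, which is when == fails.

```java
// Hypothetical illustration; not Storm code.
final class StringEqualitySketch {
    // Placeholder standing in for a constant such as Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.
    static final String ONHEAP = "onheap.placeholder";

    // Compares object identity, which is what the diff does.
    static boolean sameByReference(String a, String b) {
        return a == b;
    }

    // Compares character content, which is what the review asks for.
    static boolean sameByValue(String a, String b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        // new String(...) always creates a distinct object, much like a key
        // that was built at runtime (for example, parsed from JSON).
        String runtimeKey = new String("onheap.placeholder");
        System.out.println(sameByReference(runtimeKey, ONHEAP));
        System.out.println(sameByValue(runtimeKey, ONHEAP));
    }
}
```

    The == check only succeeds when both sides happen to be the same interned object, so it can pass in tests that use the constant directly and still fail on keys built at runtime.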


---

[GitHub] storm issue #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on the issue:

    https://github.com/apache/storm/pull/2385
  
    @HeartSaVioR this PR should cover STORM-2725 (which covers STORM-2727). I apologize for the confusion - I was planning this as several PRs and it snowballed into one big PR.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149159816
  
    --- Diff: storm-client/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java ---
    @@ -371,6 +371,30 @@ public SpoutDeclarer addConfigurations(Map<String, Object> conf) {
                 return this;
             }
     
    +        @SuppressWarnings("unchecked")
    +        @Override
    +        public SpoutDeclarer addResource(String resourceName, Number resourceValue) {
    +            Map<String, Double> resourcesMap = (Map<String, Double>) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
    +
    +            if (resourcesMap == null) {
    +                resourcesMap = new HashMap<>();
    +            }
    +            resourcesMap.put(resourceName, resourceValue.doubleValue());
    +
    +            getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
    --- End diff --
    
    And here...


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147755026
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TreeSet;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.Component;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.resource.ResourceUtils;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy {
    +    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareStrategy.class);
    +
    +    @Override
    +    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
    +        prepare(cluster);
    +        if (nodes.getNodes().size() <= 0) {
    +            LOG.warn("No available nodes to schedule tasks on!");
    +            return SchedulingResult.failure(
    +                    SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
    +        }
    +        Collection<ExecutorDetails> unassignedExecutors =
    +                new HashSet<>(this.cluster.getUnassignedExecutors(td));
    +        LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors);
    +        Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
    +        List<Component> spouts = this.getSpouts(td);
    +
    +        if (spouts.size() == 0) {
    +            LOG.error("Cannot find a Spout!");
    +            return SchedulingResult.failure(
    +                    SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
    +        }
    +
    +        //order executors to be scheduled
    +        List<ExecutorDetails> orderedExecutors = this.orderExecutors(td, unassignedExecutors);
    +        LOG.info("orderedExecutors");
    +        LOG.info(orderedExecutors.  toString());
    --- End diff --
    
    Would it be better to put these in a single LOG.info() call?  (I am not sure whether LOG.debug() would be better here.)


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149162313
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -993,6 +1045,31 @@ public WorkerResources getWorkerResources(WorkerSlot ws) {
         }
     
         @Override
    +    public Map<String, Double> getAllScheduledResourcesForNode(String nodeId) {
    +        Map<String, Double> totalScheduledResources = new HashMap<>();
    +        for (SchedulerAssignmentImpl assignment : assignments.values()) {
    +            for (Entry<WorkerSlot, WorkerResources> entry :
    +                    assignment.getScheduledResources().entrySet()) {
    +                if (nodeId.equals(entry.getKey().getNodeId())) {
    +                    WorkerResources resources = entry.getValue();
    +                    for (Map.Entry<String, Double> resourceEntry : resources.get_resources().entrySet()) {
    +                        Double currentResourceValue = totalScheduledResources.containsKey(resourceEntry.getKey()) ? totalScheduledResources.get(resourceEntry.getKey()) : 0.0;
    --- End diff --
    
    This is actually the perfect place to call getOrDefault.
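
    A rough sketch of the suggested pattern, assuming plain maps of resource name to amount. The class and method names are hypothetical and this is not the Cluster code; it only shows Map.getOrDefault replacing the containsKey/get ternary.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not Storm code.
final class ResourceTotalsSketch {
    // Sums per-worker resource maps into one total per resource name.
    static Map<String, Double> sum(Iterable<Map<String, Double>> perWorker) {
        Map<String, Double> totals = new HashMap<>();
        for (Map<String, Double> resources : perWorker) {
            for (Map.Entry<String, Double> entry : resources.entrySet()) {
                // getOrDefault supplies 0.0 the first time a resource name is seen.
                double current = totals.getOrDefault(entry.getKey(), 0.0);
                totals.put(entry.getKey(), current + entry.getValue());
            }
        }
        return totals;
    }
}
```

    Map.merge(key, value, Double::sum) would be an even shorter alternative, though that goes beyond what the review asked for.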


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149162413
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -993,6 +1045,31 @@ public WorkerResources getWorkerResources(WorkerSlot ws) {
         }
     
         @Override
    +    public Map<String, Double> getAllScheduledResourcesForNode(String nodeId) {
    +        Map<String, Double> totalScheduledResources = new HashMap<>();
    +        for (SchedulerAssignmentImpl assignment : assignments.values()) {
    +            for (Entry<WorkerSlot, WorkerResources> entry :
    +                    assignment.getScheduledResources().entrySet()) {
    +                if (nodeId.equals(entry.getKey().getNodeId())) {
    +                    WorkerResources resources = entry.getValue();
    +                    for (Map.Entry<String, Double> resourceEntry : resources.get_resources().entrySet()) {
    +                        Double currentResourceValue = totalScheduledResources.containsKey(resourceEntry.getKey()) ? totalScheduledResources.get(resourceEntry.getKey()) : 0.0;
    +                        totalScheduledResources.put(resourceEntry.getKey().toString(), currentResourceValue + ObjectReader.getDouble(resourceEntry.getValue()));
    +                    }
    +
    +                }
    +            }
    +            Double sharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId);
    +            if (sharedOffHeap != null) {
    +                String resourceName = Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME;
    +                Double currentResourceValue = totalScheduledResources.containsKey(resourceName) ? totalScheduledResources.get(resourceName) : 0.0;
    --- End diff --
    
    Here also getOrDefault.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149225052
  
    --- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java ---
    @@ -430,37 +427,39 @@ public static void main(String[] args) throws Exception {
             return topologyResources;
         }
     
    -    static void checkIntialization(Map<String, Double> topologyResources, String com,
    -                                          Map<String, Object> topologyConf) {
    -        checkInitMem(topologyResources, com, topologyConf);
    -        checkInitCpu(topologyResources, com, topologyConf);
    -    }
    +    /**
    +     * Checks if the topology's resource requirements are initialized.
    +     * @param topologyResources map of resouces requirements
    --- End diff --
    
    @revans2 I agree. I was just following how checkInitialization behaved before.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149162718
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---
    @@ -133,30 +136,31 @@ public StormTopology getTopology() {
     
         private void initResourceList() {
             this.resourceList = new HashMap<>();
    -        // Extract bolt memory info
    +        // Extract bolt resource info
             if (topology.get_bolts() != null) {
                 for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
                     //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
                     Map<String, Double> topologyResources =
                         ResourceUtils.parseResources(bolt.getValue().get_common().get_json_conf());
    -                ResourceUtils.checkIntialization(topologyResources, bolt.getKey(), topologyConf);
    +                ResourceUtils.checkInitialization(topologyResources, bolt.getKey(), this.topologyConf);
                     for (Map.Entry<ExecutorDetails, String> anExecutorToComponent :
                         executorToComponent.entrySet()) {
    -                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())  && topologyResources.keySet().size() > 0) {
                             resourceList.put(anExecutorToComponent.getKey(), topologyResources);
                         }
                     }
                 }
             }
    -        // Extract spout memory info
    +        // Extract spout resource info
             if (topology.get_spouts() != null) {
                 for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
                     Map<String, Double> topologyResources =
                         ResourceUtils.parseResources(spout.getValue().get_common().get_json_conf());
    -                ResourceUtils.checkIntialization(topologyResources, spout.getKey(), this.topologyConf);
    +                ResourceUtils.checkInitialization(topologyResources, spout.getKey(), this.topologyConf);
    +
                     for (Map.Entry<ExecutorDetails, String> anExecutorToComponent :
                         executorToComponent.entrySet()) {
    -                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue()) && topologyResources.keySet().size() > 0) {
    --- End diff --
    
    nit: here too `!topologyResources.isEmpty()`


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149159736
  
    --- Diff: storm-client/src/jvm/org/apache/storm/transactional/TransactionalTopologyBuilder.java ---
    @@ -228,10 +229,34 @@ public SpoutDeclarer addConfigurations(Map<String, Object> conf) {
             }
     
             @Override
    +        public Map getRASConfiguration() {
    +            for(Map<String, Object> conf : _spoutConfs) {
    +                if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
    +                    return conf;
    +                }
    +            }
    +            return new HashMap<>();
    +        }
    +
    +        @Override
             public SpoutDeclarer addSharedMemory(SharedMemory request) {
                 _spoutSharedMemory.add(request);
                 return this;
    -        }        
    +        }
    +
    +        @SuppressWarnings("unchecked")
    +        @Override
    +        public SpoutDeclarer addResource(String resourceName, Number resourceValue) {
    +            Map<String, Double> resourcesMap = (Map<String, Double>) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
    +
    +            if (resourcesMap == null) {
    +                resourcesMap = new HashMap<>();
    +            }
    +            resourcesMap.put(resourceName, resourceValue.doubleValue());
    +
    +            getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
    --- End diff --
    
    Here too I don't think this will work for the same reasons as elsewhere.
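
    One reading of this concern, based on the getRASConfiguration() shown in the diff above: when no conf holds the resources key yet, each call returns a brand-new HashMap, so the final put writes into a map that is immediately discarded. The sketch below is hypothetical (class name, method names, the key string, and the "gpu.count" resource are all placeholders, not Storm code) and shows both the failure and one possible fix.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a declarer; not Storm code.
final class RasConfigSketch {
    // Placeholder for Config.TOPOLOGY_COMPONENT_RESOURCES_MAP; the real value is not assumed.
    static final String RESOURCES_KEY = "resources.map.placeholder";

    private final List<Map<String, Object>> confs = new ArrayList<>();

    // Returns the registered conf that already holds a resources map, or null.
    private Map<String, Object> findConfWithResources() {
        for (Map<String, Object> conf : confs) {
            if (conf.containsKey(RESOURCES_KEY)) {
                return conf;
            }
        }
        return null;
    }

    // Mirrors the diff: falls back to a fresh map that nothing else references.
    Map<String, Object> getRasConfigurationAsInDiff() {
        Map<String, Object> conf = findConfWithResources();
        if (conf != null) {
            return conf;
        }
        return new HashMap<>();
    }

    @SuppressWarnings("unchecked")
    void addResourceAsInDiff(String name, Number value) {
        Map<String, Double> resources =
            (Map<String, Double>) getRasConfigurationAsInDiff().get(RESOURCES_KEY);
        if (resources == null) {
            resources = new HashMap<>();
        }
        resources.put(name, value.doubleValue());
        // On first use this put lands in a second throwaway map and is lost.
        getRasConfigurationAsInDiff().put(RESOURCES_KEY, resources);
    }

    // One possible fix: create the conf once and register it before writing.
    @SuppressWarnings("unchecked")
    void addResourceFixed(String name, Number value) {
        Map<String, Object> conf = findConfWithResources();
        if (conf == null) {
            conf = new HashMap<>();
            conf.put(RESOURCES_KEY, new HashMap<String, Double>());
            confs.add(conf);
        }
        ((Map<String, Double>) conf.get(RESOURCES_KEY)).put(name, value.doubleValue());
    }

    // Reads a resource back from the registered confs; null if it was never stored.
    @SuppressWarnings("unchecked")
    Double lookup(String name) {
        Map<String, Object> conf = findConfWithResources();
        if (conf == null) {
            return null;
        }
        return ((Map<String, Double>) conf.get(RESOURCES_KEY)).get(name);
    }
}
```

    With the diff's shape, lookup("gpu.count") still returns null after the first addResourceAsInDiff call; the fixed variant stores the value because the new map is registered before it is written to.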


---

[GitHub] storm issue #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2385
  
    Just curious: does this patch address only STORM-2727, or everything in STORM-2725? The PR title is prefixed with STORM-2727, but the content is STORM-2725 (to be clear, STORM-2725 is just the epic).


---

[GitHub] storm issue #2385: YSTORM-2727: Generic Resource Aware Scheduling

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2385
  
    @govind-menon why is YSTORM-2725 prefixed by Y? Should it be STORM-2725? If so, can you please fix the typo? Thanks.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149159863
  
    --- Diff: storm-client/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java ---
    @@ -763,9 +787,33 @@ public BoltDeclarer addConfigurations(Map<String, Object> conf) {
             }
     
             @Override
    +        public Map getRASConfiguration() {
    +            for(Map<String, Object> conf : _component.componentConfs) {
    +                if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
    +                    return conf;
    +                }
    +            }
    +            return new HashMap<>();
    +        }
    +
    +        @Override
             public BoltDeclarer addSharedMemory(SharedMemory request) {
                 _component.sharedMemory.add(request);
                 return this;
             }
    +
    +        @SuppressWarnings("unchecked")
    +        @Override
    +        public BoltDeclarer addResource(String resourceName, Number resourceValue) {
    +            Map<String, Double> resourcesMap = (Map<String, Double>) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
    +
    +            if (resourcesMap == null) {
    +                resourcesMap = new HashMap<>();
    +            }
    +            resourcesMap.put(resourceName, resourceValue.doubleValue());
    +
    +            getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
    --- End diff --
    
    And here...


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149140827
  
    --- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java ---
    @@ -430,37 +427,39 @@ public static void main(String[] args) throws Exception {
             return topologyResources;
         }
     
    -    static void checkIntialization(Map<String, Double> topologyResources, String com,
    -                                          Map<String, Object> topologyConf) {
    -        checkInitMem(topologyResources, com, topologyConf);
    -        checkInitCpu(topologyResources, com, topologyConf);
    -    }
    +    /**
    +     * Checks if the topology's resource requirements are initialized.
    +     * @param topologyResources map of resouces requirements
    --- End diff --
    
    nit: could we add a comment that topologyResources will be modified if not properly initialized?  Just so it is clear, because that is not a typical thing to do in a Java method.
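
    A sketch of what such a Javadoc note could look like, under assumptions: the method below is a simplified, hypothetical stand-in (the key string and the default parameter are placeholders), not the real checkInitialization.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not Storm code.
final class InitCheckSketch {
    // Placeholder key; the real Config constant value is not assumed here.
    static final String CPU_KEY = "cpu.placeholder";

    /**
     * Checks that the component's resource requirements are initialized.
     *
     * <p>Note: {@code topologyResources} is modified in place. Any missing
     * entry is filled in with the supplied default.
     *
     * @param topologyResources map of resource requirements; may be mutated by this call
     * @param defaultCpu value stored under the CPU key when none is present
     */
    static void checkInitialization(Map<String, Double> topologyResources, double defaultCpu) {
        // putIfAbsent leaves an existing value alone and only fills the gap.
        topologyResources.putIfAbsent(CPU_KEY, defaultCpu);
    }

    public static void main(String[] args) {
        Map<String, Double> resources = new HashMap<>();
        checkInitialization(resources, 10.0);
        System.out.println(resources);
    }
}
```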


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147236953
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -993,6 +1054,31 @@ public WorkerResources getWorkerResources(WorkerSlot ws) {
         }
     
         @Override
    +    public Map<String, Double> getAllScheduledResourcesForNode(String nodeId) {
    +        Map<String, Double> totalScheduledResources = new HashMap<>();
    +        for (SchedulerAssignmentImpl assignment : assignments.values()) {
    +            for (Entry<WorkerSlot, WorkerResources> entry :
    +                    assignment.getScheduledResources().entrySet()) {
    +                if (nodeId.equals(entry.getKey().getNodeId())) {
    +                    WorkerResources resources = entry.getValue();
    +                    for (Map.Entry resourceEntry : resources.get_resources().entrySet()) {
    --- End diff --
    
    Could you please add in the types for the Map.Entry?


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147755367
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TreeSet;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.Component;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.resource.ResourceUtils;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy {
    +    private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareStrategy.class);
    +
    +    @Override
    +    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
    +        prepare(cluster);
    +        if (nodes.getNodes().size() <= 0) {
    +            LOG.warn("No available nodes to schedule tasks on!");
    +            return SchedulingResult.failure(
    +                    SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
    +        }
    +        Collection<ExecutorDetails> unassignedExecutors =
    +                new HashSet<>(this.cluster.getUnassignedExecutors(td));
    +        LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors);
    +        Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
    +        List<Component> spouts = this.getSpouts(td);
    +
    +        if (spouts.size() == 0) {
    +            LOG.error("Cannot find a Spout!");
    +            return SchedulingResult.failure(
    +                    SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
    +        }
    +
    +        //order executors to be scheduled
    +        List<ExecutorDetails> orderedExecutors = this.orderExecutors(td, unassignedExecutors);
    +        LOG.info("orderedExecutors");
    +        LOG.info(orderedExecutors.  toString());
    +        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
    +        List<String> favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
    +        List<String> unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
    +
    +        for (ExecutorDetails exec : orderedExecutors) {
    +            LOG.debug(
    +                    "Attempting to schedule: {} of component {}[ REQ {} ]",
    +                    exec,
    +                    td.getExecutorToComponent().get(exec),
    +                    td.getTaskResourceReqList(exec));
    +            final List<ObjectResources> sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes);
    +            LOG.info("sortedNodes");
    +            LOG.info(sortedNodes.toString());
    +
    --- End diff --
    
    same here


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149161667
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -503,43 +553,42 @@ public boolean wouldFit(
             WorkerSlot ws,
             ExecutorDetails exec,
             TopologyDetails td,
    -        double maxHeap,
    -        double memoryAvailable,
    -        double cpuAvailable) {
    -        //NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot.
    -        //CPU is simplest because it does not have odd interactions.
    -        double cpuNeeded = td.getTotalCpuReqTask(exec);
    -        if (cpuNeeded > cpuAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
    -                    td.getName(),
    -                    exec,
    -                    ws,
    -                    cpuNeeded,
    -                    cpuAvailable);
    -            }
    -            //Not enough CPU no need to try any more
    -            return false;
    -        }
    +        Map<String, Double> resourcesAvailable,
    +        double maxHeap) {
     
    -        //Lets see if we can make the Memory one fast too, at least in the failure case.
    -        //The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know
    -        // Even with shared it will not work
    -        double minMemNeeded = td.getTotalMemReqTask(exec);
    -        if (minMemNeeded > memoryAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
    +        Map<String, Double> requestedResources = td.getTotalResources(exec);
    --- End diff --
    
    Again, I don't think this is guaranteed to be a Double.  It needs to be a Number that we can then convert.
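
    A small sketch of why this matters, assuming the values arrive as generic objects (for example from parsed JSON, where a whole number may come back as an Integer or Long). The class and method names are hypothetical and this is not the TopologyDetails code. An unchecked cast to Map<String, Double> compiles, but reading an Integer value through it fails at runtime, while converting through Number is safe.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not Storm code.
final class NumberToDoubleSketch {
    // Unsafe: pretends every value is already a Double.
    @SuppressWarnings("unchecked")
    static double unsafeRead(Map<String, Object> raw, String key) {
        Map<String, Double> typed = (Map<String, Double>) (Map<?, ?>) raw;
        // Throws ClassCastException here if the stored value is an Integer or Long.
        return typed.get(key);
    }

    // Safe: accepts any Number and converts it explicitly.
    static Map<String, Double> toDoubles(Map<String, ?> raw) {
        Map<String, Double> out = new HashMap<>();
        for (Map.Entry<String, ?> entry : raw.entrySet()) {
            Object value = entry.getValue();
            if (!(value instanceof Number)) {
                throw new IllegalArgumentException(
                    "Resource " + entry.getKey() + " is not numeric: " + value);
            }
            out.put(entry.getKey(), ((Number) value).doubleValue());
        }
        return out;
    }
}
```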


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149159775
  
    --- Diff: storm-client/src/jvm/org/apache/storm/transactional/TransactionalTopologyBuilder.java ---
    @@ -534,9 +559,33 @@ public BoltDeclarer addConfigurations(Map<String, Object> conf) {
             }
     
             @Override
    +        public Map getRASConfiguration() {
    +            for(Map<String, Object> conf : _component.componentConfs) {
    +                if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
    +                    return conf;
    +                }
    +            }
    +            return new HashMap<>();
    +        }
    +
    +        @Override
             public BoltDeclarer addSharedMemory(SharedMemory request) {
                 _component.sharedMemory.add(request);
                 return this;
             }
    +
    +        @SuppressWarnings("unchecked")
    +        @Override
    +        public BoltDeclarer addResource(String resourceName, Number resourceValue) {
    +            Map<String, Double> resourcesMap = (Map<String, Double>) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
    +
    +            if (resourcesMap == null) {
    +                resourcesMap = new HashMap<>();
    +            }
    +            resourcesMap.put(resourceName, resourceValue.doubleValue());
    +
    +            getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
    --- End diff --
    
    And here.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149162640
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---
    @@ -133,30 +136,31 @@ public StormTopology getTopology() {
     
         private void initResourceList() {
             this.resourceList = new HashMap<>();
    -        // Extract bolt memory info
    +        // Extract bolt resource info
             if (topology.get_bolts() != null) {
                 for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
                     //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
                     Map<String, Double> topologyResources =
                         ResourceUtils.parseResources(bolt.getValue().get_common().get_json_conf());
    -                ResourceUtils.checkIntialization(topologyResources, bolt.getKey(), topologyConf);
    +                ResourceUtils.checkInitialization(topologyResources, bolt.getKey(), this.topologyConf);
                     for (Map.Entry<ExecutorDetails, String> anExecutorToComponent :
                         executorToComponent.entrySet()) {
    -                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())  && topologyResources.keySet().size() > 0) {
    --- End diff --
    
    nit: `!topologyResources.isEmpty()` is a bit cleaner.
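
    As a minimal sketch of the suggestion above (the class and method names are hypothetical, not part of the patch), the two checks are equivalent, but `isEmpty()` states the intent directly:

```java
import java.util.HashMap;
import java.util.Map;

class EmptyCheckSketch {
    // Original form from the diff: goes through keySet() and compares a size.
    static boolean hasResourcesVerbose(Map<String, Double> topologyResources) {
        return topologyResources.keySet().size() > 0;
    }

    // Suggested form: same truth value, reads as the question being asked.
    static boolean hasResources(Map<String, Double> topologyResources) {
        return !topologyResources.isEmpty();
    }

    public static void main(String[] args) {
        Map<String, Double> topologyResources = new HashMap<>();
        System.out.println(hasResources(topologyResources) == hasResourcesVerbose(topologyResources));
        topologyResources.put("cpu.pcore.percent", 100.0);
        System.out.println(hasResources(topologyResources) == hasResourcesVerbose(topologyResources));
    }
}
```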


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149162982
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---
    @@ -289,17 +300,13 @@ public Double getTotalMemReqTask(ExecutorDetails exec) {
     
         /**
          * Gets the total memory resource list for a set of tasks that is part of a topology.
    -     * @return Map<ExecutorDetails, Double> a map of the total memory requirement for all tasks in topology topoId.
    +     * @param executors
    --- End diff --
    
    nit: A param with no explanation is not really helpful.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r148045890
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java ---
    @@ -17,12 +17,10 @@
      */
     package org.apache.storm.scheduler;
     
    -import java.util.Collection;
    -import java.util.HashSet;
    -import java.util.Set;
    -import java.util.Map;
    +import java.util.*;
    --- End diff --
    
    @Ethanlm Thanks!


---

[GitHub] storm issue #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on the issue:

    https://github.com/apache/storm/pull/2385
  
    Added tests. I've also manually tested by using a non-GRAS submitter to submit a topology to a GRAS nimbus and it worked fine. Follow-on PRs will have tests for the ordering itself. Thanks everyone for your feedback so far!


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149162901
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---
    @@ -289,17 +300,13 @@ public Double getTotalMemReqTask(ExecutorDetails exec) {
     
         /**
          * Gets the total memory resource list for a set of tasks that is part of a topology.
    -     * @return Map<ExecutorDetails, Double> a map of the total memory requirement for all tasks in topology topoId.
    +     * @param executors
    +     * @return Map<ExecutorDetails, Double> ,
    --- End diff --
    
    The type is not needed here, and I think it all works on a single line.
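
    A sketch of what the Javadoc could look like with both nits applied: the `@param` gets a description, and the `@return` drops the type and fits on one line. The class, the use of `String` in place of `ExecutorDetails`, and the wording of the descriptions are assumptions made only to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class JavadocSketch {
    // Stand-in for the per-executor memory data held by TopologyDetails.
    private final Map<String, Double> memoryByExecutor = new HashMap<>();

    void setMemory(String executor, double memoryMb) {
        memoryByExecutor.put(executor, memoryMb);
    }

    /**
     * Gets the total memory resource list for a set of tasks that is part of a topology.
     * @param executors the executors to look up; executors with no recorded requirement are skipped
     * @return a map from each known executor to its total memory requirement
     */
    Map<String, Double> getTotalMemoryResourceList(Set<String> executors) {
        Map<String, Double> ret = new HashMap<>();
        for (String exec : executors) {
            Double mem = memoryByExecutor.get(exec);
            if (mem != null) {
                ret.put(exec, mem);
            }
        }
        return ret;
    }
}
```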


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149143172
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -241,6 +241,12 @@
         public static final String TOPOLOGY_TASKS = "topology.tasks";
     
         /**
    +     * A map of resources used by each component e.g {"cpu" : 200.0. "onheap.memory.mb": 256.0, "gpu" : 0.5 }
    --- End diff --
    
    Do we actually want users setting `cpu` as the name of one of the resources?  Because in the code we are looking for `cpu.pcore.percent` or `topology.component.cpu.pcore.percent` and I really think this is going to confuse our users.  Especially if we are also telling them to use this for the `SUPERVISOR_RESOURCES_MAP`.
    
    Can we please make sure that the comments here match with what we would expect the user to actually set, and ideally have them match any naming convention that we would want them to use?
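
    To illustrate the naming concern, here is a hypothetical sketch (not what the patch does) in which the long config-style key is folded onto the short name before any lookup, so only one spelling is ever seen downstream. The alias table and the choice to reject duplicates are assumptions; only the two key strings come from the comment above.

```java
import java.util.HashMap;
import java.util.Map;

class ResourceNameSketch {
    // Hypothetical alias table: long config-style key -> short canonical key.
    private static final Map<String, String> ALIASES = new HashMap<>();

    static {
        ALIASES.put("topology.component.cpu.pcore.percent", "cpu.pcore.percent");
    }

    // Returns a copy of the input keyed only by canonical names.
    static Map<String, Double> normalize(Map<String, Double> resources) {
        Map<String, Double> ret = new HashMap<>();
        for (Map.Entry<String, Double> e : resources.entrySet()) {
            String canonical = ALIASES.getOrDefault(e.getKey(), e.getKey());
            // Assumption: supplying the same resource under two spellings is an error.
            if (ret.putIfAbsent(canonical, e.getValue()) != null) {
                throw new IllegalArgumentException("Resource set twice: " + canonical);
            }
        }
        return ret;
    }
}
```

    With a scheme like this, a documented example such as `{"cpu.pcore.percent": 200.0, "gpu": 0.5}` would match what the code actually looks for.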


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147236417
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -503,62 +553,70 @@ public boolean wouldFit(
             WorkerSlot ws,
             ExecutorDetails exec,
             TopologyDetails td,
    -        double maxHeap,
    -        double memoryAvailable,
    -        double cpuAvailable) {
    -        //NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot.
    -        //CPU is simplest because it does not have odd interactions.
    -        double cpuNeeded = td.getTotalCpuReqTask(exec);
    -        if (cpuNeeded > cpuAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
    -                    td.getName(),
    -                    exec,
    -                    ws,
    -                    cpuNeeded,
    -                    cpuAvailable);
    +        Map<String, Double> resourcesAvailable,
    +        double maxHeap) {
    +
    +        Map<String, Double> requestedResources = td.getTotalResources(exec);
    +
    +        LOG.info(td.getName());
    +        LOG.info("requested");
    +        LOG.info(requestedResources.toString());
    +        LOG.info("available");
    +        LOG.info(resourcesAvailable.toString());
    +        LOG.info(ws.toString());
    +        for (Entry resourceNeededEntry : requestedResources.entrySet()) {
    +            String resourceName = resourceNeededEntry.getKey().toString();
    +            if (resourceName == Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) {
    +                continue;
                 }
    -            //Not enough CPU no need to try any more
    -            return false;
    -        }
    -
    -        //Lets see if we can make the Memory one fast too, at least in the failure case.
    -        //The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know
    -        // Even with shared it will not work
    -        double minMemNeeded = td.getTotalMemReqTask(exec);
    -        if (minMemNeeded > memoryAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
    +            Double resourceNeeded = ObjectReader.getDouble(resourceNeededEntry.getValue());
    +            Double resourceAvailable = ObjectReader.getDouble(
    +                    resourcesAvailable.getOrDefault(resourceName, null), 0.0);
    +            if (resourceNeeded > resourceAvailable) {
    +                if (true) {
    --- End diff --
    
    I think this is a debug logging change.  Do you want to change it back?
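
    For reference, a sketch of the guarded pattern that `if (true)` replaced. The removed lines use SLF4J's `LOG.isTraceEnabled()`; this example substitutes `java.util.logging` (`isLoggable(Level.FINEST)`) only so that it runs without extra dependencies, and the class and method names are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class GuardedLogSketch {
    private static final Logger LOG = Logger.getLogger(GuardedLogSketch.class.getName());

    // Returns true when the requested amount fits into what is available.
    static boolean fits(String resourceName, double needed, double available) {
        if (needed > available) {
            // Guard on the log level so the message is only built when it will be emitted;
            // this matters because the check runs very often during scheduling.
            if (LOG.isLoggable(Level.FINEST)) {
                LOG.finest("Could not schedule: not enough " + resourceName
                    + " " + needed + " > " + available);
            }
            return false;
        }
        return true;
    }
}
```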


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149163179
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---
    @@ -479,6 +499,18 @@ private void addDefaultResforExec(ExecutorDetails exec) {
     
             adjustResourcesForExec(exec, defaultResourceList);
     
    +        Map<String,Double> topologyComponentResourcesMap = (
    +                Map<String, Double>) this.topologyConf.getOrDefault(
    +                    Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()
    +        );
    +
    +        assert topologyComponentResourcesMap != null;
    +
    +        //topologyComponentResourcesMap = normalizedResourceMap(topologyComponentResourcesMap);
    --- End diff --
    
    Not needed, please delete.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2385


---

[GitHub] storm issue #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2385
  
    @govind-menon could you please squash all of the commits?


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149776019
  
    --- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
    @@ -450,9 +456,33 @@ public BoltDeclarer addConfigurations(Map<String, Object> conf) {
             }
     
             @Override
    +        public Map getRASConfiguration() {
    +            for (Map<String, Object> conf : _component.componentConfs) {
    --- End diff --
    
    Still complicated, but I will file a separate JIRA to clean it all up.


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149161531
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -503,43 +553,42 @@ public boolean wouldFit(
             WorkerSlot ws,
             ExecutorDetails exec,
             TopologyDetails td,
    -        double maxHeap,
    -        double memoryAvailable,
    -        double cpuAvailable) {
    -        //NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot.
    -        //CPU is simplest because it does not have odd interactions.
    -        double cpuNeeded = td.getTotalCpuReqTask(exec);
    -        if (cpuNeeded > cpuAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
    -                    td.getName(),
    -                    exec,
    -                    ws,
    -                    cpuNeeded,
    -                    cpuAvailable);
    -            }
    -            //Not enough CPU no need to try any more
    -            return false;
    -        }
    +        Map<String, Double> resourcesAvailable,
    +        double maxHeap) {
     
    -        //Lets see if we can make the Memory one fast too, at least in the failure case.
    -        //The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know
    -        // Even with shared it will not work
    -        double minMemNeeded = td.getTotalMemReqTask(exec);
    -        if (minMemNeeded > memoryAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
    +        Map<String, Double> requestedResources = td.getTotalResources(exec);
    +
    +        for (Entry resourceNeededEntry : requestedResources.entrySet()) {
    +            String resourceName = resourceNeededEntry.getKey().toString();
    --- End diff --
    
    Why are we calling toString on a String?
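
    A sketch of the loop with a typed `Map.Entry<String, Double>` instead of a raw `Entry`, which removes the need for `toString()` and for converting the value. It is a simplified stand-in for `wouldFit`: the memory special-casing from the diff is left out, and values are assumed to be non-null.

```java
import java.util.HashMap;
import java.util.Map;

class TypedEntrySketch {
    // Returns true if every requested resource is covered by what is available.
    static boolean wouldFit(Map<String, Double> requested, Map<String, Double> available) {
        for (Map.Entry<String, Double> entry : requested.entrySet()) {
            String resourceName = entry.getKey();   // already a String, no toString() needed
            double needed = entry.getValue();       // already a Double, no conversion needed
            // A resource the node does not advertise is treated as having 0.0 available.
            double have = available.getOrDefault(resourceName, 0.0);
            if (needed > have) {
                return false;
            }
        }
        return true;
    }
}
```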


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149162183
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -993,6 +1045,31 @@ public WorkerResources getWorkerResources(WorkerSlot ws) {
         }
     
         @Override
    +    public Map<String, Double> getAllScheduledResourcesForNode(String nodeId) {
    +        Map<String, Double> totalScheduledResources = new HashMap<>();
    +        for (SchedulerAssignmentImpl assignment : assignments.values()) {
    +            for (Entry<WorkerSlot, WorkerResources> entry :
    +                    assignment.getScheduledResources().entrySet()) {
    +                if (nodeId.equals(entry.getKey().getNodeId())) {
    +                    WorkerResources resources = entry.getValue();
    +                    for (Map.Entry<String, Double> resourceEntry : resources.get_resources().entrySet()) {
    +                        Double currentResourceValue = totalScheduledResources.containsKey(resourceEntry.getKey()) ? totalScheduledResources.get(resourceEntry.getKey()) : 0.0;
    +                        totalScheduledResources.put(resourceEntry.getKey().toString(), currentResourceValue + ObjectReader.getDouble(resourceEntry.getValue()));
    --- End diff --
    
    Why are we calling toString on a String?
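
    One way the accumulation could be written without `toString()` or the `containsKey` ternary is `Map.merge` with `Double::sum`. This is a sketch under simplified assumptions: each worker's resources are passed in as a plain `Map<String, Double>`, standing in for `WorkerResources.get_resources()`, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ScheduledTotalsSketch {
    // Sums per-worker resource maps into a single total per resource name.
    static Map<String, Double> sumResources(List<Map<String, Double>> perWorkerResources) {
        Map<String, Double> total = new HashMap<>();
        for (Map<String, Double> resources : perWorkerResources) {
            for (Map.Entry<String, Double> e : resources.entrySet()) {
                // merge() inserts the value if the key is absent, otherwise adds to the old value.
                total.merge(e.getKey(), e.getValue(), Double::sum);
            }
        }
        return total;
    }
}
```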


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149163670
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java ---
    @@ -57,7 +64,8 @@
             if (topology.get_spouts() != null) {
                 for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
                     Map<String, Double> topologyResources = parseResources(spout.getValue().get_common().get_json_conf());
    -                checkIntialization(topologyResources, spout.getValue().toString(), topologyConf);
    +                checkInitialization(topologyResources, spout.getKey(), topologyConf);
    +                LOG.warn("Turned {} into {}", spout.getValue().get_common().get_json_conf(), topologyResources);
    --- End diff --
    
    Can we delete this and the one on line 54 too? It looks like a debug statement that accidentally slipped into the code. Truth be told, I think it is my debug statement that slipped into this...


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147237164
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---
    @@ -168,6 +172,15 @@ private void initResourceList() {
             // topology.getbolt (AKA sys tasks most specifically __acker tasks)
             for (ExecutorDetails exec : getExecutors()) {
                 if (!resourceList.containsKey(exec)) {
    +                // TODO: Update this debug statement
    --- End diff --
    
    Either update it or remove the TODO....


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r149148257
  
    --- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
    @@ -450,9 +456,33 @@ public BoltDeclarer addConfigurations(Map<String, Object> conf) {
             }
     
             @Override
    +        public Map getRASConfiguration() {
    +            for (Map<String, Object> conf : _component.componentConfs) {
    --- End diff --
    
    OK, so this is far more complicated than it needs to be. I don't know why `_component.componentConfs` is a list of maps instead of a single map, because they will all be merged together in the final component. If you could refactor it into a single map, life would be a lot simpler. If not, we should not just return a new empty hash map; we should add it to `_component.componentConfs`, or it is going to be lost.
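
    A sketch of the second option described above: when no existing conf holds the resources map, the newly created map is registered in the list before it is returned, so later writes are not lost. The class is a hypothetical stand-in for the declarer, and the key string is an assumed value for `Config.TOPOLOGY_COMPONENT_RESOURCES_MAP`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ComponentConfSketch {
    // Assumed value of Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, used here for illustration only.
    static final String RESOURCES_MAP_KEY = "topology.component.resources.map";

    // Stand-in for _component.componentConfs.
    final List<Map<String, Object>> componentConfs = new ArrayList<>();

    Map<String, Object> getRasConfiguration() {
        for (Map<String, Object> conf : componentConfs) {
            if (conf.containsKey(RESOURCES_MAP_KEY)) {
                return conf;
            }
        }
        Map<String, Object> conf = new HashMap<>();
        componentConfs.add(conf); // register it, otherwise anything put into it is dropped
        return conf;
    }

    void addResource(String resourceName, Number resourceValue) {
        Map<String, Object> conf = getRasConfiguration();
        @SuppressWarnings("unchecked")
        Map<String, Double> resources = (Map<String, Double>) conf.computeIfAbsent(
            RESOURCES_MAP_KEY, k -> new HashMap<String, Double>());
        resources.put(resourceName, resourceValue.doubleValue());
    }
}
```

    With this shape, repeated `addResource` calls end up in the same registered map instead of each starting from a fresh, unattached one.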


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147236218
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -503,62 +553,70 @@ public boolean wouldFit(
             WorkerSlot ws,
             ExecutorDetails exec,
             TopologyDetails td,
    -        double maxHeap,
    -        double memoryAvailable,
    -        double cpuAvailable) {
    -        //NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot.
    -        //CPU is simplest because it does not have odd interactions.
    -        double cpuNeeded = td.getTotalCpuReqTask(exec);
    -        if (cpuNeeded > cpuAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
    -                    td.getName(),
    -                    exec,
    -                    ws,
    -                    cpuNeeded,
    -                    cpuAvailable);
    +        Map<String, Double> resourcesAvailable,
    +        double maxHeap) {
    +
    +        Map<String, Double> requestedResources = td.getTotalResources(exec);
    +
    +        LOG.info(td.getName());
    +        LOG.info("requested");
    +        LOG.info(requestedResources.toString());
    +        LOG.info("available");
    +        LOG.info(resourcesAvailable.toString());
    +        LOG.info(ws.toString());
    --- End diff --
    
    I think this is debug logging.  Can we please remove it?


---

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147737087
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java ---
    @@ -17,12 +17,10 @@
      */
     package org.apache.storm.scheduler;
     
    -import java.util.Collection;
    -import java.util.HashSet;
    -import java.util.Set;
    -import java.util.Map;
    +import java.util.*;
    --- End diff --
    
    I believe it's done automatically by IntelliJ. You can change the preferences: Preferences --> Editor --> Code Style --> Java, and then change "Class count to use import with '*'" to a very large number, e.g. 999. Then you should never have this problem.


---