Posted to dev@storm.apache.org by jerrypeng <gi...@git.apache.org> on 2015/09/18 21:00:08 UTC

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

GitHub user jerrypeng opened a pull request:

    https://github.com/apache/storm/pull/746

    [STORM-893] Resource Aware Scheduling

    I have created an initial open source implementation of the Resource Aware Scheduler, as described in the paper I published: 
    
    web.engr.illinois.edu/~bpeng/files/r-storm.pdf
    
    The paper describes the general architecture, concepts, and algorithms used.
    
    I have written an example topology that demonstrates how to use the API to specify resource requirements for the components in your topology.  Currently the user can only specify two types of resources: memory and CPU.  We plan on adding support for more resource types in the future.
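
    For example, here is a minimal sketch of declaring resource requirements while building a topology.  The declarer methods mirror the boltDeclarer.setMemoryLoad call referenced in the diffs below; see the example topology in the patch for complete usage, and treat the exact signatures here as illustrative:

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-spout", new TestWordSpout(), 2)
               .setMemoryLoad(512.0, 256.0)  // on-heap MB, off-heap MB
               .setCPULoad(50.0);            // percent of one core
        builder.setBolt("count-bolt", new TestWordCounter(), 4)
               .shuffleGrouping("word-spout")
               .setMemoryLoad(256.0)         // on-heap MB only
               .setCPULoad(25.0);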
    
    Currently there is no built-in enforcement of resource usage in worker processes; however, we plan to add that functionality via CGroups.
    
    People who worked on the implementation with me at Yahoo:
    
    Bobby Evans (Yahoo & Storm PMC)
    Derek Dagit (Yahoo & Storm PMC)
    Kyle Nusbaum (Yahoo & Storm PMC)
    Liu Zhuo (Yahoo)
    Sanket Chintapalli (Yahoo)
    Reza Farivar (Yahoo & UIUC)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jerrypeng/storm opensource_ras

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #746
    
----
commit e014804a68b9c6887c64b9d03a17929863b80909
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Date:   2015-09-17T20:50:44Z

    [STORM-893] - Resource Aware Scheduler implementation
    [STORM-894] - Basic Resource Aware Scheduling implementation.

commit 0f3f237f158218cbe46ffc59670f300875eb7950
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Date:   2015-09-17T20:02:44Z

    Added functionality for users to limit the amount of memory allocated to a worker (JVM) process when scheduling with the resource aware scheduler. This allows users to potentially spread executors more evenly across workers.
    Also refactored some code

commit 28ee867c5a27c220be562689e61f4bb959a1aa62
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Date:   2015-09-17T20:56:23Z

    regenerated thrift

commit 0256f6baff63e8c394ebfddbb68d38de5893b053
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Date:   2015-09-18T18:23:46Z

    adding miscellaneous things

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-145152237
  
    @revans2 Thanks for the review!


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40732961
  
    --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
    @@ -442,4 +444,36 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
              */
             public void onCompleted(String srcFile, String targetFile, long totalBytes);
         }
    +    
    +
    +    private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
    +        LOG.info("Validating storm Confs...");
    +        double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
    +        Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
    +        if(topologyWorkerMaxHeapSize < largestMemReq) {
    +            throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
    +                    +Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < " 
    +                            + largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount");
    +        }
    +    }
    +
    +
    +    private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map topologyConf) {
    +        double largestMemoryOperator = 0.0;
    +        for(Map<String, Double> entry : backtype.storm.scheduler.resource.Utils.getBoltsResources(topology, topologyConf).values()) {
    --- End diff --
    
    rename


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-145261204
  
    @revans2  Thanks for your review again!  I just pushed a commit that contains the revisions you suggested.


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by erikdw <gi...@git.apache.org>.
Github user erikdw commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41366711
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/Cluster.java ---
    @@ -438,6 +451,35 @@ public SupervisorDetails getSupervisorById(String nodeId) {
             return this.supervisors;
         }
     
    +    /*
    +    * Note: Make sure the proper conf was passed into the Cluster constructor before calling this function
    +    * It tries to load the proper network topography detection plugin specified in the config.
    +    */
    +    public Map<String, List<String>> getNetworkTopography() {
    +        if (networkTopography == null) {
    +            networkTopography = new HashMap<String, List<String>>();
    +            ArrayList<String> supervisorHostNames = new ArrayList<String>();
    +            for (SupervisorDetails s : supervisors.values()) {
    +                supervisorHostNames.add(s.getHost());
    +            }
    +
    +            String clazz = (String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN);
    +            DNSToSwitchMapping topographyMapper = (DNSToSwitchMapping) Utils.newInstance(clazz);
    +
    +            Map <String,String> resolvedSuperVisors = topographyMapper.resolve(supervisorHostNames);
    +            for(String hostName: resolvedSuperVisors.keySet()) {
    --- End diff --
    
    Please use consistent for/if/while styling.  i.e., most of your new code is `for(`, but some (see 9 lines above) is `for (`.   It *seems* like the storm project is more consistently using `for (`.
    This comment also applies to the `){` at the end of various lines.  That should always be `) {` IMNSHO.
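
    For illustration, here is the loop from this diff written both ways (just a style sketch):

        // project-preferred: space after the keyword, space before the brace
        for (SupervisorDetails s : supervisors.values()) {
            supervisorHostNames.add(s.getHost());
        }

        // rather than:
        for(SupervisorDetails s : supervisors.values()){
            supervisorHostNames.add(s.getHost());
        }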


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-146880374
  
    @jerrypeng Technically, with one +1 from a committer and 24 hours elapsed, we can merge this in.  But this is a large change, so I am glad to see more people looking at it.  I'll try to go through it again and hopefully merge it in shortly.


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41409816
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java ---
    @@ -0,0 +1,37 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource.strategies;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.RAS_Node;
    +
    +/**
     + * An interface for implementing different scheduling strategies for resource aware scheduling.
     + * In the future strategies will be pluggable.
    + */
    +public interface IStrategy {
    +
    +	public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td);
    --- End diff --
    
    will reformat


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40731631
  
    --- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
    @@ -1101,6 +1127,33 @@
         public static final Object TOPOLOGY_TASKS_SCHEMA = ConfigValidation.IntegerValidator;
     
         /**
    +     * The maximum amount of memory an instance of a spout/bolt will take on heap. This enables the scheduler
    +     * to allocate slots on machines with enough available memory.
    +     */
    +    public static final String TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB = "topology.component.resources.onheap.memory.mb";
    +    public static final Object TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
    +
    +    /**
    +     * The maximum amount of memory an instance of a spout/bolt will take off heap. This enables the scheduler
    +     * to allocate slots on machines with enough available memory.
    +     */
    +    public static final String TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB = "topology.component.resources.offheap.memory.mb";
    +    public static final Object TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
    +
    +    /**
    +     * The config indicates the percentage of cpu for a core. Assuming the a core value to be 100, a
    --- End diff --
    
    will fix comment


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40730552
  
    --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
    @@ -442,4 +444,36 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
              */
             public void onCompleted(String srcFile, String targetFile, long totalBytes);
         }
    +    
    +
    +    private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
    +        LOG.info("Validating storm Confs...");
    +        double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
    +        Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
    +        if(topologyWorkerMaxHeapSize < largestMemReq) {
    +            throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
    +                    +Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < " 
    +                            + largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount");
    +        }
    +    }
    +
    +
    +    private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map topologyConf) {
    +        double largestMemoryOperator = 0.0;
    +        for(Map<String, Double> entry : backtype.storm.scheduler.resource.Utils.getBoltsResources(topology, topologyConf).values()) {
    --- End diff --
    
    Can we rename backtype.storm.scheduler.resource.Utils to be something like ResourceUtils instead?  Just so there is no conflict with imports.  I think it will make the code cleaner.
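
    To illustrate (a sketch, not code from this patch; the return type is inferred from the loop shown above): only one of the two Utils classes can be imported, so the other has to be written out in full at every call site:

        import backtype.storm.utils.Utils;

        // the scheduler's Utils then has to be fully qualified everywhere:
        Map<String, Map<String, Double>> boltResources =
                backtype.storm.scheduler.resource.Utils.getBoltsResources(topology, stormConf);

        // after the rename, a plain import can sit alongside the other Utils:
        // import backtype.storm.scheduler.resource.ResourceUtils;
        // Map<String, Map<String, Double>> boltResources = ResourceUtils.getBoltsResources(topology, stormConf);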


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40849297
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +95,302 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.debug(
    +                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
    +                        this.getExecutorToComponent().get(exec),
    +                        exec,
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
    +                this.addDefaultResforExec(exec);
    +            } 
    +        }
    +    }
    +
    +    private List<ExecutorDetails> componentToExecs(String comp) {
    +        List<ExecutorDetails> execs = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
    +            if (entry.getValue().equals(comp)) {
    +                execs.add(entry.getKey());
    +            }
    +        }
    +        return execs;
    +    }
    +
    +    /**
    +     * Returns a representation of the non-system components of the topology graph
    +     * Each Component object in the returning map is populated with the list of its
    +     * parents, children and execs assigned to that component.
    +     * @return a map of components
    +     */
    +    public Map<String, Component> getComponents() {
    +        Map<String, Component> all_comp = new HashMap<String, Component>();
    +
    +        StormTopology storm_topo = this.topology;
    +        // spouts
    +        if (storm_topo.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
    +                    .get_spouts().entrySet()) {
    +                if (!Utils.isSystemId(spoutEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(spoutEntry.getKey())) {
    +                        newComp = all_comp.get(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(spoutEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.SPOUT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> spoutInput : spoutEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(spoutInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(spoutInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(spoutInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(spoutInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(spoutInput.getKey()
    +                                .get_componentId()).children.add(spoutEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        // bolts
    +        if (storm_topo.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
    +                    .entrySet()) {
    +                if (!Utils.isSystemId(boltEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(boltEntry.getKey())) {
    +                        newComp = all_comp.get(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(boltEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.BOLT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> boltInput : boltEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(boltInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(boltInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(boltInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(boltInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(boltInput.getKey()
    +                                .get_componentId()).children.add(boltEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        return all_comp;
    +    }
    +
    +    /**
    +     * Gets the on heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of on heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the off heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of off heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the total memory requirement for a task
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the total memory requirement
    +     *  for this exec in topology topoId.
    +     */
    +    public Double getTotalMemReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return getOffHeapMemoryRequirement(exec)
    +                    + getOnHeapMemoryRequirement(exec);
    +        }
    +        LOG.info("cannot find {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * Gets the total memory resource list for a
    +     * set of tasks that is part of a topology.
    +     * @return Map<ExecutorDetails, Double> a map of the total memory requirement
    +     *  for all tasks in topology topoId.
    +     */
    +    public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
    +        Map<ExecutorDetails, Double> ret = new HashMap<ExecutorDetails, Double>();
    +        for (ExecutorDetails exec : _resourceList.keySet()) {
    +            ret.put(exec, getTotalMemReqTask(exec));
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Get the total CPU requirement for executor
    +     * @param exec
    +     * @return Double the total about of cpu requirement for executor
    +     */
    +    public Double getTotalCpuReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
    +        }
    +        LOG.warn("cannot find - {}", exec);
    --- End diff --
    
    will remove


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40730611
  
    --- Diff: storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java ---
    @@ -51,7 +51,7 @@
     import org.slf4j.LoggerFactory;
     
     @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
    -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-2-6")
    +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-18")
    --- End diff --
    
    In general we try to not add in thrift changes that are just date changes.


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40850849
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource.strategies;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.RAS_Node;
    +
    +public interface IStrategy {
    --- End diff --
    
    will add


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40838906
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource.strategies;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.RAS_Node;
    +
    +public interface IStrategy {
    --- End diff --
    
    Can we add a comment about what a Strategy is?


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41285663
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
    @@ -18,10 +18,7 @@
     package backtype.storm.utils;
     
     import backtype.storm.Config;
    -import backtype.storm.generated.AuthorizationException;
    -import backtype.storm.generated.ComponentCommon;
    -import backtype.storm.generated.ComponentObject;
    -import backtype.storm.generated.StormTopology;
    +import backtype.storm.generated.*;
    --- End diff --
    
    Maybe expand this wildcard import into explicit imports.
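
    I.e., restore the explicit imports shown on the removed lines, plus whichever newly referenced generated classes motivated the wildcard (a sketch; the exact additions depend on what Utils.java now uses):

        import backtype.storm.generated.AuthorizationException;
        import backtype.storm.generated.ComponentCommon;
        import backtype.storm.generated.ComponentObject;
        import backtype.storm.generated.StormTopology;
        // plus, for example (assumed):
        // import backtype.storm.generated.Bolt;
        // import backtype.storm.generated.SpoutSpec;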


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40850307
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -0,0 +1,152 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource;
    +
    +import java.util.*;
    +
    +import backtype.storm.Config;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import backtype.storm.scheduler.Cluster;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.IScheduler;
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.strategies.ResourceAwareStrategy;
    +
    +public class ResourceAwareScheduler implements IScheduler {
    +    private static final Logger LOG = LoggerFactory
    +            .getLogger(ResourceAwareScheduler.class);
    +    @SuppressWarnings("rawtypes")
    +    private Map _conf;
    +
    +    @Override
    +    public void prepare(Map conf) {
    +        _conf = conf;
    +    }
    +
    +    @Override
    +    public void schedule(Topologies topologies, Cluster cluster) {
    +        LOG.info("\n\n\nRerunning ResourceAwareScheduler...");
    --- End diff --
    
    will remove/change


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40730370
  
    --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
    @@ -442,4 +444,36 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
              */
             public void onCompleted(String srcFile, String targetFile, long totalBytes);
         }
    +    
    +
    +    private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
    +        LOG.info("Validating storm Confs...");
    --- End diff --
    
    I don't think we need this log message.


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/746


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-146875059
  
    I have two +1s now, can we merge?


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-146266179
  
    @erikdw thank you for your review.  I reformatted some of the files based on your suggestions.


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40731979
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -31,41 +42,47 @@
         StormTopology topology;
         Map<ExecutorDetails, String> executorToComponent;
         int numWorkers;
    - 
    +    //<ExecutorDetails - Task, Map<String - Type of resource, Map<String - type of that resource, Double - amount>>>
    +    private Map<ExecutorDetails, Map<String, Double>> _resourceList;
    +    //Scheduler this topology should be scheduled with
    +    private String scheduler;
    --- End diff --
    
    This does not appear to be used anywhere.


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40837456
  
    --- Diff: storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java ---
    @@ -51,7 +51,7 @@
     import org.slf4j.LoggerFactory;
     
     @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
    -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-2-6")
    +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-30")
    --- End diff --
    
    Can we revert the generated files that only have a date change in them?


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40730558
  
    --- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
    @@ -167,6 +167,16 @@
         public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
     
         /**
    +     * What Network Topography detection classes should we use.
    +     * Given a list of supervisor hostnames (or IP addresses), this class would return a list of
    +     * rack names that correspond to the supervisors. This information is stored in Cluster.java, and
    +     * is used in the resource aware scheduler.
    +     */
    +    public static final String STORM_NETWORK_TOPOGRAPHY_PLUGIN = "storm.network.topography.plugin";
    +    public static final Object STORM_NETWORK_TOPOGRAPHY_PLUGIN_SCHEMA = String.class;
    +
    +
    --- End diff --
    
    will fix
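
    For reference, the DNSToSwitchMapping contract this config points at (see the resolve() call in the Cluster.java diff earlier in this thread) can be satisfied by a plugin along these lines.  This is a hypothetical sketch: the class name and package are assumptions, and the signature of resolve() is inferred from that diff:

        package backtype.storm.networktopography;

        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;

        // Hypothetical rack-unaware plugin: every supervisor host maps to one default rack.
        public class SingleRackDNSToSwitchMapping implements DNSToSwitchMapping {
            @Override
            public Map<String, String> resolve(List<String> names) {
                Map<String, String> hostToRack = new HashMap<String, String>();
                for (String name : names) {
                    hostToRack.put(name, "default-rack");
                }
                return hostToRack;
            }
        }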


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40838272
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +95,302 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.debug(
    +                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
    +                        this.getExecutorToComponent().get(exec),
    +                        exec,
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
    +                this.addDefaultResforExec(exec);
    +            } 
    +        }
    +    }
    +
    +    private List<ExecutorDetails> componentToExecs(String comp) {
    +        List<ExecutorDetails> execs = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
    +            if (entry.getValue().equals(comp)) {
    +                execs.add(entry.getKey());
    +            }
    +        }
    +        return execs;
    +    }
    +
    +    /**
    +     * Returns a representation of the non-system components of the topology graph
    +     * Each Component object in the returning map is populated with the list of its
    +     * parents, children and execs assigned to that component.
    +     * @return a map of components
    +     */
    +    public Map<String, Component> getComponents() {
    +        Map<String, Component> all_comp = new HashMap<String, Component>();
    +
    +        StormTopology storm_topo = this.topology;
    +        // spouts
    +        if (storm_topo.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
    +                    .get_spouts().entrySet()) {
    +                if (!Utils.isSystemId(spoutEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(spoutEntry.getKey())) {
    +                        newComp = all_comp.get(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(spoutEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.SPOUT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> spoutInput : spoutEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(spoutInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(spoutInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(spoutInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(spoutInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(spoutInput.getKey()
    +                                .get_componentId()).children.add(spoutEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        // bolts
    +        if (storm_topo.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
    +                    .entrySet()) {
    +                if (!Utils.isSystemId(boltEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(boltEntry.getKey())) {
    +                        newComp = all_comp.get(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(boltEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.BOLT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> boltInput : boltEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(boltInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(boltInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(boltInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(boltInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(boltInput.getKey()
    +                                .get_componentId()).children.add(boltEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        return all_comp;
    +    }
    +
    +    /**
    +     * Gets the on heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of on heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the off heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of off heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the total memory requirement for a task
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the total memory requirement
    +     *  for this exec in topology topoId.
    +     */
    +    public Double getTotalMemReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return getOffHeapMemoryRequirement(exec)
    +                    + getOnHeapMemoryRequirement(exec);
    +        }
    +        LOG.info("cannot find {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * Gets the total memory resource list for a
    +     * set of tasks that is part of a topology.
    +     * @return Map<ExecutorDetails, Double> a map of the total memory requirement
    +     *  for all tasks in topology topoId.
    +     */
    +    public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
    +        Map<ExecutorDetails, Double> ret = new HashMap<ExecutorDetails, Double>();
    +        for (ExecutorDetails exec : _resourceList.keySet()) {
    +            ret.put(exec, getTotalMemReqTask(exec));
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Get the total CPU requirement for executor
    +     * @param exec
    +     * @return Double the total about of cpu requirement for executor
    +     */
    +    public Double getTotalCpuReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
    +        }
    +        LOG.warn("cannot find - {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * get the resources requirements for a executor
    +     * @param exec
    +     * @return a map containing the resource requirements for this exec
    +     */
    +    public Map<String, Double> getTaskResourceReqList(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return _resourceList.get(exec);
    +        }
    +        LOG.warn("cannot find - {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * Checks if a executor is part of this topology
    +     * @return Boolean whether or not a certain ExecutorDetail is included in the _resourceList.
    +     */
    +    public boolean hasExecInTopo(ExecutorDetails exec) {
    +        if (_resourceList != null) { // null is possible if the first constructor of TopologyDetails is used
    +            return _resourceList.containsKey(exec);
    +        } else {
    +            return false;
    +        }
    +    }
    +
    +    /**
    +     * add resource requirements for a executor
    +     * @param exec
    +     * @param resourceList
    +     */
    +    public void addResourcesForExec(ExecutorDetails exec, Map<String, Double> resourceList) {
    +        if (hasExecInTopo(exec)) {
    +            LOG.warn("Executor {} already exists...ResourceList: {}", exec, getTaskResourceReqList(exec));
    +            return;
    +        }
    +        _resourceList.put(exec, resourceList);
    +    }
    +
    +    /**
    +     * Add default resource requirements for a executor
    +     * @param exec
    +     */
    +    public void addDefaultResforExec(ExecutorDetails exec) {
    +        Map<String, Double> defaultResourceList = new HashMap<String, Double>();
    +        defaultResourceList.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT,
    +                        Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null));
    +        defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
    +                        Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null));
    +        defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
    +                        Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null));
    +        LOG.warn("Scheduling Executor: {} with memory requirement as onHeap: {} - offHeap: {} " +
    --- End diff --
    
    This should either be logged at debug level or be removed.
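
    A minimal sketch of the debug-level variant, assuming the call sits in
    addDefaultResforExec as shown above (the argument list is an assumption,
    since the diff truncates the statement mid-line):

        // Assumed completion of the truncated warn call, demoted to debug so
        // per-executor defaults only show up when debugging scheduling.
        LOG.debug("Scheduling Executor: {} with memory requirement as onHeap: {} - offHeap: {}",
                exec,
                defaultResourceList.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
                defaultResourceList.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB));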



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40838769
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java ---
    @@ -0,0 +1,117 @@
    +package backtype.storm.scheduler.resource;
    --- End diff --
    
    This file needs the apache header.
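
    For reference, the header in question is the standard ASF block already
    used on the other new files in this patch (quoted verbatim from
    ResourceAwareStrategy.java below); it goes above the package declaration:

        /**
         * Licensed to the Apache Software Foundation (ASF) under one
         * or more contributor license agreements.  See the NOTICE file
         * distributed with this work for additional information
         * regarding copyright ownership.  The ASF licenses this file
         * to you under the Apache License, Version 2.0 (the
         * "License"); you may not use this file except in compliance
         * with the License.  You may obtain a copy of the License at
         *
         * http://www.apache.org/licenses/LICENSE-2.0
         *
         * Unless required by applicable law or agreed to in writing, software
         * distributed under the License is distributed on an "AS IS" BASIS,
         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
         * See the License for the specific language governing permissions and
         * limitations under the License.
         */
        package backtype.storm.scheduler.resource;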



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41283265
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
    @@ -0,0 +1,478 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource.strategies;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Queue;
    +import java.util.TreeMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import backtype.storm.scheduler.Cluster;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.Component;
    +import backtype.storm.scheduler.resource.RAS_Node;
    +
    +public class ResourceAwareStrategy implements IStrategy {
    +    private Logger LOG = null;
    +    private Topologies _topologies;
    +    private Cluster _cluster;
    +    //Map key is the supervisor id and the value is the corresponding RAS_Node Object 
    +    private Map<String, RAS_Node> _availNodes;
    +    private RAS_Node refNode = null;
    +    /**
    +     * supervisor id -> Node
    +     */
    +    private Map<String, RAS_Node> _nodes;
    +    private Map<String, List<String>> _clusterInfo;
    +
    +    private final double CPU_WEIGHT = 1.0;
    +    private final double MEM_WEIGHT = 1.0;
    +    private final double NETWORK_WEIGHT = 1.0;
    +
    +    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
    +        _topologies = topologies;
    +        _cluster = cluster;
    +        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
    +        _availNodes = this.getAvailNodes();
    +        this.LOG = LoggerFactory.getLogger(this.getClass());
    +        _clusterInfo = cluster.getNetworkTopography();
    +        LOG.debug(this.getClusterInfo());
    +    }
    +
    +    //the returned TreeMap keeps the Components sorted
    +    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
    +            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
    +        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
    +        Integer rank = 0;
    +        for (Component ras_comp : ordered__Component_list) {
    +            retMap.put(rank, new ArrayList<ExecutorDetails>());
    +            for(ExecutorDetails exec : ras_comp.execs) {
    +                if(unassignedExecutors.contains(exec)) {
    +                    retMap.get(rank).add(exec);
    +                }
    +            }
    +            rank++;
    +        }
    +        return retMap;
    +    }
    +
    +    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
    +        if (_availNodes.size() <= 0) {
    +            LOG.warn("No available nodes to schedule tasks on!");
    +            return null;
    +        }
    +        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
    +        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
    +        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
    +        Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
    +        List<Component> spouts = this.getSpouts(_topologies, td);
    +
    +        if (spouts.size() == 0) {
    +            LOG.error("Cannot find a Spout!");
    +            return null;
    +        }
    +
    +        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
    +
    +        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
    +        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
    +        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
    +        //Pick the first executor with priority one, then the 1st exec with priority 2, and so on and so forth.
    +        //Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
    +        for (int i = 0; i < longestPriorityListSize; i++) {
    +            for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
    +                Iterator<ExecutorDetails> it = entry.getValue().iterator();
    +                if (it.hasNext()) {
    +                    ExecutorDetails exec = it.next();
    +                    LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
    +                            new Object[] { exec, td.getExecutorToComponent().get(exec),
    +                    td.getTaskResourceReqList(exec), entry.getKey() });
    +                    WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
    +                    if (targetSlot != null) {
    +                        RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
    +                        if(!schedulerAssignmentMap.containsKey(targetSlot)) {
    +                            schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
    +                        }
    +                       
    +                        schedulerAssignmentMap.get(targetSlot).add(exec);
    +                        targetNode.consumeResourcesforTask(exec, td);
    +                        scheduledTasks.add(exec);
    +                        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
    +                                targetNode, targetNode.getAvailableMemoryResources(),
    +                                targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
    +                                targetNode.getTotalCpuResources(), targetSlot);
    +                    } else {
    +                        LOG.error("Not Enough Resources to schedule Task {}", exec);
    +                    }
    +                    it.remove();
    +                }
    +            }
    +        }
    +
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
    +        // schedule left over system tasks
    +        for (ExecutorDetails exec : executorsNotScheduled) {
    +            WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
    +            if (targetSlot != null) {
    +                RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
    +                if(schedulerAssignmentMap.containsKey(targetSlot) == false) {
    +                    schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
    +                }
    +               
    +                schedulerAssignmentMap.get(targetSlot).add(exec);
    +                targetNode.consumeResourcesforTask(exec, td);
    +                scheduledTasks.add(exec);
    +                LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
    +                        targetNode, targetNode.getAvailableMemoryResources(),
    +                        targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
    +                        targetNode.getTotalCpuResources(), targetSlot);
    +            } else {
    +                LOG.error("Not Enough Resources to schedule Task {}", exec);
    +            }
    +        }
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        if (executorsNotScheduled.size() > 0) {
    +            LOG.error("Not all executors successfully scheduled: {}",
    +                    executorsNotScheduled);
    +            schedulerAssignmentMap = null;
    +        } else {
    +            LOG.debug("All resources successfully scheduled!");
    +        }
    +        if (schedulerAssignmentMap == null) {
    +            LOG.error("Topology {} not successfully scheduled!", td.getId());
    +        }
    +        return schedulerAssignmentMap;
    +    }
    +
    +    private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +      WorkerSlot ws = null;
    +      // first scheduling
    +      if (this.refNode == null) {
    +          String clus = this.getBestClustering();
    +          ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
    +      } else {
    +          ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
    +      }
    +      if(ws != null) {
    +          this.refNode = this.idToNode(ws.getNodeId());
    +      }
    +      LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
    +      return ws;
    +    }
    +
    +    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
    +    }
    +
    +    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        double taskMem = td.getTotalMemReqTask(exec);
    +        double taskCPU = td.getTotalCpuReqTask(exec);
    +        List<RAS_Node> nodes;
    +        if(clusterId != null) {
    +            nodes = this.getAvailableNodesFromCluster(clusterId);
    +            
    +        } else {
    +            nodes = this.getAvailableNodes();
    +        }
    +        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<Double, RAS_Node>();
    --- End diff --
    
    Consider adding some brief comments here to explain the logic for choosing the best node.
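
    For example, the ranking step could carry a comment along these lines (a
    sketch only -- the distance formula is an assumption built from the
    CPU_WEIGHT/MEM_WEIGHT fields declared at the top of the class, not taken
    from the actual implementation):

        // Rank every candidate node by a weighted "distance" between the
        // task's resource request and the node's available resources; the
        // TreeMap keeps nodes sorted so firstEntry() is the closest fit.
        for (RAS_Node node : nodes) {
            if (node.getAvailableMemoryResources() >= taskMem
                    && node.getAvailableCpuResources() >= taskCPU) {
                double distance = Math.sqrt(
                        CPU_WEIGHT * Math.pow(node.getAvailableCpuResources() - taskCPU, 2)
                      + MEM_WEIGHT * Math.pow(node.getAvailableMemoryResources() - taskMem, 2));
                nodeRankMap.put(distance, node);
            }
        }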



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40849566
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -0,0 +1,152 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource;
    +
    +import java.util.*;
    --- End diff --
    
    will change
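
    For context, the implied change is replacing the wildcard with explicit
    imports for just the classes the file uses; a sketch (the list below is
    illustrative, since the rest of the file is not shown in this hunk):

        // Illustrative explicit imports replacing "import java.util.*;"
        import java.util.Collection;
        import java.util.HashMap;
        import java.util.Map;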



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40838121
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +95,302 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spouts or topology.get_bolts (i.e., system tasks, most notably __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.debug(
    +                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
    +                        this.getExecutorToComponent().get(exec),
    +                        exec,
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
    +                this.addDefaultResforExec(exec);
    +            } 
    +        }
    +    }
    +
    +    private List<ExecutorDetails> componentToExecs(String comp) {
    +        List<ExecutorDetails> execs = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
    +            if (entry.getValue().equals(comp)) {
    +                execs.add(entry.getKey());
    +            }
    +        }
    +        return execs;
    +    }
    +
    +    /**
    +     * Returns a representation of the non-system components of the topology graph
    +     * Each Component object in the returned map is populated with the list of its
    +     * parents, children and execs assigned to that component.
    +     * @return a map of components
    +     */
    +    public Map<String, Component> getComponents() {
    +        Map<String, Component> all_comp = new HashMap<String, Component>();
    +
    +        StormTopology storm_topo = this.topology;
    +        // spouts
    +        if (storm_topo.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
    +                    .get_spouts().entrySet()) {
    +                if (!Utils.isSystemId(spoutEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(spoutEntry.getKey())) {
    +                        newComp = all_comp.get(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(spoutEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.SPOUT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> spoutInput : spoutEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(spoutInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(spoutInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(spoutInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(spoutInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(spoutInput.getKey()
    +                                .get_componentId()).children.add(spoutEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        // bolts
    +        if (storm_topo.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
    +                    .entrySet()) {
    +                if (!Utils.isSystemId(boltEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(boltEntry.getKey())) {
    +                        newComp = all_comp.get(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(boltEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.BOLT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> boltInput : boltEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(boltInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(boltInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(boltInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(boltInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(boltInput.getKey()
    +                                .get_componentId()).children.add(boltEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        return all_comp;
    +    }
    +
    +    /**
    +     * Gets the on heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of on heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the off heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of off heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the total memory requirement for a task
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the total memory requirement
    +     *  for this exec in topology topoId.
    +     */
    +    public Double getTotalMemReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return getOffHeapMemoryRequirement(exec)
    +                    + getOnHeapMemoryRequirement(exec);
    +        }
    +        LOG.info("cannot find {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * Gets the total memory resource list for a
    +     * set of tasks that are part of a topology.
    +     * @return Map<ExecutorDetails, Double> a map of the total memory requirement
    +     *  for all tasks in topology topoId.
    +     */
    +    public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
    +        Map<ExecutorDetails, Double> ret = new HashMap<ExecutorDetails, Double>();
    +        for (ExecutorDetails exec : _resourceList.keySet()) {
    +            ret.put(exec, getTotalMemReqTask(exec));
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Get the total CPU requirement for an executor
    +     * @param exec
    +     * @return Double the total amount of CPU required for this executor
    +     */
    +    public Double getTotalCpuReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
    +        }
    +        LOG.warn("cannot find - {}", exec);
    --- End diff --
    
    Again I don't think this log message adds much.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40731800
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/Topologies.java ---
    @@ -21,10 +21,17 @@
     import java.util.HashMap;
     import java.util.Map;
     
    +import backtype.storm.scheduler.resource.RAS_Component;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
     public class Topologies {
         Map<String, TopologyDetails> topologies;
         Map<String, String> nameToId;
    -    
    +    Map<String, Map<String, RAS_Component>> _allRAS_Components;
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Topologies.class);
    --- End diff --
    
    This does not look like it is being used.  Not a big deal, but unless we use it we should probably remove it.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41412262
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/Cluster.java ---
    @@ -438,6 +451,35 @@ public SupervisorDetails getSupervisorById(String nodeId) {
             return this.supervisors;
         }
     
    +    /*
    +    * Note: Make sure the proper conf was passed into the Cluster constructor before calling this function.
    +    * It tries to load the proper network topography detection plugin specified in the config.
    +    */
    +    public Map<String, List<String>> getNetworkTopography() {
    +        if (networkTopography == null) {
    +            networkTopography = new HashMap<String, List<String>>();
    +            ArrayList<String> supervisorHostNames = new ArrayList<String>();
    +            for (SupervisorDetails s : supervisors.values()) {
    +                supervisorHostNames.add(s.getHost());
    +            }
    +
    +            String clazz = (String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN);
    +            DNSToSwitchMapping topographyMapper = (DNSToSwitchMapping) Utils.newInstance(clazz);
    +
    +            Map <String,String> resolvedSuperVisors = topographyMapper.resolve(supervisorHostNames);
    +            for(String hostName: resolvedSuperVisors.keySet()) {
    --- End diff --
    
    will fix
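
    The usual fix is to iterate entrySet() so each mapping is read once rather
    than calling get() per key; a sketch against the maps shown in the diff
    (the loop body is an assumption, since the hunk is cut off at the
    commented line -- it assumes resolve() maps host name to rack):

        for (Map.Entry<String, String> entry : resolvedSuperVisors.entrySet()) {
            String hostName = entry.getKey();
            String rack = entry.getValue(); // assumed: value is the rack
            List<String> hosts = networkTopography.get(rack);
            if (hosts == null) {
                hosts = new ArrayList<String>();
                networkTopography.put(rack, hosts);
            }
            hosts.add(hostName);
        }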



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40734280
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -31,41 +42,47 @@
         StormTopology topology;
         Map<ExecutorDetails, String> executorToComponent;
         int numWorkers;
    - 
    +    //<ExecutorDetails - task, Map<String - type of resource, Double - amount>>
    +    private Map<ExecutorDetails, Map<String, Double>> _resourceList;
    +    //Scheduler this topology should be scheduled with
    +    private String scheduler;
    --- End diff --
    
    will delete



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40729530
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
    @@ -308,15 +308,15 @@
     (defn- all-supervisor-info
       ([storm-cluster-state] (all-supervisor-info storm-cluster-state nil))
       ([storm-cluster-state callback]
    -     (let [supervisor-ids (.supervisors storm-cluster-state callback)]
    -       (into {}
    -             (mapcat
    -              (fn [id]
    -                (if-let [info (.supervisor-info storm-cluster-state id)]
    -                  [[id info]]
    -                  ))
    -              supervisor-ids))
    -       )))
    +    (let [supervisor-ids (.supervisors storm-cluster-state callback)]
    +      (into {}
    +        (mapcat
    +          (fn [id]
    +            (if-let [info (.supervisor-info storm-cluster-state id)]
    +              [[id info]]
    +              ))
    +          supervisor-ids))
    +      )))
    --- End diff --
    
    If these are just whitespace changes, I think we should probably revert them.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40837341
  
    --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
    @@ -442,4 +446,35 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
              */
             public void onCompleted(String srcFile, String targetFile, long totalBytes);
         }
    +    
    +
    --- End diff --
    
    Extra space



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40735533
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +96,304 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any bolts!");
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spouts or topology.get_bolts (i.e., system tasks, most notably __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.info(
    +                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
    +                        this.getExecutorToComponent().get(exec),
    +                        exec,
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
    +                this.addDefaultResforExec(exec);
    +            } 
    +        }
    +    }
    +
    +    private List<ExecutorDetails> componentToExecs(String comp) {
    +        List<ExecutorDetails> execs = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
    +            if (entry.getValue().equals(comp)) {
    +                execs.add(entry.getKey());
    +            }
    +        }
    +        return execs;
    +    }
    +
    +    /**
    +     * Returns a representation of the non-system components of the topology graph
    +     * Each Component object in the returned map is populated with the list of its
    +     * parents, children and execs assigned to that component.
    +     * @return a map of components
    +     */
    +    public Map<String, RAS_Component> getRAS_Components() {
    --- End diff --
    
    will rename



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40838180
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +95,302 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spouts or topology.get_bolts (i.e., system tasks, most notably __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.debug(
    +                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
    +                        this.getExecutorToComponent().get(exec),
    +                        exec,
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
    +                this.addDefaultResforExec(exec);
    +            } 
    +        }
    +    }
    +
    +    private List<ExecutorDetails> componentToExecs(String comp) {
    +        List<ExecutorDetails> execs = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
    +            if (entry.getValue().equals(comp)) {
    +                execs.add(entry.getKey());
    +            }
    +        }
    +        return execs;
    +    }
    +
    +    /**
    +     * Returns a representation of the non-system components of the topology graph
    +     * Each Component object in the returned map is populated with the list of its
    +     * parents, children and execs assigned to that component.
    +     * @return a map of components
    +     */
    +    public Map<String, Component> getComponents() {
    +        Map<String, Component> all_comp = new HashMap<String, Component>();
    +
    +        StormTopology storm_topo = this.topology;
    +        // spouts
    +        if (storm_topo.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
    +                    .get_spouts().entrySet()) {
    +                if (!Utils.isSystemId(spoutEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(spoutEntry.getKey())) {
    +                        newComp = all_comp.get(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(spoutEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.SPOUT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> spoutInput : spoutEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(spoutInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(spoutInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(spoutInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(spoutInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(spoutInput.getKey()
    +                                .get_componentId()).children.add(spoutEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        // bolts
    +        if (storm_topo.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
    +                    .entrySet()) {
    +                if (!Utils.isSystemId(boltEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(boltEntry.getKey())) {
    +                        newComp = all_comp.get(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(boltEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.BOLT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> boltInput : boltEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(boltInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(boltInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(boltInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(boltInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(boltInput.getKey()
    +                                .get_componentId()).children.add(boltEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        return all_comp;
    +    }
    +
    +    /**
    +     * Gets the on heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of on heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the off heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of off heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the total memory requirement for a task
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the total memory requirement
    +     *  for this exec in topology topoId.
    +     */
    +    public Double getTotalMemReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return getOffHeapMemoryRequirement(exec)
    +                    + getOnHeapMemoryRequirement(exec);
    +        }
    +        LOG.info("cannot find {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * Gets the total memory resource list for a
    +     * set of tasks that are part of a topology.
    +     * @return Map<ExecutorDetails, Double> a map of the total memory requirement
    +     *  for all tasks in topology topoId.
    +     */
    +    public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
    +        Map<ExecutorDetails, Double> ret = new HashMap<ExecutorDetails, Double>();
    +        for (ExecutorDetails exec : _resourceList.keySet()) {
    +            ret.put(exec, getTotalMemReqTask(exec));
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Get the total CPU requirement for an executor
    +     * @param exec
    +     * @return Double the total amount of CPU required for this executor
    +     */
    +    public Double getTotalCpuReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
    +        }
    +        LOG.warn("cannot find - {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * Get the resource requirements for an executor
    +     * @param exec
    +     * @return a map containing the resource requirements for this exec
    +     */
    +    public Map<String, Double> getTaskResourceReqList(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return _resourceList.get(exec);
    +        }
    +        LOG.warn("cannot find - {}", exec);
    --- End diff --
    
    And here too.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-145253628
  
    @jerrypeng Just two more issues, both of which are relatively minor.  Once they are addressed I am +1 for merging this in.  Then hopefully development work on RAS can move forward in open source.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40730329
  
    --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
    @@ -187,7 +187,7 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo
          * @throws AuthorizationException
          */
    --- End diff --
    
    If we now throw an IllegalArgumentException, even though it is a RuntimeException, we should document it here, and ideally describe the situation in which it is thrown.  Otherwise why declare it at all?
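
    A sketch of the kind of javadoc addition being asked for (the wording of
    the condition is an assumption -- the hunk does not show where the
    exception is actually thrown):

        /**
         * ...
         * @throws AuthorizationException
         * @throws IllegalArgumentException if the topology's resource
         *         settings are invalid, e.g. the requested on-heap memory
         *         exceeds the configured worker max heap size
         */
        // NOTE: the condition described above is assumed for illustration only.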



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40850487
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java ---
    @@ -0,0 +1,117 @@
    +package backtype.storm.scheduler.resource;
    --- End diff --
    
    will add



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40732321
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +96,304 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any bolts!");
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.info(
    --- End diff --
    
    Can we make this debug instead?  I don't think we need to tell everyone, all the time, about all of the topologies' resources.
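    
    For illustration, the requested change is just a severity swap on the SLF4J call quoted above; a minimal sketch (the class and message here are simplified stand-ins, not the PR's exact code):
    
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
    
        class SchedulingLogSketch {
            private static final Logger LOG = LoggerFactory.getLogger(SchedulingLogSketch.class);
    
            void logDefaultResources(Object exec) {
                // emitted once per executor; DEBUG keeps it out of the default nimbus log
                LOG.debug("Scheduling {} with default resources", exec);
            }
        }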



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-145152436
  
    Done making modifications based on the comments.  Can I get some more reviews, please?



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41087614
  
    --- Diff: storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java ---
    @@ -0,0 +1,39 @@
    +package backtype.storm.networktopography;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +/**
    + * This class implements the {@link DNSToSwitchMapping} interface
    + *    It returns the DEFAULT_RACK for every host.
    + */
    +public final class DefaultRackDNSToSwitchMapping extends CachedDNSToSwitchMapping {
    --- End diff --
    
    I know we borrowed this from Hadoop, and it looks like Hadoop has the same issue, but we cannot subclass CachedDNSToSwitchMapping to get caching.  It does not work that way.  Please either fix CachedDNSToSwitchMapping to actually do caching when it is a parent class, or remove it altogether and have its children inherit directly from AbstractDNSToSwitchMapping.
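    
    To make the fix concrete: one way a parent class can genuinely provide caching is the template-method pattern, where the base class owns the cache and delegates only cache misses to subclasses. A minimal sketch under that assumption (the class and method names are illustrative, and the Map-returning resolve signature is assumed, since the PR's DNSToSwitchMapping interface is not fully shown in this diff):
    
        import java.util.ArrayList;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;
    
        public abstract class CachingDNSToSwitchMappingSketch {
            // hostname -> rack name; owned by the parent so every subclass gets caching for free
            private final Map<String, String> cache = new ConcurrentHashMap<String, String>();
    
            // subclasses implement only the raw, uncached lookup
            protected abstract Map<String, String> lookupRacks(List<String> hostnames);
    
            public Map<String, String> resolve(List<String> hostnames) {
                Map<String, String> result = new HashMap<String, String>();
                List<String> misses = new ArrayList<String>();
                for (String host : hostnames) {
                    String rack = cache.get(host);
                    if (rack != null) {
                        result.put(host, rack);
                    } else {
                        misses.add(host);
                    }
                }
                if (!misses.isEmpty()) {
                    Map<String, String> fresh = lookupRacks(misses);
                    cache.putAll(fresh);   // remember the new answers
                    result.putAll(fresh);
                }
                return result;
            }
        }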



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40731147
  
    --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
    @@ -187,7 +187,7 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo
          * @throws AuthorizationException
          */
    --- End diff --
    
    add comment



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-146066442
  
    @zhuoliu Thanks for your review.  I just pushed a commit that I think addresses all your comments.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40732622
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +96,304 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any bolts!");
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.info(
    +                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
    +                        this.getExecutorToComponent().get(exec),
    +                        exec,
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
    +                this.addDefaultResforExec(exec);
    +            } 
    +        }
    +    }
    +
    +    private List<ExecutorDetails> componentToExecs(String comp) {
    +        List<ExecutorDetails> execs = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
    +            if (entry.getValue().equals(comp)) {
    +                execs.add(entry.getKey());
    +            }
    +        }
    +        return execs;
    +    }
    +
    +    /**
    +     * Returns a representation of the non-system components of the topology graph
    +     * Each Component object in the returning map is populated with the list of its
    +     * parents, children and execs assigned to that component.
    +     * @return a map of components
    +     */
    +    public Map<String, RAS_Component> getRAS_Components() {
    --- End diff --
    
    I'm not sure that RAS_Component is the right name for this class or this method.  The class has nothing to do with resources at all.  Perhaps we can call it a Component and the method getComponentGraph instead?



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40849301
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +95,302 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.debug(
    +                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
    +                        this.getExecutorToComponent().get(exec),
    +                        exec,
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
    +                this.addDefaultResforExec(exec);
    +            } 
    +        }
    +    }
    +
    +    private List<ExecutorDetails> componentToExecs(String comp) {
    +        List<ExecutorDetails> execs = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
    +            if (entry.getValue().equals(comp)) {
    +                execs.add(entry.getKey());
    +            }
    +        }
    +        return execs;
    +    }
    +
    +    /**
    +     * Returns a representation of the non-system components of the topology graph
    +     * Each Component object in the returning map is populated with the list of its
    +     * parents, children and execs assigned to that component.
    +     * @return a map of components
    +     */
    +    public Map<String, Component> getComponents() {
    +        Map<String, Component> all_comp = new HashMap<String, Component>();
    +
    +        StormTopology storm_topo = this.topology;
    +        // spouts
    +        if (storm_topo.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
    +                    .get_spouts().entrySet()) {
    +                if (!Utils.isSystemId(spoutEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(spoutEntry.getKey())) {
    +                        newComp = all_comp.get(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(spoutEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.SPOUT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> spoutInput : spoutEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(spoutInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(spoutInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(spoutInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(spoutInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(spoutInput.getKey()
    +                                .get_componentId()).children.add(spoutEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        // bolts
    +        if (storm_topo.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
    +                    .entrySet()) {
    +                if (!Utils.isSystemId(boltEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(boltEntry.getKey())) {
    +                        newComp = all_comp.get(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(boltEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.BOLT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> boltInput : boltEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(boltInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(boltInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(boltInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(boltInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(boltInput.getKey()
    +                                .get_componentId()).children.add(boltEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        return all_comp;
    +    }
    +
    +    /**
    +     * Gets the on heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of on heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the off heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of off heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the total memory requirement for a task
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the total memory requirement
    +     *  for this exec in topology topoId.
    +     */
    +    public Double getTotalMemReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return getOffHeapMemoryRequirement(exec)
    +                    + getOnHeapMemoryRequirement(exec);
    +        }
    +        LOG.info("cannot find {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * Gets the total memory resource list for a
    +     * set of tasks that is part of a topology.
    +     * @return Map<ExecutorDetails, Double> a map of the total memory requirement
    +     *  for all tasks in topology topoId.
    +     */
    +    public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
    +        Map<ExecutorDetails, Double> ret = new HashMap<ExecutorDetails, Double>();
    +        for (ExecutorDetails exec : _resourceList.keySet()) {
    +            ret.put(exec, getTotalMemReqTask(exec));
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Get the total CPU requirement for executor
    +     * @param exec
    +     * @return Double the total about of cpu requirement for executor
    +     */
    +    public Double getTotalCpuReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
    +        }
    +        LOG.warn("cannot find - {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * get the resources requirements for a executor
    +     * @param exec
    +     * @return a map containing the resource requirements for this exec
    +     */
    +    public Map<String, Double> getTaskResourceReqList(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return _resourceList.get(exec);
    +        }
    +        LOG.warn("cannot find - {}", exec);
    --- End diff --
    
    will remove



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40735548
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/Topologies.java ---
    @@ -21,10 +21,17 @@
     import java.util.HashMap;
     import java.util.Map;
     
    +import backtype.storm.scheduler.resource.RAS_Component;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
     public class Topologies {
         Map<String, TopologyDetails> topologies;
         Map<String, String> nameToId;
    -    
    +    Map<String, Map<String, RAS_Component>> _allRAS_Components;
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Topologies.class);
    --- End diff --
    
    will remove



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40730129
  
    --- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
    @@ -1101,6 +1127,33 @@
         public static final Object TOPOLOGY_TASKS_SCHEMA = ConfigValidation.IntegerValidator;
     
         /**
    +     * The maximum amount of memory an instance of a spout/bolt will take on heap. This enables the scheduler
    +     * to allocate slots on machines with enough available memory.
    +     */
    +    public static final String TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB = "topology.component.resources.onheap.memory.mb";
    +    public static final Object TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
    +
    +    /**
    +     * The maximum amount of memory an instance of a spout/bolt will take off heap. This enables the scheduler
    +     * to allocate slots on machines with enough available memory.
    +     */
    +    public static final String TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB = "topology.component.resources.offheap.memory.mb";
    +    public static final Object TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
    +
    +    /**
    +     * The config indicates the percentage of cpu for a core. Assuming the a core value to be 100, a
    --- End diff --
    
    This is a bit confusing.  I think you mean the percentage of a CPU core a component will take.
    
    All of the configs should probably mention that this is the default value for components that don't override the value.
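    
    To make the semantics concrete: with a core valued at 100, a component setting of 50.0 requests half a core per executor and 200.0 requests two full cores. A hedged usage sketch follows; WordSpout and CountBolt are hypothetical user components, and setCPULoad plus the two-argument setMemoryLoad are assumed to mirror the boltDeclarer.setMemoryLoad call referenced earlier in this review:
    
        import backtype.storm.Config;
        import backtype.storm.topology.TopologyBuilder;
    
        public class CpuPercentExample {
            public static void main(String[] args) {
                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout("words", new WordSpout(), 2)
                       .setCPULoad(50.0);            // half of one core per executor (assumed API)
                builder.setBolt("count", new CountBolt(), 4)
                       .setMemoryLoad(512.0, 256.0)  // assumed overload: 512 MB on heap, 256 MB off heap
                       .setCPULoad(100.0);           // exactly one core per executor
    
                Config conf = new Config();
                // topology-level default for components that do not override the value
                conf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
            }
        }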



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-146692455
  
    Nice work! +1



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41347393
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
    @@ -0,0 +1,478 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource.strategies;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Queue;
    +import java.util.TreeMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import backtype.storm.scheduler.Cluster;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.Component;
    +import backtype.storm.scheduler.resource.RAS_Node;
    +
    +public class ResourceAwareStrategy implements IStrategy {
    +    private Logger LOG = null;
    +    private Topologies _topologies;
    +    private Cluster _cluster;
    +    //Map key is the supervisor id and the value is the corresponding RAS_Node Object 
    +    private Map<String, RAS_Node> _availNodes;
    +    private RAS_Node refNode = null;
    +    /**
    +     * supervisor id -> Node
    +     */
    +    private Map<String, RAS_Node> _nodes;
    +    private Map<String, List<String>> _clusterInfo;
    +
    +    private final double CPU_WEIGHT = 1.0;
    +    private final double MEM_WEIGHT = 1.0;
    +    private final double NETWORK_WEIGHT = 1.0;
    +
    +    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
    +        _topologies = topologies;
    +        _cluster = cluster;
    +        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
    +        _availNodes = this.getAvailNodes();
    +        this.LOG = LoggerFactory.getLogger(this.getClass());
    +        _clusterInfo = cluster.getNetworkTopography();
    +        LOG.debug(this.getClusterInfo());
    +    }
    +
    +    //the returned TreeMap keeps the Components sorted
    +    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
    +            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
    +        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
    +        Integer rank = 0;
    +        for (Component ras_comp : ordered__Component_list) {
    +            retMap.put(rank, new ArrayList<ExecutorDetails>());
    +            for(ExecutorDetails exec : ras_comp.execs) {
    +                if(unassignedExecutors.contains(exec)) {
    +                    retMap.get(rank).add(exec);
    +                }
    +            }
    +            rank++;
    +        }
    +        return retMap;
    +    }
    +
    +    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
    +        if (_availNodes.size() <= 0) {
    +            LOG.warn("No available nodes to schedule tasks on!");
    +            return null;
    +        }
    +        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
    +        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
    +        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
    +        Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
    +        List<Component> spouts = this.getSpouts(_topologies, td);
    +
    +        if (spouts.size() == 0) {
    +            LOG.error("Cannot find a Spout!");
    +            return null;
    +        }
    +
    +        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
    +
    +        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
    +        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
    +        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
    +        //Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth. 
    +        //Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
    +        for (int i = 0; i < longestPriorityListSize; i++) {
    +            for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
    +                Iterator<ExecutorDetails> it = entry.getValue().iterator();
    +                if (it.hasNext()) {
    +                    ExecutorDetails exec = it.next();
    +                    LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
    +                            new Object[] { exec, td.getExecutorToComponent().get(exec),
    +                    td.getTaskResourceReqList(exec), entry.getKey() });
    +                    WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
    +                    if (targetSlot != null) {
    +                        RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
    +                        if(!schedulerAssignmentMap.containsKey(targetSlot)) {
    +                            schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
    +                        }
    +                       
    +                        schedulerAssignmentMap.get(targetSlot).add(exec);
    +                        targetNode.consumeResourcesforTask(exec, td);
    +                        scheduledTasks.add(exec);
    +                        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
    +                                targetNode, targetNode.getAvailableMemoryResources(),
    +                                targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
    +                                targetNode.getTotalCpuResources(), targetSlot);
    +                    } else {
    +                        LOG.error("Not Enough Resources to schedule Task {}", exec);
    +                    }
    +                    it.remove();
    +                }
    +            }
    +        }
    +
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
    +        // schedule left over system tasks
    +        for (ExecutorDetails exec : executorsNotScheduled) {
    +            WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
    +            if (targetSlot != null) {
    +                RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
    +                if(schedulerAssignmentMap.containsKey(targetSlot) == false) {
    +                    schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
    +                }
    +               
    +                schedulerAssignmentMap.get(targetSlot).add(exec);
    +                targetNode.consumeResourcesforTask(exec, td);
    +                scheduledTasks.add(exec);
    +                LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
    +                        targetNode, targetNode.getAvailableMemoryResources(),
    +                        targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
    +                        targetNode.getTotalCpuResources(), targetSlot);
    +            } else {
    +                LOG.error("Not Enough Resources to schedule Task {}", exec);
    +            }
    +        }
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        if (executorsNotScheduled.size() > 0) {
    +            LOG.error("Not all executors successfully scheduled: {}",
    +                    executorsNotScheduled);
    +            schedulerAssignmentMap = null;
    +        } else {
    +            LOG.debug("All resources successfully scheduled!");
    +        }
    +        if (schedulerAssignmentMap == null) {
    +            LOG.error("Topology {} not successfully scheduled!", td.getId());
    +        }
    +        return schedulerAssignmentMap;
    +    }
    +
    +    private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +      WorkerSlot ws = null;
    +      // first scheduling
    +      if (this.refNode == null) {
    +          String clus = this.getBestClustering();
    +          ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
    +      } else {
    +          ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
    +      }
    +      if(ws != null) {
    +          this.refNode = this.idToNode(ws.getNodeId());
    +      }
    +      LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
    +      return ws;
    +    }
    +
    +    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
    +    }
    +
    +    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        double taskMem = td.getTotalMemReqTask(exec);
    +        double taskCPU = td.getTotalCpuReqTask(exec);
    +        List<RAS_Node> nodes;
    +        if(clusterId != null) {
    +            nodes = this.getAvailableNodesFromCluster(clusterId);
    +            
    +        } else {
    +            nodes = this.getAvailableNodes();
    +        }
    +        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<Double, RAS_Node>();
    +        for (RAS_Node n : nodes) {
    +            if(n.getFreeSlots().size()>0) {
    +                if (n.getAvailableMemoryResources() >= taskMem
    +                      && n.getAvailableCpuResources() >= taskCPU) {
    +                  double a = Math.pow((taskCPU - n.getAvailableCpuResources())
    +                          * this.CPU_WEIGHT, 2);
    +                  double b = Math.pow((taskMem - n.getAvailableMemoryResources())
    +                          * this.MEM_WEIGHT, 2);
    +                  double c = 0.0;
    +                  if(this.refNode != null) {
    +                      c = Math.pow(this.distToNode(this.refNode, n)
    +                              * this.NETWORK_WEIGHT, 2);
    +                  }
    +                  double distance = Math.sqrt(a + b + c);
    +                  nodeRankMap.put(distance, n);
    +                }
    +            }
    +        }
    +        
    +        for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) {
    +            RAS_Node n = entry.getValue();
    +            for(WorkerSlot ws : n.getFreeSlots()) {
    +                if(checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) {
    +                    return ws;
    +                }
    +            }
    +        }
    +        return null;
    +    }
    +
    +    private String getBestClustering() {
    +        String bestCluster = null;
    +        Double mostRes = 0.0;
    +        for (Entry<String, List<String>> cluster : _clusterInfo
    +                .entrySet()) {
    +            Double clusterTotalRes = this.getTotalClusterRes(cluster.getValue());
    +            if (clusterTotalRes > mostRes) {
    +                mostRes = clusterTotalRes;
    +                bestCluster = cluster.getKey();
    +            }
    +        }
    +        return bestCluster;
    +    }
    +
    +    private Double getTotalClusterRes(List<String> cluster) {
    +        Double res = 0.0;
    +        for (String node : cluster) {
    +            res += _availNodes.get(this.NodeHostnameToId(node))
    +                    .getAvailableMemoryResources()
    +                    + _availNodes.get(this.NodeHostnameToId(node))
    +                    .getAvailableCpuResources();
    +        }
    +        return res;
    +    }
    +
    +    private Double distToNode(RAS_Node src, RAS_Node dest) {
    +        if (src.getId().equals(dest.getId())==true) {
    +            return 1.0;
    +        }else if (this.NodeToCluster(src) == this.NodeToCluster(dest)) {
    +            return 2.0;
    +        } else {
    +            return 3.0;
    +        }
    +    }
    +
    +    private String NodeToCluster(RAS_Node node) {
    +        for (Entry<String, List<String>> entry : _clusterInfo
    +                .entrySet()) {
    +            if (entry.getValue().contains(node.getHostname())) {
    +                return entry.getKey();
    +            }
    +        }
    +        LOG.error("Node: {} not found in any clusters", node.getHostname());
    +        return null;
    +    }
    +    
    +    private List<RAS_Node> getAvailableNodes() {
    +        LinkedList<RAS_Node> nodes = new LinkedList<RAS_Node>();
    +        for (String clusterId : _clusterInfo.keySet()) {
    +            nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
    +        }
    +        return nodes;
    +    }
    +
    +    private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
    +        List<RAS_Node> retList = new ArrayList<RAS_Node>();
    +        for (String node_id : _clusterInfo.get(clus)) {
    +            retList.add(_availNodes.get(this
    +                    .NodeHostnameToId(node_id)));
    +        }
    +        return retList;
    +    }
    +
    +    private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
    +        List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
    +        List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
    +        for(RAS_Node node : nodes) {
    +            workers.addAll(node.getFreeSlots());
    +        }
    +        return workers;
    +    }
    +
    +    private List<WorkerSlot> getAvailableWorker() {
    +        List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
    +        for (String clusterId : _clusterInfo.keySet()) {
    +            workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
    +        }
    +        return workers;
    +    }
    +
    +    /**
    +     * In case in the future RAS can only use a subset of nodes
    +     */
    +    private Map<String, RAS_Node> getAvailNodes() {
    +        return _nodes;
    +    }
    +
    +    /**
    +     * Breadth first traversal of the topology DAG
    +     * @param topologies
    +     * @param td
    +     * @param spouts
    +     * @return A partial ordering of components
    +     */
    +    private Queue<Component> bfs(Topologies topologies, TopologyDetails td, List<Component> spouts) {
    +        // Since queue is a interface
    +        Queue<Component> ordered__Component_list = new LinkedList<Component>();
    +        HashMap<String, Component> visited = new HashMap<String, Component>();
    +
    +        /* start from each spout that is not visited, each does a breadth-first traverse */
    +        for (Component spout : spouts) {
    +            if (!visited.containsKey(spout.id)) {
    +                Queue<Component> queue = new LinkedList<Component>();
    +                queue.offer(spout);
    +                while (!queue.isEmpty()) {
    +                    Component comp = queue.poll();
    +                    visited.put(comp.id, comp);
    +                    ordered__Component_list.add(comp);
    +                    List<String> neighbors = new ArrayList<String>();
    +                    neighbors.addAll(comp.children);
    +                    neighbors.addAll(comp.parents);
    +                    for (String nbID : neighbors) {
    +                        if (!visited.containsKey(nbID)) {
    +                            Component child = topologies.getAllComponents().get(td.getId()).get(nbID);
    +                            queue.offer(child);
    +                        }
    +                    }
    +                }
    +            }
    +        }
    +        return ordered__Component_list;
    +    }
    +
    +    private List<Component> getSpouts(Topologies topologies, TopologyDetails td) {
    +        List<Component> spouts = new ArrayList<Component>();
    +        for (Component c : topologies.getAllComponents().get(td.getId())
    +                .values()) {
    +            if (c.type == Component.ComponentType.SPOUT) {
    +                spouts.add(c);
    +            }
    +        }
    +        return spouts;
    +    }
    +
    +    private Integer getLongestPriorityListSize(Map<Integer, List<ExecutorDetails>> priorityToExecutorMap) {
    +        Integer mostNum = 0;
    +        for (List<ExecutorDetails> execs : priorityToExecutorMap.values()) {
    +            Integer numExecs = execs.size();
    +            if (mostNum < numExecs) {
    +                mostNum = numExecs;
    +            }
    +        }
    +        return mostNum;
    +    }
    +
    +    /**
    +     * Get the remaining amount memory that can be assigned to a worker given the set worker max heap size
    +     * @param ws
    +     * @param td
    +     * @param scheduleAssignmentMap
    +     * @return The remaining amount of memory
    +     */
    +    private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        Double memScheduleUsed = this.getWorkerScheduledMemoryUse(ws, td, scheduleAssignmentMap);
    +        return td.getTopologyWorkerMaxHeapSize() - memScheduleUsed;
    +    }
    +
    +    /**
    +     * Get the amount of memory already assigned to a worker
    +     * @param ws
    +     * @param td
    +     * @param scheduleAssignmentMap
    +     * @return the amount of memory
    +     */
    +    private Double getWorkerScheduledMemoryUse(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        Double totalMem = 0.0;
    +        Collection<ExecutorDetails> execs = scheduleAssignmentMap.get(ws);
    +        if(execs != null) {
    +            for(ExecutorDetails exec : execs) {
    +                totalMem += td.getTotalMemReqTask(exec);
    +            }
    +        } 
    +        return totalMem;
    +    }
    +
    +    /**
    +     * Checks whether we can schedule an Executor exec on the worker slot ws
    +     * @param exec
    +     * @param ws
    +     * @param td
    +     * @param scheduleAssignmentMap
    +     * @return a boolean: True denoting the exec can be scheduled on ws and false if it cannot
    +     */
    +    private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    --- End diff --
    
    will add comment
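    
    The body of checkWorkerConstraints is cut off in the quoted diff; based on the getWorkerScheduledMemoryAvailable helper above, a plausible sketch of what it checks (not the PR's actual code) is:
    
        private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td,
                Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
            // the executor fits only if its total memory demand stays within what the
            // worker can still take before hitting the topology's worker max heap size
            return td.getTotalMemReqTask(exec)
                    <= getWorkerScheduledMemoryAvailable(ws, td, scheduleAssignmentMap);
        }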



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40838824
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java ---
    @@ -0,0 +1,117 @@
    +package backtype.storm.scheduler.resource;
    +
    +import backtype.storm.Config;
    +import backtype.storm.generated.Bolt;
    +import backtype.storm.generated.SpoutSpec;
    +import backtype.storm.generated.StormTopology;
    +import org.json.simple.JSONObject;
    +import org.json.simple.parser.JSONParser;
    +import org.json.simple.parser.ParseException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Created by jerrypeng on 9/22/15.
    + */
    +public class ResourceUtils {
    --- End diff --
    
    Git should take care of history instead of code comments.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40734516
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +96,304 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any bolts!");
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.info(
    --- End diff --
    
    will change



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by erikdw <gi...@git.apache.org>.
Github user erikdw commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41366483
  
    --- Diff: storm-core/src/jvm/backtype/storm/networktopography/DNSToSwitchMapping.java ---
    @@ -0,0 +1,50 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.networktopography;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * An interface that must be implemented to allow pluggable
    + * DNS-name/IP-address to RackID resolvers.
    + *
    + */
    +public interface DNSToSwitchMapping {
    +  public final static String DEFAULT_RACK = "/default-rack";
    --- End diff --
    
    please use consistent indentation.  The Java code in the Storm project seems to use 4-space indents.  This applies to this file as well as AbstractDNSToSwitchMapping.java, and maybe to others that I've missed, since all the generated code makes this a bit hard to read through.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40838543
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -0,0 +1,152 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource;
    +
    +import java.util.*;
    --- End diff --
    
    I don't really like .* imports; they can result in errors if new classes are added to that package.  It is not a big deal, but it would be nice to avoid.
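    
    For example, the wildcard can be replaced with just the classes the file actually uses (the exact list depends on the file; this only shows the shape of the fix):
    
        import java.util.ArrayList;
        import java.util.Collection;
        import java.util.HashMap;
        import java.util.LinkedList;
        import java.util.Map;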



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40729915
  
    --- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
    @@ -167,6 +167,16 @@
         public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
     
         /**
    +     * What Network Topography detection classes should we use.
    +     * Given a list of supervisor hostnames (or IP addresses), this class would return a list of
    +     * rack names that correspond to the supervisors. This information is stored in Cluster.java, and
    +     * is used in the resource aware scheduler.
    +     */
    +    public static final String STORM_NETWORK_TOPOGRAPHY_PLUGIN = "storm.network.topography.plugin";
    +    public static final Object STORM_NETWORK_TOPOGRAPHY_PLUGIN_SCHEMA = String.class;
    +
    +
    --- End diff --
    
    Extra line of whitespace.
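
    For readers following along, this key selects the resolver class by name; a
    hedged sketch of setting it programmatically (it would normally live in
    storm.yaml, and the DefaultRackDNSToSwitchMapping class name is an
    assumption about the bundled fallback):

        import java.util.HashMap;
        import java.util.Map;

        import backtype.storm.Config;

        // Point the scheduler at a DNS-name/IP-address to RackID resolver.
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN,
                "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping");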



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by erikdw <gi...@git.apache.org>.
Github user erikdw commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41366160
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java ---
    @@ -0,0 +1,37 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource.strategies;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.RAS_Node;
    +
    +/**
    + * An interface for implementing different scheduling strategies for resource aware scheduling.
    + * In the future, strategies will be pluggable.
    + */
    +public interface IStrategy {
    +
    +	public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td);
    --- End diff --
    
    This is either a tab (evil) or 8 spaces. It seems most of this Java code uses 4-space indents.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40838080
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +95,302 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spouts() or topology.get_bolts() (i.e. system tasks, most notably __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.debug(
    +                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
    +                        this.getExecutorToComponent().get(exec),
    +                        exec,
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
    +                this.addDefaultResforExec(exec);
    +            } 
    +        }
    +    }
    +
    +    private List<ExecutorDetails> componentToExecs(String comp) {
    +        List<ExecutorDetails> execs = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
    +            if (entry.getValue().equals(comp)) {
    +                execs.add(entry.getKey());
    +            }
    +        }
    +        return execs;
    +    }
    +
    +    /**
    +     * Returns a representation of the non-system components of the topology graph
    +     * Each Component object in the returning map is populated with the list of its
    +     * parents, children and execs assigned to that component.
    +     * @return a map of components
    +     */
    +    public Map<String, Component> getComponents() {
    +        Map<String, Component> all_comp = new HashMap<String, Component>();
    +
    +        StormTopology storm_topo = this.topology;
    +        // spouts
    +        if (storm_topo.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
    +                    .get_spouts().entrySet()) {
    +                if (!Utils.isSystemId(spoutEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(spoutEntry.getKey())) {
    +                        newComp = all_comp.get(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(spoutEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.SPOUT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> spoutInput : spoutEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(spoutInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(spoutInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(spoutInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(spoutInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(spoutInput.getKey()
    +                                .get_componentId()).children.add(spoutEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        // bolts
    +        if (storm_topo.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
    +                    .entrySet()) {
    +                if (!Utils.isSystemId(boltEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(boltEntry.getKey())) {
    +                        newComp = all_comp.get(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(boltEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.BOLT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> boltInput : boltEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(boltInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(boltInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(boltInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(boltInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(boltInput.getKey()
    +                                .get_componentId()).children.add(boltEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        return all_comp;
    +    }
    +
    +    /**
    +     * Gets the on heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of on heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the off heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of off heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the total memory requirement for a task
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the total memory requirement
    +     *  for this exec in topology topoId.
    +     */
    +    public Double getTotalMemReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return getOffHeapMemoryRequirement(exec)
    +                    + getOnHeapMemoryRequirement(exec);
    +        }
    +        LOG.info("cannot find {}", exec);
    --- End diff --
    
    Why does this need a log message? It does not seem to add much.
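
    As context for initResourceList() above: the json_conf it parses is populated
    when the topology is built. A hedged sketch of declaring those requirements
    (values are illustrative; setMemoryLoad/setCPULoad are the declarer calls this
    PR describes, and the test spout/bolt names are just Storm testing helpers):

        import backtype.storm.topology.TopologyBuilder;
        import backtype.storm.testing.TestWordSpout;
        import backtype.storm.testing.TestWordCounter;

        // Per-component resource requirements are serialized into the
        // component's json_conf, which initResourceList() later parses.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word", new TestWordSpout(), 2)
                .setMemoryLoad(512.0, 256.0)   // on-heap MB, off-heap MB
                .setCPULoad(50.0);             // percent of one core
        builder.setBolt("count", new TestWordCounter(), 4)
                .shuffleGrouping("word")
                .setMemoryLoad(256.0)          // on-heap MB; off-heap uses default
                .setCPULoad(20.0);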



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-146880546
  
    @revans2 Thank you!



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-146896112
  
    @revans2  thanks!



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-144190888
  
    Well, first of all, you need to upmerge at some point soon.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41410090
  
    --- Diff: storm-core/src/jvm/backtype/storm/networktopography/DNSToSwitchMapping.java ---
    @@ -0,0 +1,50 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.networktopography;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * An interface that must be implemented to allow pluggable
    + * DNS-name/IP-address to RackID resolvers.
    + *
    + */
    +public interface DNSToSwitchMapping {
    +  public final static String DEFAULT_RACK = "/default-rack";
    --- End diff --
    
    will reformat



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40838571
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -0,0 +1,152 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource;
    +
    +import java.util.*;
    +
    +import backtype.storm.Config;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import backtype.storm.scheduler.Cluster;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.IScheduler;
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.strategies.ResourceAwareStrategy;
    +
    +public class ResourceAwareScheduler implements IScheduler {
    +    private static final Logger LOG = LoggerFactory
    +            .getLogger(ResourceAwareScheduler.class);
    +    @SuppressWarnings("rawtypes")
    +    private Map _conf;
    +
    +    @Override
    +    public void prepare(Map conf) {
    +        _conf = conf;
    +    }
    +
    +    @Override
    +    public void schedule(Topologies topologies, Cluster cluster) {
    +        LOG.info("\n\n\nRerunning ResourceAwareScheduler...");
    --- End diff --
    
    debug or remove.
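
    The suggested fix is a one-liner; a sketch:

        // Demote the rerun banner to debug level (the leading newlines only
        // made sense for eyeballing console output).
        LOG.debug("Rerunning ResourceAwareScheduler...");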



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41283467
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
    @@ -0,0 +1,478 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource.strategies;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Queue;
    +import java.util.TreeMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import backtype.storm.scheduler.Cluster;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.Component;
    +import backtype.storm.scheduler.resource.RAS_Node;
    +
    +public class ResourceAwareStrategy implements IStrategy {
    +    private Logger LOG = null;
    +    private Topologies _topologies;
    +    private Cluster _cluster;
    +    //Map key is the supervisor id and the value is the corresponding RAS_Node Object 
    +    private Map<String, RAS_Node> _availNodes;
    +    private RAS_Node refNode = null;
    +    /**
    +     * supervisor id -> Node
    +     */
    +    private Map<String, RAS_Node> _nodes;
    +    private Map<String, List<String>> _clusterInfo;
    +
    +    private final double CPU_WEIGHT = 1.0;
    +    private final double MEM_WEIGHT = 1.0;
    +    private final double NETWORK_WEIGHT = 1.0;
    +
    +    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
    +        _topologies = topologies;
    +        _cluster = cluster;
    +        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
    +        _availNodes = this.getAvailNodes();
    +        this.LOG = LoggerFactory.getLogger(this.getClass());
    +        _clusterInfo = cluster.getNetworkTopography();
    +        LOG.debug(this.getClusterInfo());
    +    }
    +
    +    //the returned TreeMap keeps the Components sorted
    +    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
    +            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
    +        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
    +        Integer rank = 0;
    +        for (Component ras_comp : ordered__Component_list) {
    +            retMap.put(rank, new ArrayList<ExecutorDetails>());
    +            for(ExecutorDetails exec : ras_comp.execs) {
    +                if(unassignedExecutors.contains(exec)) {
    +                    retMap.get(rank).add(exec);
    +                }
    +            }
    +            rank++;
    +        }
    +        return retMap;
    +    }
    +
    +    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
    +        if (_availNodes.size() <= 0) {
    +            LOG.warn("No available nodes to schedule tasks on!");
    +            return null;
    +        }
    +        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
    +        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
    +        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
    +        Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
    +        List<Component> spouts = this.getSpouts(_topologies, td);
    +
    +        if (spouts.size() == 0) {
    +            LOG.error("Cannot find a Spout!");
    +            return null;
    +        }
    +
    +        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
    +
    +        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
    +        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
    +        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
    +        //Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth. 
    +        //Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
    +        for (int i = 0; i < longestPriorityListSize; i++) {
    +            for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
    +                Iterator<ExecutorDetails> it = entry.getValue().iterator();
    +                if (it.hasNext()) {
    +                    ExecutorDetails exec = it.next();
    +                    LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
    +                            new Object[] { exec, td.getExecutorToComponent().get(exec),
    +                    td.getTaskResourceReqList(exec), entry.getKey() });
    +                    WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
    +                    if (targetSlot != null) {
    +                        RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
    +                        if(!schedulerAssignmentMap.containsKey(targetSlot)) {
    +                            schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
    +                        }
    +                       
    +                        schedulerAssignmentMap.get(targetSlot).add(exec);
    +                        targetNode.consumeResourcesforTask(exec, td);
    +                        scheduledTasks.add(exec);
    +                        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
    +                                targetNode, targetNode.getAvailableMemoryResources(),
    +                                targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
    +                                targetNode.getTotalCpuResources(), targetSlot);
    +                    } else {
    +                        LOG.error("Not Enough Resources to schedule Task {}", exec);
    +                    }
    +                    it.remove();
    +                }
    +            }
    +        }
    +
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
    +        // schedule left over system tasks
    +        for (ExecutorDetails exec : executorsNotScheduled) {
    +            WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
    +            if (targetSlot != null) {
    +                RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
    +                if(schedulerAssignmentMap.containsKey(targetSlot) == false) {
    +                    schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
    +                }
    +               
    +                schedulerAssignmentMap.get(targetSlot).add(exec);
    +                targetNode.consumeResourcesforTask(exec, td);
    +                scheduledTasks.add(exec);
    +                LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
    +                        targetNode, targetNode.getAvailableMemoryResources(),
    +                        targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
    +                        targetNode.getTotalCpuResources(), targetSlot);
    +            } else {
    +                LOG.error("Not Enough Resources to schedule Task {}", exec);
    +            }
    +        }
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        if (executorsNotScheduled.size() > 0) {
    +            LOG.error("Not all executors successfully scheduled: {}",
    +                    executorsNotScheduled);
    +            schedulerAssignmentMap = null;
    +        } else {
    +            LOG.debug("All resources successfully scheduled!");
    +        }
    +        if (schedulerAssignmentMap == null) {
    +            LOG.error("Topology {} not successfully scheduled!", td.getId());
    +        }
    +        return schedulerAssignmentMap;
    +    }
    +
    +    private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +      WorkerSlot ws = null;
    +      // first scheduling
    +      if (this.refNode == null) {
    +          String clus = this.getBestClustering();
    +          ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
    +      } else {
    +          ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
    +      }
    +      if(ws != null) {
    +          this.refNode = this.idToNode(ws.getNodeId());
    +      }
    +      LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
    +      return ws;
    +    }
    +
    +    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
    +    }
    +
    +    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        double taskMem = td.getTotalMemReqTask(exec);
    +        double taskCPU = td.getTotalCpuReqTask(exec);
    +        List<RAS_Node> nodes;
    +        if(clusterId != null) {
    +            nodes = this.getAvailableNodesFromCluster(clusterId);
    +            
    +        } else {
    +            nodes = this.getAvailableNodes();
    +        }
    +        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<Double, RAS_Node>();
    +        for (RAS_Node n : nodes) {
    +            if(n.getFreeSlots().size()>0) {
    +                if (n.getAvailableMemoryResources() >= taskMem
    +                      && n.getAvailableCpuResources() >= taskCPU) {
    +                  double a = Math.pow((taskCPU - n.getAvailableCpuResources())
    +                          * this.CPU_WEIGHT, 2);
    +                  double b = Math.pow((taskMem - n.getAvailableMemoryResources())
    +                          * this.MEM_WEIGHT, 2);
    +                  double c = 0.0;
    +                  if(this.refNode != null) {
    +                      c = Math.pow(this.distToNode(this.refNode, n)
    +                              * this.NETWORK_WEIGHT, 2);
    +                  }
    +                  double distance = Math.sqrt(a + b + c);
    +                  nodeRankMap.put(distance, n);
    +                }
    +            }
    +        }
    +        
    +        for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) {
    +            RAS_Node n = entry.getValue();
    +            for(WorkerSlot ws : n.getFreeSlots()) {
    +                if(checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) {
    +                    return ws;
    +                }
    +            }
    +        }
    +        return null;
    +    }
    +
    +    private String getBestClustering() {
    +        String bestCluster = null;
    +        Double mostRes = 0.0;
    +        for (Entry<String, List<String>> cluster : _clusterInfo
    +                .entrySet()) {
    +            Double clusterTotalRes = this.getTotalClusterRes(cluster.getValue());
    +            if (clusterTotalRes > mostRes) {
    +                mostRes = clusterTotalRes;
    +                bestCluster = cluster.getKey();
    +            }
    +        }
    +        return bestCluster;
    +    }
    +
    +    private Double getTotalClusterRes(List<String> cluster) {
    +        Double res = 0.0;
    +        for (String node : cluster) {
    +            res += _availNodes.get(this.NodeHostnameToId(node))
    +                    .getAvailableMemoryResources()
    +                    + _availNodes.get(this.NodeHostnameToId(node))
    +                    .getAvailableCpuResources();
    +        }
    +        return res;
    +    }
    +
    +    private Double distToNode(RAS_Node src, RAS_Node dest) {
    +        if (src.getId().equals(dest.getId())==true) {
    +            return 1.0;
    +        }else if (this.NodeToCluster(src) == this.NodeToCluster(dest)) {
    +            return 2.0;
    +        } else {
    +            return 3.0;
    +        }
    +    }
    +
    +    private String NodeToCluster(RAS_Node node) {
    +        for (Entry<String, List<String>> entry : _clusterInfo
    +                .entrySet()) {
    +            if (entry.getValue().contains(node.getHostname())) {
    +                return entry.getKey();
    +            }
    +        }
    +        LOG.error("Node: {} not found in any clusters", node.getHostname());
    +        return null;
    +    }
    +    
    +    private List<RAS_Node> getAvailableNodes() {
    +        LinkedList<RAS_Node> nodes = new LinkedList<RAS_Node>();
    +        for (String clusterId : _clusterInfo.keySet()) {
    +            nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
    +        }
    +        return nodes;
    +    }
    +
    +    private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
    +        List<RAS_Node> retList = new ArrayList<RAS_Node>();
    +        for (String node_id : _clusterInfo.get(clus)) {
    +            retList.add(_availNodes.get(this
    +                    .NodeHostnameToId(node_id)));
    +        }
    +        return retList;
    +    }
    +
    +    private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
    +        List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
    +        List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
    +        for(RAS_Node node : nodes) {
    +            workers.addAll(node.getFreeSlots());
    +        }
    +        return workers;
    +    }
    +
    +    private List<WorkerSlot> getAvailableWorker() {
    +        List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
    +        for (String clusterId : _clusterInfo.keySet()) {
    +            workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
    +        }
    +        return workers;
    +    }
    +
    +    /**
    +     * In case in the future RAS can only use a subset of nodes
    +     */
    +    private Map<String, RAS_Node> getAvailNodes() {
    +        return _nodes;
    +    }
    +
    +    /**
    +     * Breadth first traversal of the topology DAG
    +     * @param topologies
    +     * @param td
    +     * @param spouts
    +     * @return A partial ordering of components
    +     */
    +    private Queue<Component> bfs(Topologies topologies, TopologyDetails td, List<Component> spouts) {
    +        // Since Queue is an interface
    +        Queue<Component> ordered__Component_list = new LinkedList<Component>();
    +        HashMap<String, Component> visited = new HashMap<String, Component>();
    +
    +        /* start from each spout that is not visited, each does a breadth-first traverse */
    +        for (Component spout : spouts) {
    +            if (!visited.containsKey(spout.id)) {
    +                Queue<Component> queue = new LinkedList<Component>();
    +                queue.offer(spout);
    +                while (!queue.isEmpty()) {
    +                    Component comp = queue.poll();
    +                    visited.put(comp.id, comp);
    +                    ordered__Component_list.add(comp);
    +                    List<String> neighbors = new ArrayList<String>();
    +                    neighbors.addAll(comp.children);
    +                    neighbors.addAll(comp.parents);
    +                    for (String nbID : neighbors) {
    +                        if (!visited.containsKey(nbID)) {
    +                            Component child = topologies.getAllComponents().get(td.getId()).get(nbID);
    +                            queue.offer(child);
    +                        }
    +                    }
    +                }
    +            }
    +        }
    +        return ordered__Component_list;
    +    }
    +
    +    private List<Component> getSpouts(Topologies topologies, TopologyDetails td) {
    +        List<Component> spouts = new ArrayList<Component>();
    +        for (Component c : topologies.getAllComponents().get(td.getId())
    +                .values()) {
    +            if (c.type == Component.ComponentType.SPOUT) {
    +                spouts.add(c);
    +            }
    +        }
    +        return spouts;
    +    }
    +
    +    private Integer getLongestPriorityListSize(Map<Integer, List<ExecutorDetails>> priorityToExecutorMap) {
    +        Integer mostNum = 0;
    +        for (List<ExecutorDetails> execs : priorityToExecutorMap.values()) {
    +            Integer numExecs = execs.size();
    +            if (mostNum < numExecs) {
    +                mostNum = numExecs;
    +            }
    +        }
    +        return mostNum;
    +    }
    +
    +    /**
    +     * Get the remaining amount memory that can be assigned to a worker given the set worker max heap size
    +     * @param ws
    +     * @param td
    +     * @param scheduleAssignmentMap
    +     * @return The remaining amount of memory
    +     */
    +    private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        Double memScheduleUsed = this.getWorkerScheduledMemoryUse(ws, td, scheduleAssignmentMap);
    +        return td.getTopologyWorkerMaxHeapSize() - memScheduleUsed;
    +    }
    +
    +    /**
    +     * Get the amount of memory already assigned to a worker
    +     * @param ws
    +     * @param td
    +     * @param scheduleAssignmentMap
    +     * @return the amount of memory
    +     */
    +    private Double getWorkerScheduledMemoryUse(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        Double totalMem = 0.0;
    +        Collection<ExecutorDetails> execs = scheduleAssignmentMap.get(ws);
    +        if(execs != null) {
    +            for(ExecutorDetails exec : execs) {
    +                totalMem += td.getTotalMemReqTask(exec);
    +            }
    +        } 
    +        return totalMem;
    +    }
    +
    +    /**
    +     * Checks whether we can schedule an Executor exec on the worker slot ws
    +     * @param exec
    +     * @param ws
    +     * @param td
    +     * @param scheduleAssignmentMap
    +     * @return a boolean: True denoting the exec can be scheduled on ws and false if it cannot
    +     */
    +    private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    --- End diff --
    
    Since checkWorkerConstraints currently only considers memory, we may want to add a comment saying that we will also check the CPU constraint in the near future.
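
    For readers following along, a minimal sketch of what the memory-only check
    amounts to (reconstructed from the helpers quoted above, not copied verbatim
    from the PR):

        // An executor fits in a worker slot only if its total memory requirement
        // stays within the worker's remaining share of the topology's configured
        // max heap size; a CPU check would sit alongside this comparison.
        private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws,
                TopologyDetails td,
                Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
            return td.getTotalMemReqTask(exec)
                    <= getWorkerScheduledMemoryAvailable(ws, td, scheduleAssignmentMap);
        }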



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40734496
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +96,304 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any bolts!");
    --- End diff --
    
    will remove



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-145155422
  
    @knusbaum 
    @rfarivar 
    @zhuoliu 



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-146895509
  
    @jerrypeng I merged this into master. Looking at the sub-tasks for STORM-893, this looked more like STORM-894.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40732244
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +96,304 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any bolts!");
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    --- End diff --
    
    OK, so this is rather odd; this should probably be a warning.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-144111799
  
    @HeartSaVioR  
    @revans2 
    Can I get a review of my PR? Thanks!



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-144514012
  
    @jerrypeng For the most part the code looks really good. I would mostly just like to see a lot of the debug logging moved from LOG.info to LOG.debug.
    
    For everyone else, I think it is important to point out that resource aware scheduling is still an experimental feature. The APIs should be fairly stable, but there is a lot more work to be done before it is production-ready.



[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40849422
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +95,302 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spouts() or topology.get_bolts() (i.e. system tasks, most notably __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.debug(
    +                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
    +                        this.getExecutorToComponent().get(exec),
    +                        exec,
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
    +                this.addDefaultResforExec(exec);
    +            } 
    +        }
    +    }
    +
    +    private List<ExecutorDetails> componentToExecs(String comp) {
    +        List<ExecutorDetails> execs = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
    +            if (entry.getValue().equals(comp)) {
    +                execs.add(entry.getKey());
    +            }
    +        }
    +        return execs;
    +    }
    +
    +    /**
    +     * Returns a representation of the non-system components of the topology graph
    +     * Each Component object in the returning map is populated with the list of its
    +     * parents, children and execs assigned to that component.
    +     * @return a map of components
    +     */
    +    public Map<String, Component> getComponents() {
    +        Map<String, Component> all_comp = new HashMap<String, Component>();
    +
    +        StormTopology storm_topo = this.topology;
    +        // spouts
    +        if (storm_topo.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
    +                    .get_spouts().entrySet()) {
    +                if (!Utils.isSystemId(spoutEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(spoutEntry.getKey())) {
    +                        newComp = all_comp.get(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(spoutEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.SPOUT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> spoutInput : spoutEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(spoutInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(spoutInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(spoutInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(spoutInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(spoutInput.getKey()
    +                                .get_componentId()).children.add(spoutEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        // bolts
    +        if (storm_topo.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
    +                    .entrySet()) {
    +                if (!Utils.isSystemId(boltEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(boltEntry.getKey())) {
    +                        newComp = all_comp.get(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(boltEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.BOLT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> boltInput : boltEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(boltInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(boltInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(boltInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(boltInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(boltInput.getKey()
    +                                .get_componentId()).children.add(boltEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        return all_comp;
    +    }
    +
    +    /**
    +     * Gets the on heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of on heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the off heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of off heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the total memory requirement for a task
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the total memory requirement
    +     *  for this exec in topology topoId.
    +     */
    +    public Double getTotalMemReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return getOffHeapMemoryRequirement(exec)
    +                    + getOnHeapMemoryRequirement(exec);
    +        }
    +        LOG.info("cannot find {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * Gets the total memory resource list for a
    +     * set of tasks that is part of a topology.
    +     * @return Map<ExecutorDetails, Double> a map of the total memory requirement
    +     *  for all tasks in topology topoId.
    +     */
    +    public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
    +        Map<ExecutorDetails, Double> ret = new HashMap<ExecutorDetails, Double>();
    +        for (ExecutorDetails exec : _resourceList.keySet()) {
    +            ret.put(exec, getTotalMemReqTask(exec));
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Get the total CPU requirement for executor
    +     * @param exec
    +     * @return Double the total amount of cpu requirement for executor
    +     */
    +    public Double getTotalCpuReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
    +        }
    +        LOG.warn("cannot find - {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * Gets the resource requirements for an executor
    +     * @param exec
    +     * @return a map containing the resource requirements for this exec
    +     */
    +    public Map<String, Double> getTaskResourceReqList(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return _resourceList.get(exec);
    +        }
    +        LOG.warn("cannot find - {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * Checks if an executor is part of this topology
    +     * @return Boolean whether or not a certain ExecutorDetails is included in the _resourceList.
    +     */
    +    public boolean hasExecInTopo(ExecutorDetails exec) {
    +        if (_resourceList != null) { // null is possible if the first constructor of TopologyDetails is used
    +            return _resourceList.containsKey(exec);
    +        } else {
    +            return false;
    +        }
    +    }
    +
    +    /**
    +     * Adds resource requirements for an executor
    +     * @param exec
    +     * @param resourceList
    +     */
    +    public void addResourcesForExec(ExecutorDetails exec, Map<String, Double> resourceList) {
    +        if (hasExecInTopo(exec)) {
    +            LOG.warn("Executor {} already exists...ResourceList: {}", exec, getTaskResourceReqList(exec));
    +            return;
    +        }
    +        _resourceList.put(exec, resourceList);
    +    }
    +
    +    /**
    +     * Adds default resource requirements for an executor
    +     * @param exec
    +     */
    +    public void addDefaultResforExec(ExecutorDetails exec) {
    +        Map<String, Double> defaultResourceList = new HashMap<String, Double>();
    +        defaultResourceList.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT,
    +                        Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null));
    +        defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
    +                        Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null));
    +        defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
    +                        Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null));
    +        LOG.warn("Scheduling Executor: {} with memory requirement as onHeap: {} - offHeap: {} " +
    --- End diff --
    
    will change to debug
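
    For reference, a minimal sketch of the agreed change, with the call demoted
    from warn to debug (the format string is truncated in the diff above, so its
    tail and the argument names here are assumed):

        LOG.debug("Scheduling Executor: {} with memory requirement as onHeap: {} - offHeap: {} "
                + "and CPU as {}", exec, onHeap, offHeap, cpu); // tail of message assumed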


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41347349
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
    @@ -0,0 +1,478 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource.strategies;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Queue;
    +import java.util.TreeMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import backtype.storm.scheduler.Cluster;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.Component;
    +import backtype.storm.scheduler.resource.RAS_Node;
    +
    +public class ResourceAwareStrategy implements IStrategy {
    +    private Logger LOG = null;
    +    private Topologies _topologies;
    +    private Cluster _cluster;
    +    //Map key is the supervisor id and the value is the corresponding RAS_Node Object 
    +    private Map<String, RAS_Node> _availNodes;
    +    private RAS_Node refNode = null;
    +    /**
    +     * supervisor id -> Node
    +     */
    +    private Map<String, RAS_Node> _nodes;
    +    private Map<String, List<String>> _clusterInfo;
    +
    +    private final double CPU_WEIGHT = 1.0;
    +    private final double MEM_WEIGHT = 1.0;
    +    private final double NETWORK_WEIGHT = 1.0;
    +
    +    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
    +        _topologies = topologies;
    +        _cluster = cluster;
    +        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
    +        _availNodes = this.getAvailNodes();
    +        this.LOG = LoggerFactory.getLogger(this.getClass());
    +        _clusterInfo = cluster.getNetworkTopography();
    +        LOG.debug(this.getClusterInfo());
    +    }
    +
    +    //the returned TreeMap keeps the Components sorted
    +    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
    +            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
    +        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
    +        Integer rank = 0;
    +        for (Component ras_comp : ordered__Component_list) {
    +            retMap.put(rank, new ArrayList<ExecutorDetails>());
    +            for(ExecutorDetails exec : ras_comp.execs) {
    +                if(unassignedExecutors.contains(exec)) {
    +                    retMap.get(rank).add(exec);
    +                }
    +            }
    +            rank++;
    +        }
    +        return retMap;
    +    }
    +
    +    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
    +        if (_availNodes.size() <= 0) {
    +            LOG.warn("No available nodes to schedule tasks on!");
    +            return null;
    +        }
    +        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
    +        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
    +        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
    +        Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
    +        List<Component> spouts = this.getSpouts(_topologies, td);
    +
    +        if (spouts.size() == 0) {
    +            LOG.error("Cannot find a Spout!");
    +            return null;
    +        }
    +
    +        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
    +
    +        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
    +        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
    +        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
    +        //Pick the first executor with priority one, then the 1st exec with priority 2, so on and so forth.
    +        //Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
    +        for (int i = 0; i < longestPriorityListSize; i++) {
    +            for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
    +                Iterator<ExecutorDetails> it = entry.getValue().iterator();
    +                if (it.hasNext()) {
    +                    ExecutorDetails exec = it.next();
    +                    LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
    +                            new Object[] { exec, td.getExecutorToComponent().get(exec),
    +                    td.getTaskResourceReqList(exec), entry.getKey() });
    +                    WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
    +                    if (targetSlot != null) {
    +                        RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
    +                        if(!schedulerAssignmentMap.containsKey(targetSlot)) {
    +                            schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
    +                        }
    +                       
    +                        schedulerAssignmentMap.get(targetSlot).add(exec);
    +                        targetNode.consumeResourcesforTask(exec, td);
    +                        scheduledTasks.add(exec);
    +                        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
    +                                targetNode, targetNode.getAvailableMemoryResources(),
    +                                targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
    +                                targetNode.getTotalCpuResources(), targetSlot);
    +                    } else {
    +                        LOG.error("Not Enough Resources to schedule Task {}", exec);
    +                    }
    +                    it.remove();
    +                }
    +            }
    +        }
    +
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
    +        // schedule left over system tasks
    +        for (ExecutorDetails exec : executorsNotScheduled) {
    +            WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
    +            if (targetSlot != null) {
    +                RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
    +                if(schedulerAssignmentMap.containsKey(targetSlot) == false) {
    +                    schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
    +                }
    +               
    +                schedulerAssignmentMap.get(targetSlot).add(exec);
    +                targetNode.consumeResourcesforTask(exec, td);
    +                scheduledTasks.add(exec);
    +                LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
    +                        targetNode, targetNode.getAvailableMemoryResources(),
    +                        targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
    +                        targetNode.getTotalCpuResources(), targetSlot);
    +            } else {
    +                LOG.error("Not Enough Resources to schedule Task {}", exec);
    +            }
    +        }
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        if (executorsNotScheduled.size() > 0) {
    +            LOG.error("Not all executors successfully scheduled: {}",
    +                    executorsNotScheduled);
    +            schedulerAssignmentMap = null;
    +        } else {
    +            LOG.debug("All resources successfully scheduled!");
    +        }
    +        if (schedulerAssignmentMap == null) {
    +            LOG.error("Topology {} not successfully scheduled!", td.getId());
    +        }
    +        return schedulerAssignmentMap;
    +    }
    +
    +    private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +      WorkerSlot ws = null;
    +      // first scheduling
    +      if (this.refNode == null) {
    +          String clus = this.getBestClustering();
    +          ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
    +      } else {
    +          ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
    +      }
    +      if(ws != null) {
    +          this.refNode = this.idToNode(ws.getNodeId());
    +      }
    +      LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
    +      return ws;
    +    }
    +
    +    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
    +    }
    +
    +    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        double taskMem = td.getTotalMemReqTask(exec);
    +        double taskCPU = td.getTotalCpuReqTask(exec);
    +        List<RAS_Node> nodes;
    +        if(clusterId != null) {
    +            nodes = this.getAvailableNodesFromCluster(clusterId);
    +            
    +        } else {
    +            nodes = this.getAvailableNodes();
    +        }
    +        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<Double, RAS_Node>();
    --- End diff --
    
    will add comments
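
    For context, the comments being added would document the node-ranking step
    that fills nodeRankMap. A sketch of the idea (the distance formula below and
    its use of the weight constants are assumptions based on the r-storm paper,
    not the committed code):

        // Rank each candidate node by how closely its available resources match
        // the task's request; the TreeMap keeps candidates sorted by rank so the
        // closest node can be taken first.
        for (RAS_Node n : nodes) {
            double rank = Math.sqrt(
                    CPU_WEIGHT * Math.pow(n.getAvailableCpuResources() - taskCPU, 2.0)
                  + MEM_WEIGHT * Math.pow(n.getAvailableMemoryResources() - taskMem, 2.0));
            nodeRankMap.put(rank, n);
        }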


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40732462
  
    --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
    @@ -442,4 +444,36 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
              */
             public void onCompleted(String srcFile, String targetFile, long totalBytes);
         }
    +    
    +
    +    private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
    +        LOG.info("Validating storm Confs...");
    --- End diff --
    
    will delete


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40837655
  
    --- Diff: storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java ---
    @@ -0,0 +1,97 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.networktopography;
    +
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This is a base class for DNS to Switch mappings. <p/> It is not mandatory to
    + * derive {@link DNSToSwitchMapping} implementations from it, but it is strongly
    + * recommended, as it makes it easy for the developers to add new methods
    + * to this base class that are automatically picked up by all implementations.
    + * <p/>
    + *
    + */
    +public abstract class AbstractDNSToSwitchMapping
    +    implements DNSToSwitchMapping {
    +
    +//  private Configuration conf;
    --- End diff --
    
    Let's remove this if it is not used.


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40837399
  
    --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
    @@ -442,4 +446,35 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
              */
             public void onCompleted(String srcFile, String targetFile, long totalBytes);
         }
    +    
    +
    +    private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
    +        double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
    +        Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
    +        if(topologyWorkerMaxHeapSize < largestMemReq) {
    +            throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
    +                    +Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < " 
    +                            + largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount");
    +        }
    +    }
    +
    +
    --- End diff --
    
    Extra line
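
    On the check itself: a topology that trips this validation needs its worker
    heap cap raised before resubmitting. A usage sketch (the 2048 value is
    purely illustrative):

        Config conf = new Config();
        // must be at least the largest per-component memory requirement
        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2048.0);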


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40732194
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +96,304 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any bolts!");
    --- End diff --
    
    Is this really a warning?  I would almost rather see this removed.
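
    If the message is kept at all, a parameterized debug line would be the
    quieter alternative (a sketch, not the agreed resolution):

        LOG.debug("Topology {} does not seem to have any bolts!", topologyId);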


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-144451299
  
    upmerged!


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41347451
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
    @@ -18,10 +18,7 @@
     package backtype.storm.utils;
     
     import backtype.storm.Config;
    -import backtype.storm.generated.AuthorizationException;
    -import backtype.storm.generated.ComponentCommon;
    -import backtype.storm.generated.ComponentObject;
    -import backtype.storm.generated.StormTopology;
    +import backtype.storm.generated.*;
    --- End diff --
    
    will expand
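
    For reference, expanding the wildcard would restore at least the four
    explicit imports visible in the diff (any further generated classes the
    file now uses are assumed):

        import backtype.storm.generated.AuthorizationException;
        import backtype.storm.generated.ComponentCommon;
        import backtype.storm.generated.ComponentObject;
        import backtype.storm.generated.StormTopology;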


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40848979
  
    --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
    @@ -442,4 +446,35 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
              */
             public void onCompleted(String srcFile, String targetFile, long totalBytes);
         }
    +    
    +
    --- End diff --
    
    will remove


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40849130
  
    --- Diff: storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java ---
    @@ -0,0 +1,97 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.networktopography;
    +
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This is a base class for DNS to Switch mappings. <p/> It is not mandatory to
    + * derive {@link DNSToSwitchMapping} implementations from it, but it is strongly
    + * recommended, as it makes it easy for the developers to add new methods
    + * to this base class that are automatically picked up by all implementations.
    + * <p/>
    + *
    + */
    +public abstract class AbstractDNSToSwitchMapping
    +    implements DNSToSwitchMapping {
    +
    +//  private Configuration conf;
    --- End diff --
    
    will remove


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40850542
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java ---
    @@ -0,0 +1,117 @@
    +package backtype.storm.scheduler.resource;
    +
    +import backtype.storm.Config;
    +import backtype.storm.generated.Bolt;
    +import backtype.storm.generated.SpoutSpec;
    +import backtype.storm.generated.StormTopology;
    +import org.json.simple.JSONObject;
    +import org.json.simple.parser.JSONParser;
    +import org.json.simple.parser.ParseException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Created by jerrypeng on 9/22/15.
    + */
    +public class ResourceUtils {
    --- End diff --
    
    will remove


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41087631
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
    @@ -0,0 +1,478 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource.strategies;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Queue;
    +import java.util.TreeMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import backtype.storm.scheduler.Cluster;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.Component;
    +import backtype.storm.scheduler.resource.RAS_Node;
    +
    +public class ResourceAwareStrategy implements IStrategy {
    +    private Logger LOG = null;
    +    private Topologies _topologies;
    +    private Cluster _cluster;
    +    //Map key is the supervisor id and the value is the corresponding RAS_Node Object 
    +    private Map<String, RAS_Node> _availNodes;
    +    private RAS_Node refNode = null;
    +    /**
    +     * supervisor id -> Node
    +     */
    +    private Map<String, RAS_Node> _nodes;
    +    private Map<String, List<String>> _clusterInfo;
    +
    +    private final double CPU_WEIGHT = 1.0;
    +    private final double MEM_WEIGHT = 1.0;
    +    private final double NETWORK_WEIGHT = 1.0;
    +
    +    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
    +        _topologies = topologies;
    +        _cluster = cluster;
    +        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
    +        _availNodes = this.getAvailNodes();
    +        this.LOG = LoggerFactory.getLogger(this.getClass());
    +        _clusterInfo = cluster.getNetworkTopography();
    +        LOG.info(this.getClusterInfo());
    +    }
    +
    +    //the returned TreeMap keeps the Components sorted
    +    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
    +            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
    +        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
    +        Integer rank = 0;
    +        for (Component ras_comp : ordered__Component_list) {
    +            retMap.put(rank, new ArrayList<ExecutorDetails>());
    +            for(ExecutorDetails exec : ras_comp.execs) {
    +                if(unassignedExecutors.contains(exec)) {
    +                    retMap.get(rank).add(exec);
    +                }
    +            }
    +            rank++;
    +        }
    +        return retMap;
    +    }
    +
    +    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
    +        if (_availNodes.size() <= 0) {
    +            LOG.warn("No available nodes to schedule tasks on!");
    +            return null;
    +        }
    +        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
    +        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
    +        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
    +        Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
    +        List<Component> spouts = this.getSpouts(_topologies, td);
    +
    +        if (spouts.size() == 0) {
    +            LOG.error("Cannot find a Spout!");
    +            return null;
    +        }
    +
    +        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
    +
    +        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
    +        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
    +        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
    +        //Pick the first executor with priority one, then the 1st exec with priority 2, so on and so forth.
    +        //Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
    +        for (int i = 0; i < longestPriorityListSize; i++) {
    +            for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
    +                Iterator<ExecutorDetails> it = entry.getValue().iterator();
    +                if (it.hasNext()) {
    +                    ExecutorDetails exec = it.next();
    +                    LOG.info("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
    --- End diff --
    
    Again, I think all of these should be debug messages; we don't need them printed all the time, but we can turn them on when we are debugging.
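
    For reference, the demoted call would match the debug form already quoted
    earlier in this thread:

        LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
                new Object[] { exec, td.getExecutorToComponent().get(exec),
                        td.getTaskResourceReqList(exec), entry.getKey() });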


---

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40849279
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +95,302 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spouts or topology.get_bolts (AKA sys tasks, most specifically __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.debug(
    +                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
    +                        this.getExecutorToComponent().get(exec),
    +                        exec,
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
    +                this.addDefaultResforExec(exec);
    +            } 
    +        }
    +    }
    +
    +    private List<ExecutorDetails> componentToExecs(String comp) {
    +        List<ExecutorDetails> execs = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
    +            if (entry.getValue().equals(comp)) {
    +                execs.add(entry.getKey());
    +            }
    +        }
    +        return execs;
    +    }
    +
    +    /**
    +     * Returns a representation of the non-system components of the topology graph.
    +     * Each Component object in the returned map is populated with the list of its
    +     * parents, children and execs assigned to that component.
    +     * @return a map of components
    +     */
    +    public Map<String, Component> getComponents() {
    +        Map<String, Component> all_comp = new HashMap<String, Component>();
    +
    +        StormTopology storm_topo = this.topology;
    +        // spouts
    +        if (storm_topo.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
    +                    .get_spouts().entrySet()) {
    +                if (!Utils.isSystemId(spoutEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(spoutEntry.getKey())) {
    +                        newComp = all_comp.get(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(spoutEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.SPOUT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> spoutInput : spoutEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(spoutInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(spoutInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(spoutInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(spoutInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(spoutInput.getKey()
    +                                .get_componentId()).children.add(spoutEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        // bolts
    +        if (storm_topo.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
    +                    .entrySet()) {
    +                if (!Utils.isSystemId(boltEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(boltEntry.getKey())) {
    +                        newComp = all_comp.get(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(boltEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.BOLT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> boltInput : boltEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(boltInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(boltInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(boltInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(boltInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(boltInput.getKey()
    +                                .get_componentId()).children.add(boltEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        return all_comp;
    +    }
    +
    +    /**
    +     * Gets the on heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of on heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the off heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of off heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the total memory requirement for a task
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the total memory requirement
    +     *  for this exec in topology topoId.
    +     */
    +    public Double getTotalMemReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return getOffHeapMemoryRequirement(exec)
    +                    + getOnHeapMemoryRequirement(exec);
    +        }
    +        LOG.info("cannot find {}", exec);
    --- End diff --
    
    will remove


---