Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/31 02:07:04 UTC

[05/20] storm git commit: STORM-2497: Let Supervisor enforce memory and add in support for shared memory regions
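
For readers coming to this change cold: a "shared memory region" is a block of memory (on-heap, off-heap per worker, or off-heap per node) that a topology declares once and that the scheduler accounts for separately from per-executor requests, so the supervisor can enforce an accurate total. Below is a rough sketch of how such a region is declared on the topology side, using the Storm 2.x builder API that accompanies this work; SharedOnHeap, SharedOffHeapWithinNode, and addSharedMemory are recalled from the 2.x API rather than taken from this diff, so treat the exact names and signatures as assumptions. MySpout and MyBolt are hypothetical user components.

    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.topology.SharedOffHeapWithinNode;
    import org.apache.storm.topology.SharedOnHeap;
    import org.apache.storm.topology.TopologyBuilder;

    public class SharedMemoryExample {
        public static StormTopology build() {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new MySpout(), 1);
            builder.setBolt("bolt", new MyBolt(), 2)
                .shuffleGrouping("spout")
                // 512 MB of on-heap memory shared by all "bolt" executors in a worker
                .addSharedMemory(new SharedOnHeap("cache", 512.0))
                // 1024 MB of off-heap memory shared by all workers on the same node
                .addSharedMemory(new SharedOffHeapWithinNode("index", 1024.0));
            return builder.createTopology();
        }
    }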

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
new file mode 100644
index 0000000..575463b
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ComponentType;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.SharedMemory;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopologyDetails {
+    private final String topologyId;
+    private final Map<String, Object> topologyConf;
+    private final StormTopology topology;
+    private final Map<ExecutorDetails, String> executorToComponent;
+    private final int numWorkers;
+    //Resource requirements for each executor: {executor -> {resource name -> amount}}
+    private Map<ExecutorDetails, Map<String, Double>> resourceList;
+    //Max heap size for a worker used by topology
+    private Double topologyWorkerMaxHeapSize;
+    //topology priority
+    private Integer topologyPriority;
+    //when topology was launched
+    private final int launchTime;
+    private final String owner;
+    private final String topoName;
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyDetails.class);
+
+    public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, int numWorkers, String owner) {
+        this(topologyId, topologyConf, topology,  numWorkers,  null, 0, owner);
+    }
+
+    public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology,
+                           int numWorkers, Map<ExecutorDetails, String> executorToComponents, String owner) {
+        this(topologyId, topologyConf, topology,  numWorkers,  executorToComponents, 0, owner);
+    }
+
+    public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, int numWorkers,
+                           Map<ExecutorDetails, String> executorToComponents, int launchTime, String owner) {
+        this.owner = owner;
+        this.topologyId = topologyId;
+        this.topologyConf = topologyConf;
+        this.topology = topology;
+        this.numWorkers = numWorkers;
+        this.executorToComponent = new HashMap<>(0);
+        if (executorToComponents != null) {
+            this.executorToComponent.putAll(executorToComponents);
+        }
+        if (topology != null) {
+            initResourceList();
+        }
+        initConfigs();
+        this.launchTime = launchTime;
+        this.topoName = (String) topologyConf.get(Config.TOPOLOGY_NAME);
+    }
+
+    public String getId() {
+        return topologyId;
+    }
+
+    public String getName() {
+        return topoName;
+    }
+
+    public Map<String, Object> getConf() {
+        return topologyConf;
+    }
+
+    public int getNumWorkers() {
+        return numWorkers;
+    }
+
+    public StormTopology getTopology() {
+        return topology;
+    }
+
+    public Map<ExecutorDetails, String> getExecutorToComponent() {
+        return executorToComponent;
+    }
+
+    public Map<ExecutorDetails, String> selectExecutorToComponent(
+        Collection<ExecutorDetails> executors) {
+        Map<ExecutorDetails, String> ret = new HashMap<>(executors.size());
+        for (ExecutorDetails executor : executors) {
+            String compId = executorToComponent.get(executor);
+            if (compId != null) {
+                ret.put(executor, compId);
+            }
+        }
+
+        return ret;
+    }
+
+    public Set<ExecutorDetails> getExecutors() {
+        return executorToComponent.keySet();
+    }
+
+    private void initResourceList() {
+        this.resourceList = new HashMap<>();
+        // Extract bolt memory info
+        if (topology.get_bolts() != null) {
+            for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
+                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
+                Map<String, Double> topologyResources =
+                    ResourceUtils.parseResources(bolt.getValue().get_common().get_json_conf());
+                ResourceUtils.checkIntialization(topologyResources, bolt.getKey(), topologyConf);
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent :
+                    executorToComponent.entrySet()) {
+                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
+                        resourceList.put(anExecutorToComponent.getKey(), topologyResources);
+                    }
+                }
+            }
+        }
+        // Extract spout memory info
+        if (topology.get_spouts() != null) {
+            for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
+                Map<String, Double> topologyResources =
+                    ResourceUtils.parseResources(spout.getValue().get_common().get_json_conf());
+                ResourceUtils.checkIntialization(topologyResources, spout.getKey(), this.topologyConf);
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent :
+                    executorToComponent.entrySet()) {
+                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
+                        resourceList.put(anExecutorToComponent.getKey(), topologyResources);
+                    }
+                }
+            }
+        } else {
+            LOG.warn("Topology {} does not seem to have any spouts!", topologyId);
+        }
+        //Schedule tasks that are not part of components returned from topology.get_spouts or
+        // topology.get_bolts (i.e. system tasks, most notably __acker tasks)
+        for (ExecutorDetails exec : getExecutors()) {
+            if (!resourceList.containsKey(exec)) {
+                LOG.debug(
+                    "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and "
+                        + "CPU requirement as {}",
+                    getExecutorToComponent().get(exec),
+                    exec,
+                    topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+                    topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+                    topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+                addDefaultResforExec(exec);
+            }
+        }
+    }
+
+    private List<ExecutorDetails> componentToExecs(String comp) {
+        List<ExecutorDetails> execs = new ArrayList<>();
+        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
+            if (entry.getValue().equals(comp)) {
+                execs.add(entry.getKey());
+            }
+        }
+        return execs;
+    }
+
+    private Set<String> getInputsTo(ComponentCommon comp) {
+        Set<String> ret = new HashSet<>();
+        for (GlobalStreamId globalId : comp.get_inputs().keySet()) {
+            ret.add(globalId.get_componentId());
+        }
+        return ret;
+    }
+
+    /**
+     * Returns a representation of the non-system components of the topology graph. Each Component
+     * object in the returned map is populated with the list of its parents, children, and executors
+     * assigned to that component.
+     *
+     * @return a map of components
+     */
+    public Map<String, Component> getComponents() {
+        Map<String, Component> ret = new HashMap<>();
+
+        Map<String, SpoutSpec> spouts = topology.get_spouts();
+        Map<String, Bolt> bolts = topology.get_bolts();
+        //Add in all of the components
+        if (spouts != null) {
+            for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
+                String compId = entry.getKey();
+                if (!Utils.isSystemId(compId)) {
+                    Component comp = new Component(ComponentType.SPOUT, compId, componentToExecs(compId));
+                    ret.put(compId, comp);
+                }
+            }
+        }
+        if (bolts != null) {
+            for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
+                String compId = entry.getKey();
+                if (!Utils.isSystemId(compId)) {
+                    Component comp = new Component(ComponentType.BOLT, compId, componentToExecs(compId));
+                    ret.put(compId, comp);
+                }
+            }
+        }
+
+        //Link the components together
+        if (spouts != null) {
+            for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
+                Component spout = ret.get(entry.getKey());
+                for (String parentId : getInputsTo(entry.getValue().get_common())) {
+                    ret.get(parentId).addChild(spout);
+                }
+            }
+        }
+
+        if (bolts != null) {
+            for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
+                Component bolt = ret.get(entry.getKey());
+                for (String parentId : getInputsTo(entry.getValue().get_common())) {
+                    ret.get(parentId).addChild(bolt);
+                }
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Gets the on heap memory requirement for a certain task within a topology.
+     * @param exec the executor the inquiry is concerning.
+     * @return the amount of on-heap memory requested by this exec, or null if it is not part of the topology.
+     */
+    public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
+        Double ret = null;
+        if (hasExecInTopo(exec)) {
+            ret = resourceList
+                    .get(exec)
+                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        }
+        return ret;
+    }
+
+    /**
+     * Gets the off heap memory requirement for a certain task within a topology.
+     * @param exec the executor the inquiry is concerning.
+     * @return the amount of off-heap memory requested by this exec, or null if it is not part of the topology.
+     */
+    public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
+        Double ret = null;
+        if (hasExecInTopo(exec)) {
+            ret = resourceList
+                    .get(exec)
+                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+        }
+        return ret;
+    }
+
+    /**
+     * Gets the total memory requirement for a task.
+     * @param exec the executor the inquiry is concerning.
+     * @return the total memory requested by this exec, or null if it is not part of the topology.
+     */
+    public Double getTotalMemReqTask(ExecutorDetails exec) {
+        if (hasExecInTopo(exec)) {
+            return getOffHeapMemoryRequirement(exec)
+                    + getOnHeapMemoryRequirement(exec);
+        }
+        return null;
+    }
+
+    /**
+     * Gets the total memory requirement (on-heap plus off-heap) for every task in this topology.
+     * @return a map from each executor to its total memory requirement.
+     */
+    public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
+        Map<ExecutorDetails, Double> ret = new HashMap<>();
+        for (ExecutorDetails exec : resourceList.keySet()) {
+            ret.put(exec, getTotalMemReqTask(exec));
+        }
+        return ret;
+    }
+
+    public Set<SharedMemory> getSharedMemoryRequests(Collection<ExecutorDetails> executors) {
+        Set<String> components = new HashSet<>();
+        for (ExecutorDetails exec : executors) {
+            String component = executorToComponent.get(exec);
+            if (component != null) {
+                components.add(component);
+            }
+        }
+        Set<SharedMemory> ret = new HashSet<>();
+        if (topology != null) {
+            //topology being null is used for tests.  We probably should fix that at some point,
+            // but it is not trivial to do...
+            Map<String, Set<String>> compToSharedName = topology.get_component_to_shared_memory();
+            if (compToSharedName != null) {
+                for (String component : components) {
+                    Set<String> sharedNames = compToSharedName.get(component);
+                    if (sharedNames != null) {
+                        for (String name : sharedNames) {
+                            ret.add(topology.get_shared_memory().get(name));
+                        }
+                    }
+                }
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Get the total CPU requirement for an executor.
+     * @return the total amount of CPU requested by the executor, or null if it is not part of the topology.
+     */
+    public Double getTotalCpuReqTask(ExecutorDetails exec) {
+        if (hasExecInTopo(exec)) {
+            return resourceList
+                    .get(exec)
+                    .get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+        }
+        return null;
+    }
+
+    /**
+     * Note: The public API relevant to resource aware scheduling is unstable as of May 2015.
+     *       We reserve the right to change it.
+     *
+     * @return the total on-heap memory requested for this topology
+     */
+    public double getTotalRequestedMemOnHeap() {
+        return getRequestedSharedOnHeap() + getRequestedNonSharedOnHeap();
+    }
+
+    public double getRequestedSharedOnHeap() {
+        double ret = 0.0;
+        if (topology.is_set_shared_memory()) {
+            for (SharedMemory req : topology.get_shared_memory().values()) {
+                ret += req.get_on_heap();
+            }
+        }
+        return ret;
+    }
+
+    public double getRequestedNonSharedOnHeap() {
+        double totalMemOnHeap = 0.0;
+        for (ExecutorDetails exec : this.getExecutors()) {
+            Double execMem = getOnHeapMemoryRequirement(exec);
+            if (execMem != null) {
+                totalMemOnHeap += execMem;
+            }
+        }
+        return totalMemOnHeap;
+    }
+
+    /**
+     * Note: The public API relevant to resource aware scheduling is unstable as of May 2015.
+     *       We reserve the right to change it.
+     *
+     * @return the total off-heap memory requested for this topology
+     */
+    public double getTotalRequestedMemOffHeap() {
+        return getRequestedNonSharedOffHeap() + getRequestedSharedOffHeap();
+    }
+
+    public double getRequestedNonSharedOffHeap() {
+        double totalMemOffHeap = 0.0;
+        for (ExecutorDetails exec : this.getExecutors()) {
+            Double execMem = getOffHeapMemoryRequirement(exec);
+            if (execMem != null) {
+                totalMemOffHeap += execMem;
+            }
+        }
+        return totalMemOffHeap;
+    }
+
+    public double getRequestedSharedOffHeap() {
+        double ret = 0.0;
+        if (topology.is_set_shared_memory()) {
+            for (SharedMemory req : topology.get_shared_memory().values()) {
+                ret += req.get_off_heap_worker() + req.get_off_heap_node();
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Note: The public API relevant to resource aware scheduling is unstable as of May 2015.
+     *       We reserve the right to change it.
+     *
+     * @return the total cpu requested for this topology
+     */
+    public double getTotalRequestedCpu() {
+        double totalCpu = 0.0;
+        for (ExecutorDetails exec : this.getExecutors()) {
+            Double execCpu = getTotalCpuReqTask(exec);
+            if (execCpu != null) {
+                totalCpu += execCpu;
+            }
+        }
+        return totalCpu;
+    }
+
+    /**
+     * Get the resource requirements for an executor.
+     * @param exec the executor to look up
+     * @return a map containing the resource requirements for this exec, or null if it is not part of the topology
+     */
+    public Map<String, Double> getTaskResourceReqList(ExecutorDetails exec) {
+        if (hasExecInTopo(exec)) {
+            return resourceList.get(exec);
+        }
+        return null;
+    }
+
+    /**
+     * Checks if an executor is part of this topology.
+     * @return whether or not the given ExecutorDetails is included in the resourceList.
+     */
+    public boolean hasExecInTopo(ExecutorDetails exec) {
+        return resourceList != null && resourceList.containsKey(exec);
+    }
+
+    /**
+     * Add resource requirements for an executor.
+     */
+    public void addResourcesForExec(ExecutorDetails exec, Map<String, Double> resourceList) {
+        if (hasExecInTopo(exec)) {
+            LOG.warn("Executor {} already exists...ResourceList: {}", exec, getTaskResourceReqList(exec));
+            return;
+        }
+        this.resourceList.put(exec, resourceList);
+    }
+
+    /**
+     * Add default resource requirements for an executor.
+     */
+    private void addDefaultResforExec(ExecutorDetails exec) {
+        Double topologyComponentCpuPcorePercent =
+            ObjectReader.getDouble(
+                topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
+        Double topologyComponentResourcesOffheapMemoryMb =
+            ObjectReader.getDouble(
+                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+        Double topologyComponentResourcesOnheapMemoryMb =
+            ObjectReader.getDouble(
+                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+
+        assert topologyComponentCpuPcorePercent != null;
+        assert topologyComponentResourcesOffheapMemoryMb != null;
+        assert topologyComponentResourcesOnheapMemoryMb != null;
+
+        Map<String, Double> defaultResourceList = new HashMap<>();
+        defaultResourceList.put(
+            Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topologyComponentCpuPcorePercent);
+        defaultResourceList.put(
+            Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
+            topologyComponentResourcesOffheapMemoryMb);
+        defaultResourceList.put(
+            Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
+            topologyComponentResourcesOnheapMemoryMb);
+        LOG.debug(
+            "Scheduling Executor: {} with memory requirement as onHeap: {} - offHeap: {} "
+                + "and CPU requirement: {}",
+            exec,
+            topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+            topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+            topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+        addResourcesForExec(exec, defaultResourceList);
+    }
+
+    /**
+     * Initializes member variables from the topology configuration.
+     */
+    private void initConfigs() {
+        this.topologyWorkerMaxHeapSize =
+            ObjectReader.getDouble(
+                topologyConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB), null);
+        this.topologyPriority =
+            ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRIORITY), null);
+
+        assert this.topologyWorkerMaxHeapSize != null;
+        assert this.topologyPriority != null;
+    }
+
+    /**
+     * Get the max heap size for a worker used by this topology.
+     * @return the worker max heap size
+     */
+    public Double getTopologyWorkerMaxHeapSize() {
+        return topologyWorkerMaxHeapSize;
+    }
+
+    /**
+     * Get the user that submitted this topology.
+     */
+    public String getTopologySubmitter() {
+        return owner;
+    }
+
+    /**
+     * Get the priority of this topology.
+     */
+    public int getTopologyPriority() {
+        return topologyPriority;
+    }
+
+    /**
+     * Get the timestamp (in seconds) of when this topology was launched.
+     */
+    public int getLaunchTime() {
+        return launchTime;
+    }
+
+    /**
+     * Get how long this topology has been executing, in seconds.
+     */
+    public int getUpTime() {
+        return Time.currentTimeSecs() - launchTime;
+    }
+
+    @Override
+    public String toString() {
+        return "Name: "
+            + getName()
+            + " id: "
+            + getId()
+            + " Priority: "
+            + getTopologyPriority()
+            + " Uptime: "
+            + getUpTime()
+            + " CPU: "
+            + getTotalRequestedCpu()
+            + " Memory: "
+            + (getTotalRequestedMemOffHeap() + getTotalRequestedMemOnHeap());
+    }
+
+    @Override
+    public int hashCode() {
+        return topologyId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof TopologyDetails)) {
+            return false;
+        }
+        return (topologyId.equals(((TopologyDetails) o).getId()));
+    }
+}
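
An aside on the memory accounting above: the totals deliberately separate shared from non-shared memory, so a region declared by several components is counted once rather than once per executor. A small usage sketch against the methods defined in this file (td is assumed to be a TopologyDetails instance handed to a scheduler; only methods from the class above are used):

    // Each total is "shared regions + per-executor requests".
    double onHeap = td.getRequestedSharedOnHeap() + td.getRequestedNonSharedOnHeap();
    double offHeap = td.getRequestedSharedOffHeap() + td.getRequestedNonSharedOffHeap();
    // Equivalent to the convenience getters:
    //   td.getTotalRequestedMemOnHeap()  == onHeap
    //   td.getTotalRequestedMemOffHeap() == offHeap
    LOG.info("{} requests {} MB on-heap and {} MB off-heap in total",
        td.getName(), onHeap, offHeap);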

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
new file mode 100644
index 0000000..5603541
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Represents a single node in the cluster. */
+public class RAS_Node {
+    private static final Logger LOG = LoggerFactory.getLogger(RAS_Node.class);
+
+    //A map consisting of all workers on the node.
+    //The key of the map is the worker id and the value is the corresponding workerslot object
+    private Map<String, WorkerSlot> slots = new HashMap<>();
+
+    // A map describing which topologies are using which slots on this node.  The format of the map is the following:
+    // {TopologyId -> {WorkerId -> {Executors}}}
+    private Map<String, Map<String, Collection<ExecutorDetails>>> topIdToUsedSlots = new HashMap<>();
+
+    private final double totalMemory;
+    private final double totalCpu;
+    private final String nodeId;
+    private String hostname;
+    private boolean isAlive;
+    private SupervisorDetails sup;
+    private final Cluster cluster;
+    private final Set<WorkerSlot> originallyFreeSlots;
+
+    public RAS_Node(
+        String nodeId,
+        SupervisorDetails sup,
+        Cluster cluster,
+        Map<String, WorkerSlot> workerIdToWorker,
+        Map<String, Map<String, Collection<ExecutorDetails>>> assignmentMap) {
+        //Node ID and supervisor ID are the same.
+        this.nodeId = nodeId;
+        if (sup == null) {
+            isAlive = false;
+        } else {
+            isAlive = !cluster.isBlackListed(this.nodeId);
+        }
+
+        this.cluster = cluster;
+
+        // initialize slots for this node
+        if (workerIdToWorker != null) {
+            slots = workerIdToWorker;
+        }
+
+        //initialize assignment map
+        if (assignmentMap != null) {
+            topIdToUsedSlots = assignmentMap;
+        }
+
+        //check if node is alive
+        if (isAlive && sup != null) {
+            hostname = sup.getHost();
+            this.sup = sup;
+        }
+
+        totalMemory = isAlive ? getTotalMemoryResources() : 0.0;
+        totalCpu = isAlive ? getTotalCpuResources() : 0.0;
+        HashSet<String> freeById = new HashSet<>(slots.keySet());
+        if (assignmentMap != null) {
+            for (Map<String, Collection<ExecutorDetails>> assignment : assignmentMap.values()) {
+                freeById.removeAll(assignment.keySet());
+            }
+        }
+        originallyFreeSlots = new HashSet<>();
+        for (WorkerSlot slot : slots.values()) {
+            if (freeById.contains(slot.getId())) {
+                originallyFreeSlots.add(slot);
+            }
+        }
+    }
+
+    public String getId() {
+        return nodeId;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    private Collection<WorkerSlot> workerIdsToWorkers(Collection<String> workerIds) {
+        Collection<WorkerSlot> ret = new LinkedList<WorkerSlot>();
+        for (String workerId : workerIds) {
+            ret.add(slots.get(workerId));
+        }
+        return ret;
+    }
+
+    public Collection<String> getFreeSlotsId() {
+        if (!isAlive) {
+            return new HashSet<String>();
+        }
+        Collection<String> usedSlotsId = getUsedSlotsId();
+        Set<String> ret = new HashSet<>();
+        ret.addAll(slots.keySet());
+        ret.removeAll(usedSlotsId);
+        return ret;
+    }
+
+    public Collection<WorkerSlot> getSlotsAvailbleTo(TopologyDetails td) {
+        //Try to reuse a slot if possible....
+        HashSet<WorkerSlot> ret = new HashSet<>();
+        Map<String, Collection<ExecutorDetails>> assigned = topIdToUsedSlots.get(td.getId());
+        if (assigned != null) {
+            ret.addAll(workerIdsToWorkers(assigned.keySet()));
+        }
+        ret.addAll(getFreeSlots());
+        //RAS does not let you move things or modify existing assignments
+        ret.retainAll(originallyFreeSlots);
+        return ret;
+    }
+
+    public Collection<WorkerSlot> getFreeSlots() {
+        return workerIdsToWorkers(getFreeSlotsId());
+    }
+
+    private Collection<String> getUsedSlotsId() {
+        Collection<String> ret = new LinkedList<String>();
+        for (Map<String, Collection<ExecutorDetails>> entry : topIdToUsedSlots.values()) {
+            ret.addAll(entry.keySet());
+        }
+        return ret;
+    }
+
+    public Collection<WorkerSlot> getUsedSlots() {
+        return workerIdsToWorkers(getUsedSlotsId());
+    }
+
+    public Collection<WorkerSlot> getUsedSlots(String topId) {
+        Collection<WorkerSlot> ret = null;
+        if (topIdToUsedSlots.get(topId) != null) {
+            ret = workerIdsToWorkers(topIdToUsedSlots.get(topId).keySet());
+        }
+        return ret;
+    }
+
+    public boolean isAlive() {
+        return isAlive;
+    }
+
+    /** Get a collection of the topology ids currently running on this node. */
+    public Collection<String> getRunningTopologies() {
+        return topIdToUsedSlots.keySet();
+    }
+
+    public boolean isTotallyFree() {
+        return getUsedSlots().isEmpty();
+    }
+
+    public int totalSlotsFree() {
+        return getFreeSlots().size();
+    }
+
+    public int totalSlotsUsed() {
+        return getUsedSlots().size();
+    }
+
+    public int totalSlotsUsed(String topId) {
+        return getUsedSlots(topId).size();
+    }
+
+    public int totalSlots() {
+        return slots.size();
+    }
+
+    /** Free all slots on this node. This will update the Cluster too. */
+    public void freeAllSlots() {
+        if (!isAlive) {
+            LOG.warn("Freeing all slots on a dead node {} ", nodeId);
+        }
+        cluster.freeSlots(slots.values());
+        //clearing assignments
+        topIdToUsedSlots.clear();
+    }
+
+    /**
+     * Frees a single executor.
+     *
+     * @param exec is the executor to free
+     * @param topo the topology the executor is a part of
+     */
+    public void freeSingleExecutor(ExecutorDetails exec, TopologyDetails topo) {
+        Map<String, Collection<ExecutorDetails>> usedSlots = topIdToUsedSlots.get(topo.getId());
+        if (usedSlots == null) {
+            throw new IllegalArgumentException("Topology " + topo + " is not assigned");
+        }
+        WorkerSlot ws = null;
+        Set<ExecutorDetails> updatedAssignment = new HashSet<>();
+        for (Entry<String, Collection<ExecutorDetails>> entry : usedSlots.entrySet()) {
+            if (entry.getValue().contains(exec)) {
+                ws = slots.get(entry.getKey());
+                updatedAssignment.addAll(entry.getValue());
+                updatedAssignment.remove(exec);
+                break;
+            }
+        }
+
+        if (ws == null) {
+            throw new IllegalArgumentException(
+                "Executor " + exec + " is not assigned on this node to " + topo);
+        }
+        free(ws);
+        if (!updatedAssignment.isEmpty()) {
+            assign(ws, topo, updatedAssignment);
+        }
+    }
+
+    /**
+     * Frees a single slot in this node.
+     *
+     * @param ws the slot to free
+     */
+    public void free(WorkerSlot ws) {
+        LOG.debug("freeing WorkerSlot {} on node {}", ws, hostname);
+        if (!slots.containsKey(ws.getId())) {
+            throw new IllegalArgumentException(
+                "Tried to free a slot " + ws + " that was not part of this node " + nodeId);
+        }
+
+        TopologyDetails topo = findTopologyUsingWorker(ws);
+        if (topo == null) {
+            throw new IllegalArgumentException("Tried to free a slot " + ws + " that was already free!");
+        }
+
+        //free slot
+        cluster.freeSlot(ws);
+        //cleanup internal assignments
+        topIdToUsedSlots.get(topo.getId()).remove(ws.getId());
+    }
+
+    /**
+     * Find which topology is running on a worker slot.
+     *
+     * @return the topology using the worker slot, or null if the worker slot is free
+     */
+    private TopologyDetails findTopologyUsingWorker(WorkerSlot ws) {
+        for (Entry<String, Map<String, Collection<ExecutorDetails>>> entry :
+            topIdToUsedSlots.entrySet()) {
+            String topoId = entry.getKey();
+            Set<String> workerIds = entry.getValue().keySet();
+            for (String workerId : workerIds) {
+                if (ws.getId().equals(workerId)) {
+                    return cluster.getTopologies().getById(topoId);
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Assigns executors from a topology to a worker slot on this node.
+     *
+     * @param target the worker slot to assign the executors to; if null, a free slot is chosen
+     * @param td the topology the executors are from
+     * @param executors executors to assign to the specified worker slot
+     */
+    public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors) {
+        if (!isAlive) {
+            throw new IllegalStateException("Trying to assign to a dead node " + nodeId);
+        }
+        Collection<WorkerSlot> freeSlots = getFreeSlots();
+        if (freeSlots.isEmpty()) {
+            throw new IllegalStateException("Trying to assign to a full node " + nodeId);
+        }
+        if (executors.isEmpty()) {
+            LOG.warn("Trying to assign nothing from {} to {} (Ignored)", td.getId(), nodeId);
+        }
+        if (target == null) {
+            target = getFreeSlots().iterator().next();
+        }
+        if (!freeSlots.contains(target)) {
+            throw new IllegalStateException(
+                "Trying to assign already used slot " + target.getPort() + " on node " + nodeId);
+        }
+        LOG.debug("target slot: {}", target);
+
+        cluster.assign(target, td.getId(), executors);
+
+        //assigning internally
+        if (!topIdToUsedSlots.containsKey(td.getId())) {
+            topIdToUsedSlots.put(td.getId(), new HashMap<String, Collection<ExecutorDetails>>());
+        }
+
+        if (!topIdToUsedSlots.get(td.getId()).containsKey(target.getId())) {
+            topIdToUsedSlots.get(td.getId()).put(target.getId(), new LinkedList<ExecutorDetails>());
+        }
+        topIdToUsedSlots.get(td.getId()).get(target.getId()).addAll(executors);
+    }
+
+    public void assignSingleExecutor(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
+        if (!isAlive) {
+            throw new IllegalStateException("Trying to assign to a dead node " + nodeId);
+        }
+        Collection<WorkerSlot> freeSlots = getFreeSlots();
+        Set<ExecutorDetails> toAssign = new HashSet<>();
+        toAssign.add(exec);
+        if (!freeSlots.contains(ws)) {
+            Map<String, Collection<ExecutorDetails>> usedSlots = topIdToUsedSlots.get(td.getId());
+            if (usedSlots == null) {
+                throw new IllegalArgumentException(
+                    "Slot " + ws + " is not available to schedule " + exec + " on");
+            }
+            Collection<ExecutorDetails> alreadyHere = usedSlots.get(ws.getId());
+            if (alreadyHere == null) {
+                throw new IllegalArgumentException(
+                    "Slot " + ws + " is not available to schedule " + exec + " on");
+            }
+            toAssign.addAll(alreadyHere);
+            free(ws);
+        }
+        assign(ws, td, toAssign);
+    }
+
+    /**
+     * Would scheduling exec in ws fit within the current resource constraints?
+     *
+     * @param ws the slot to possibly put exec in
+     * @param exec the executor to possibly place in ws
+     * @param td the topology exec is a part of
+     * @return true if it would fit else false
+     */
+    public boolean wouldFit(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
+        if (!nodeId.equals(ws.getNodeId())) {
+            throw new IllegalStateException("Slot " + ws + " is not a part of this node " + nodeId);
+        }
+        return isAlive
+            && cluster.wouldFit(
+                ws,
+                exec,
+                td,
+                td.getTopologyWorkerMaxHeapSize(),
+                getAvailableMemoryResources(),
+                getAvailableCpuResources());
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other instanceof RAS_Node) {
+            return nodeId.equals(((RAS_Node) other).nodeId);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return nodeId.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "{Node: "
+            + ((sup == null) ? "null (possibly down)" : sup.getHost())
+            + ", Avail [ Mem: "
+            + getAvailableMemoryResources()
+            + ", CPU: "
+            + getAvailableCpuResources()
+            + ", Slots: "
+            + this.getFreeSlots()
+            + "] Total [ Mem: "
+            + ((sup == null) ? "N/A" : this.getTotalMemoryResources())
+            + ", CPU: "
+            + ((sup == null) ? "N/A" : this.getTotalCpuResources())
+            + ", Slots: "
+            + this.slots.values()
+            + " ]}";
+    }
+
+    public static int countFreeSlotsAlive(Collection<RAS_Node> nodes) {
+        int total = 0;
+        for (RAS_Node n : nodes) {
+            if (n.isAlive()) {
+                total += n.totalSlotsFree();
+            }
+        }
+        return total;
+    }
+
+    public static int countTotalSlotsAlive(Collection<RAS_Node> nodes) {
+        int total = 0;
+        for (RAS_Node n : nodes) {
+            if (n.isAlive()) {
+                total += n.totalSlots();
+            }
+        }
+        return total;
+    }
+
+    /**
+     * Gets the available memory resources for this node.
+     *
+     * @return the available memory for this node
+     */
+    public Double getAvailableMemoryResources() {
+        double used = cluster.getScheduledMemoryForNode(nodeId);
+        return totalMemory - used;
+    }
+
+    /**
+     * Gets the total memory resources for this node.
+     *
+     * @return the total memory for this node
+     */
+    public Double getTotalMemoryResources() {
+        if (sup != null) {
+            return sup.getTotalMemory();
+        } else {
+            return 0.0;
+        }
+    }
+
+    /**
+     * Gets the available cpu resources for this node.
+     *
+     * @return the available cpu for this node
+     */
+    public double getAvailableCpuResources() {
+        return totalCpu - cluster.getScheduledCpuForNode(nodeId);
+    }
+
+    /**
+     * Gets the total cpu resources for this node.
+     *
+     * @return the total cpu for this node
+     */
+    public Double getTotalCpuResources() {
+        if (sup != null) {
+            return sup.getTotalCPU();
+        } else {
+            return 0.0;
+        }
+    }
+}
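
To make the RAS_Node API concrete, here is a hedged sketch of the probe-then-assign loop a scheduling strategy could run against a single node (node, td, and exec are assumed to be supplied by the strategy; only methods defined in the file above are used, including getSlotsAvailbleTo, whose spelling follows the source):

    // Try each slot this topology may use on the node. RAS only returns slots
    // that were free at the start of scheduling or that already hold this
    // topology, so other topologies' assignments are never disturbed.
    for (WorkerSlot ws : node.getSlotsAvailbleTo(td)) {
        if (node.wouldFit(ws, exec, td)) {           // memory + CPU check via Cluster
            node.assignSingleExecutor(ws, exec, td); // merges with executors already in ws
            break;
        }
    }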

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
new file mode 100644
index 0000000..6c70ff1
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RAS_Nodes {
+
+    private Map<String, RAS_Node> nodeMap;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RAS_Nodes.class);
+
+    public RAS_Nodes(Cluster cluster) {
+        this.nodeMap = getAllNodesFrom(cluster);
+    }
+
+    public static Map<String, RAS_Node> getAllNodesFrom(Cluster cluster) {
+
+        //A map of node ids to node objects
+        Map<String, RAS_Node> nodeIdToNode = new HashMap<>();
+        //A map of assignments organized by node with the following format:
+        //{nodeId -> {topologyId -> {workerId -> {execs}}}}
+        Map<String, Map<String, Map<String, Collection<ExecutorDetails>>>> assignmentRelationshipMap = new HashMap<>();
+
+        Map<String, Map<String, WorkerSlot>> workerIdToWorker = new HashMap<>();
+        for (SchedulerAssignment assignment : cluster.getAssignments().values()) {
+            String topId = assignment.getTopologyId();
+
+            for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
+                assignment.getSlotToExecutors().entrySet()) {
+                WorkerSlot slot = entry.getKey();
+                String nodeId = slot.getNodeId();
+                Collection<ExecutorDetails> execs = entry.getValue();
+                if (!assignmentRelationshipMap.containsKey(nodeId)) {
+                    assignmentRelationshipMap.put(
+                        nodeId, new HashMap<String, Map<String, Collection<ExecutorDetails>>>());
+                    workerIdToWorker.put(nodeId, new HashMap<String, WorkerSlot>());
+                }
+                workerIdToWorker.get(nodeId).put(slot.getId(), slot);
+                if (!assignmentRelationshipMap.get(nodeId).containsKey(topId)) {
+                    assignmentRelationshipMap
+                        .get(nodeId)
+                        .put(topId, new HashMap<String, Collection<ExecutorDetails>>());
+                }
+                if (!assignmentRelationshipMap.get(nodeId).get(topId).containsKey(slot.getId())) {
+                    assignmentRelationshipMap
+                        .get(nodeId)
+                        .get(topId)
+                        .put(slot.getId(), new LinkedList<ExecutorDetails>());
+                }
+                assignmentRelationshipMap.get(nodeId).get(topId).get(slot.getId()).addAll(execs);
+            }
+        }
+
+        for (SupervisorDetails sup : cluster.getSupervisors().values()) {
+            //Initialize a worker slot for every port even if there is no assignment to it
+            for (int port : sup.getAllPorts()) {
+                WorkerSlot worker = new WorkerSlot(sup.getId(), port);
+                if (!workerIdToWorker.containsKey(sup.getId())) {
+                    workerIdToWorker.put(sup.getId(), new HashMap<String, WorkerSlot>());
+                }
+                if (!workerIdToWorker.get(sup.getId()).containsKey(worker.getId())) {
+                    workerIdToWorker.get(sup.getId()).put(worker.getId(), worker);
+                }
+            }
+            nodeIdToNode.put(
+                sup.getId(),
+                new RAS_Node(
+                    sup.getId(),
+                    sup,
+                    cluster,
+                    workerIdToWorker.get(sup.getId()),
+                    assignmentRelationshipMap.get(sup.getId())));
+        }
+
+        //Add in supervisors that might have crashed but whose workers are still alive
+        for (Map.Entry<String, Map<String, Map<String, Collection<ExecutorDetails>>>> entry :
+            assignmentRelationshipMap.entrySet()) {
+            String nodeId = entry.getKey();
+            Map<String, Map<String, Collection<ExecutorDetails>>> assignments = entry.getValue();
+            if (!nodeIdToNode.containsKey(nodeId)) {
+                LOG.info(
+                    "Found assigned slot(s) on a dead supervisor {} with assignments {}",
+                    nodeId,
+                    assignments);
+                nodeIdToNode.put(
+                    nodeId, new RAS_Node(nodeId, null, cluster, workerIdToWorker.get(nodeId), assignments));
+            }
+        }
+        return nodeIdToNode;
+    }
+
+    /** Get the node object for the given nodeId. */
+    public RAS_Node getNodeById(String nodeId) {
+        return this.nodeMap.get(nodeId);
+    }
+
+    /**
+     * Free everything on the given slots.
+     * @param workerSlots the slots to free
+     */
+    public void freeSlots(Collection<WorkerSlot> workerSlots) {
+        for (RAS_Node node : nodeMap.values()) {
+            for (WorkerSlot ws : node.getUsedSlots()) {
+                if (workerSlots.contains(ws)) {
+                    LOG.debug("freeing ws {} on node {}", ws, node);
+                    node.free(ws);
+                }
+            }
+        }
+    }
+
+    public Collection<RAS_Node> getNodes() {
+        return this.nodeMap.values();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder ret = new StringBuilder();
+        for (RAS_Node node : nodeMap.values()) {
+            ret.append(node).append("\n");
+        }
+        return ret.toString();
+    }
+}
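
And a matching sketch one level up, showing how the per-node view is typically built and summarized (cluster is assumed to be the Cluster passed into IScheduler.schedule; only methods from the files above are used):

    // One RAS_Node per supervisor id, plus "zombie" nodes whose supervisor
    // died but whose workers still hold assignments.
    RAS_Nodes nodes = new RAS_Nodes(cluster);
    int aliveSlots = RAS_Node.countTotalSlotsAlive(nodes.getNodes());
    int freeSlots = RAS_Node.countFreeSlotsAlive(nodes.getNodes());
    LOG.info("{} of {} slots on live nodes are free", freeSlots, aliveSlots);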

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index f92db8a..83231cd 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -18,44 +18,37 @@
 
 package org.apache.storm.scheduler.resource;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SingleTopologyCluster;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
 import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.ExecutorDetails;
-import org.apache.storm.scheduler.IScheduler;
-import org.apache.storm.scheduler.Topologies;
-import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.WorkerSlot;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 public class ResourceAwareScheduler implements IScheduler {
-
-    // Object that holds the current scheduling state
-    private SchedulingState schedulingState;
-
-    @SuppressWarnings("rawtypes")
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceAwareScheduler.class);
     private Map<String, Object> conf;
-
-    private static final Logger LOG = LoggerFactory
-            .getLogger(ResourceAwareScheduler.class);
+    private ISchedulingPriorityStrategy schedulingPrioritystrategy;
+    private IEvictionStrategy evictionStrategy;
 
     @Override
     public void prepare(Map<String, Object> conf) {
         this.conf = conf;
-
+        schedulingPrioritystrategy = (ISchedulingPriorityStrategy) ReflectionUtils.newInstance(
+            (String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
+        evictionStrategy = (IEvictionStrategy) ReflectionUtils.newInstance(
+            (String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
     }
 
     @Override
@@ -65,296 +58,129 @@ public class ResourceAwareScheduler implements IScheduler {
 
     @Override
     public void schedule(Topologies topologies, Cluster cluster) {
-        LOG.debug("\n\n\nRerunning ResourceAwareScheduler...");
         //initialize data structures
-        initialize(topologies, cluster);
-        //logs everything that is currently scheduled and the location at which they are scheduled
-        LOG.info("Cluster scheduling:\n{}", ResourceUtils.printScheduling(cluster, topologies));
-        //logs the resources available/used for every node
-        LOG.info("Nodes:\n{}", this.schedulingState.nodes);
-        //logs the detailed info about each user
-        for (User user : getUserMap().values()) {
-            LOG.info(user.getDetailedInfo());
+        for (TopologyDetails td : cluster.getTopologies()) {
+            if (!cluster.needsSchedulingRas(td)) {
+                //cluster forgets about its previous status, so if it is scheduled just leave it.
+                cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
+            }
         }
+        Map<String, User> userMap = getUsers(cluster);
 
-        ISchedulingPriorityStrategy schedulingPrioritystrategy = null;
         while (true) {
-
-            if (schedulingPrioritystrategy == null) {
-                try {
-                    schedulingPrioritystrategy = (ISchedulingPriorityStrategy) ReflectionUtils.newInstance((String) this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
-                } catch (RuntimeException ex) {
-                    LOG.error(String.format("failed to create instance of priority strategy: %s with error: %s! No topologies will be scheduled.",
-                                    this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), ex.getMessage()), ex);
-                    break;
-                }
-            }
             TopologyDetails td;
             try {
-                //need to re prepare since scheduling state might have been restored
-                schedulingPrioritystrategy.prepare(this.schedulingState);
                 //Call scheduling priority strategy
-                td = schedulingPrioritystrategy.getNextTopologyToSchedule();
+                td = schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap);
             } catch (Exception ex) {
-                LOG.error(String.format("Exception thrown when running priority strategy %s. No topologies will be scheduled! Error: %s"
-                        , schedulingPrioritystrategy.getClass().getName(), ex.getMessage()), ex.getStackTrace());
+                LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled!",
+                    schedulingPrioritystrategy.getClass().getName(), ex);
                 break;
             }
             if (td == null) {
                 break;
             }
-            scheduleTopology(td);
-
-            LOG.debug("Nodes after scheduling:\n{}", this.schedulingState.nodes);
+            User submitter = userMap.get(td.getTopologySubmitter());
+            if (cluster.needsSchedulingRas(td)) {
+                scheduleTopology(td, cluster, submitter, userMap);
+            } else {
+                LOG.warn("Topology {} is already fully scheduled!", td.getName());
+                cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
+            }
         }
-
-        //update changes to cluster
-        updateChanges(cluster, topologies);
-    }
-
-    private void updateChanges(Cluster cluster, Topologies topologies) {
-        //Cannot simply set this.cluster=schedulingState.cluster since clojure is immutable
-        cluster.setAssignments(schedulingState.cluster.getAssignments());
-        cluster.setBlacklistedHosts(schedulingState.cluster.getBlacklistedHosts());
-        cluster.setStatusMap(schedulingState.cluster.getStatusMap());
-        cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap());
-        cluster.setTopologyResourcesMap(schedulingState.cluster.getTopologyResourcesMap());
-        cluster.setWorkerResourcesMap(schedulingState.cluster.getWorkerResourcesMap());
-        //updating resources used by supervisor
-        updateSupervisorsResources(cluster, topologies);
     }
 
-    public void scheduleTopology(TopologyDetails td) {
-        User topologySubmitter = this.schedulingState.userMap.get(td.getTopologySubmitter());
-        if (this.schedulingState.cluster.getUnassignedExecutors(td).size() > 0) {
-            LOG.debug("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
-
-            SchedulingState schedulingState = checkpointSchedulingState();
-            IStrategy rasStrategy = null;
+    public void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
+                                 Map<String, User> userMap) {
+        //A copy of the cluster that we can modify; it is not committed back to the real cluster unless scheduling succeeds
+        Cluster workingState = new Cluster(cluster);
+        IStrategy rasStrategy = null;
+        String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
+        try {
+            rasStrategy = (IStrategy) ReflectionUtils.newInstance(strategyConf);
+            rasStrategy.prepare(conf);
+        } catch (RuntimeException e) {
+            LOG.error("Failed to create instance of IStrategy: {}. Topology {} will not be scheduled.",
+                    strategyConf, td.getName(), e);
+            topologySubmitter.markTopoUnsuccess(td);
+            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
+                    + strategyConf + ". Please check logs for details");
+            return;
+        }
+
+        while (true) {
+            // A copy of the cluster that restricts the strategy to only modify a single topology
+            SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
+            SchedulingResult result = null;
             try {
-                rasStrategy = (IStrategy) ReflectionUtils.newInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
-            } catch (RuntimeException e) {
-                LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.",
-                        td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
-                topologySubmitter = cleanup(schedulingState, td);
-                topologySubmitter.moveTopoFromPendingToInvalid(td);
-                this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
-                        + td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for details");
-                return;
+                result = rasStrategy.schedule(toSchedule, td);
+            } catch (Exception ex) {
+                LOG.error("Exception thrown when running strategy {} to schedule topology {}."
+                        + " Topology will not be scheduled!", rasStrategy.getClass().getName(), td.getName(), ex);
+                topologySubmitter.markTopoUnsuccess(td);
+                cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy "
+                        + rasStrategy.getClass().getName() + ". Please check logs for details");
             }
-            IEvictionStrategy evictionStrategy = null;
-            while (true) {
-                SchedulingResult result = null;
-                try {
-                    // Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
-                    // Pass in a copy of scheduling state since the scheduling strategy should not be able to be able to make modifications to
-                    // the state of cluster directly
-                    rasStrategy.prepare(new SchedulingState(this.schedulingState));
-                    result = rasStrategy.schedule(td);
-                } catch (Exception ex) {
-                    LOG.error(String.format("Exception thrown when running strategy %s to schedule topology %s. Topology will not be scheduled!"
-                            , rasStrategy.getClass().getName(), td.getName()), ex);
-                    topologySubmitter = cleanup(schedulingState, td);
-                    topologySubmitter.moveTopoFromPendingToInvalid(td);
-                    this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
-                            + rasStrategy.getClass().getName() + ". Please check logs for details");
-                }
-                LOG.debug("scheduling result: {}", result);
-                if (result != null && result.isValid()) {
-                    if (result.isSuccess()) {
+            LOG.debug("scheduling result: {}", result);
+            if (result != null) {
+                if (result.isSuccess()) {
+                    try {
+                        cluster.updateFrom(toSchedule);
+                        cluster.setStatus(td.getId(), "Running - " + result.getMessage());
+                    } catch (Exception ex) {
+                        LOG.error("Unsuccessful attempting to assign executors to nodes.", ex);
+                        topologySubmitter.markTopoUnsuccess(td);
+                        cluster.setStatus(td.getId(), "Unsuccessful in scheduling - "
+                            + "IllegalStateException thrown when attempting to assign executors to nodes. Please check"
+                            + " log for details.");
+                    }
+                    return;
+                } else {
+                    if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
+                        boolean madeSpace = false;
                         try {
-                            if (mkAssignment(td, result.getSchedulingResultMap())) {
-                                topologySubmitter.moveTopoFromPendingToRunning(td);
-                                this.schedulingState.cluster.setStatus(td.getId(), "Running - " + result.getMessage());
-                            } else {
-                                topologySubmitter = this.cleanup(schedulingState, td);
-                                topologySubmitter.moveTopoFromPendingToAttempted(td);
-                                this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details");
-                            }
-                        } catch (IllegalStateException ex) {
-                            LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes.", ex);
-                            topologySubmitter = cleanup(schedulingState, td);
-                            topologySubmitter.moveTopoFromPendingToAttempted(td);
-                            this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
+                            //ask the eviction strategy to free enough resources for this topology to schedule
+                            madeSpace = evictionStrategy.makeSpaceForTopo(td, workingState, userMap);
+                        } catch (Exception ex) {
+                            LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}."
+                                    + " No evictions will be done!", evictionStrategy.getClass().getName(),
+                                td.getName(), ex);
+                            topologySubmitter.markTopoUnsuccess(td);
+                            return;
                         }
-                        break;
-                    } else {
-                        if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
-                            if (evictionStrategy == null) {
-                                try {
-                                    evictionStrategy = (IEvictionStrategy) ReflectionUtils.newInstance((String) this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
-                                } catch (RuntimeException e) {
-                                    LOG.error("failed to create instance of eviction strategy: {} with error: {}! No topology eviction will be done.",
-                                            this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY), e.getMessage());
-                                    topologySubmitter.moveTopoFromPendingToAttempted(td);
-                                    break;
-                                }
-                            }
-                            boolean madeSpace = false;
-                            try {
-                                //need to re prepare since scheduling state might have been restored
-                                evictionStrategy.prepare(this.schedulingState);
-                                madeSpace = evictionStrategy.makeSpaceForTopo(td);
-                            } catch (Exception ex) {
-                                LOG.error(String.format("Exception thrown when running eviction strategy %s to schedule topology %s. No evictions will be done! Error: %s"
-                                        , evictionStrategy.getClass().getName(), td.getName(), ex.getClass().getName()), ex);
-                                topologySubmitter = cleanup(schedulingState, td);
-                                topologySubmitter.moveTopoFromPendingToAttempted(td);
-                                break;
-                            }
-                            if (!madeSpace) {
-                                LOG.debug("Could not make space for topo {} will move to attempted", td);
-                                topologySubmitter = cleanup(schedulingState, td);
-                                topologySubmitter.moveTopoFromPendingToAttempted(td);
-                                this.schedulingState.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
-                                break;
-                            }
-                            continue;
-                        } else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
-                            topologySubmitter = cleanup(schedulingState, td);
-                            topologySubmitter.moveTopoFromPendingToInvalid(td, this.schedulingState.cluster);
-                            break;
-                        } else {
-                            topologySubmitter = cleanup(schedulingState, td);
-                            topologySubmitter.moveTopoFromPendingToAttempted(td, this.schedulingState.cluster);
-                            break;
+                        if (!madeSpace) {
+                            LOG.debug("Could not make space for topo {} will move to attempted", td);
+                            topologySubmitter.markTopoUnsuccess(td);
+                            cluster.setStatus(td.getId(), "Not enough resources to schedule - "
+                                + result.getErrorMessage());
+                            return;
                         }
+                        continue;
+                    } else {
+                        topologySubmitter.markTopoUnsuccess(td, cluster);
+                        return;
                     }
-                } else {
-                    LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", td.getName());
-                    topologySubmitter = cleanup(schedulingState, td);
-                    topologySubmitter.moveTopoFromPendingToInvalid(td, this.schedulingState.cluster);
-                    break;
-                }
-            }
-        } else {
-            LOG.warn("Topology {} is already fully scheduled!", td.getName());
-            topologySubmitter.moveTopoFromPendingToRunning(td);
-            if (this.schedulingState.cluster.getStatusMap().get(td.getId()) == null || this.schedulingState.cluster.getStatusMap().get(td.getId()).equals("")) {
-                this.schedulingState.cluster.setStatus(td.getId(), "Fully Scheduled");
-            }
-        }
-    }
-
-    private User cleanup(SchedulingState schedulingState, TopologyDetails td) {
-        restoreCheckpointSchedulingState(schedulingState);
-        //since state is restored need the update User topologySubmitter to the new User object in userMap
-        return this.schedulingState.userMap.get(td.getTopologySubmitter());
-    }
-
-    private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
-        if (schedulerAssignmentMap != null) {
-            double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
-            double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
-            double requestedCpu = td.getTotalRequestedCpu();
-            double assignedMemOnHeap = 0.0;
-            double assignedMemOffHeap = 0.0;
-            double assignedCpu = 0.0;
-
-            Map<WorkerSlot, Double[]> workerResources = new HashMap<WorkerSlot, Double[]>();
-
-            Set<String> nodesUsed = new HashSet<String>();
-            for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
-                WorkerSlot targetSlot = workerToTasksEntry.getKey();
-                Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
-                RAS_Node targetNode = this.schedulingState.nodes.getNodeById(targetSlot.getNodeId());
-
-                targetSlot = allocateResourceToSlot(td, execsNeedScheduling, targetSlot);
-
-                targetNode.assign(targetSlot, td, execsNeedScheduling);
-
-                LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: {} To Node: {} on Slot: {}",
-                        td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
-
-                for (ExecutorDetails exec : execsNeedScheduling) {
-                    targetNode.consumeResourcesforTask(exec, td);
-                }
-                if (!nodesUsed.contains(targetNode.getId())) {
-                    nodesUsed.add(targetNode.getId());
                 }
-                assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap();
-                assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
-                assignedCpu += targetSlot.getAllocatedCpu();
-
-                Double[] worker_resources = {
-                    requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
-                    targetSlot.getAllocatedMemOnHeap(), targetSlot.getAllocatedMemOffHeap(), targetSlot.getAllocatedCpu()};
-                workerResources.put (targetSlot, worker_resources);
-            }
-
-            Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
-                    assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
-            LOG.debug("setTopologyResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
-                            "assigned on-heap mem, off-heap mem, cpu: {} {} {}",
-                    td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
-                    assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
-            //updating resources used for a topology
-            this.schedulingState.cluster.setTopologyResources(td.getId(), resources);
-            this.schedulingState.cluster.setWorkerResources(td.getId(), workerResources);
-            return true;
-        } else {
-            LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName());
-            return false;
-        }
-    }
-
-    private WorkerSlot allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
-        double onHeapMem = 0.0;
-        double offHeapMem = 0.0;
-        double cpu = 0.0;
-        for (ExecutorDetails exec : executors) {
-            Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
-            if (onHeapMemForExec != null) {
-                onHeapMem += onHeapMemForExec;
-            }
-            Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
-            if (offHeapMemForExec != null) {
-                offHeapMem += offHeapMemForExec;
-            }
-            Double cpuForExec = td.getTotalCpuReqTask(exec);
-            if (cpuForExec != null) {
-                cpu += cpuForExec;
+            } else {
+                LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.",
+                    td.getName());
+                topologySubmitter.markTopoUnsuccess(td, cluster);
+                return;
             }
         }
-        return new WorkerSlot(slot.getNodeId(), slot.getPort(), onHeapMem, offHeapMem, cpu);
-    }
-
-    private void updateSupervisorsResources(Cluster cluster, Topologies topologies) {
-        Map<String, Double[]> supervisors_resources = new HashMap<String, Double[]>();
-        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
-        for (Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
-            RAS_Node node = entry.getValue();
-            Double totalMem = node.getTotalMemoryResources();
-            Double totalCpu = node.getTotalCpuResources();
-            Double usedMem = totalMem - node.getAvailableMemoryResources();
-            Double usedCpu = totalCpu - node.getAvailableCpuResources();
-            Double[] resources = {totalMem, totalCpu, usedMem, usedCpu};
-            supervisors_resources.put(entry.getKey(), resources);
-        }
-        cluster.setSupervisorsResourcesMap(supervisors_resources);
-    }
-
-    public User getUser(String user) {
-        return this.schedulingState.userMap.get(user);
-    }
-
-    public Map<String, User> getUserMap() {
-        return this.schedulingState.userMap;
     }
 
     /**
-     * Intialize scheduling and running queues
+     * Build a User wrapper for every topology submitter in the cluster.
      *
-     * @param topologies
-     * @param cluster
+     * @param cluster the cluster to get the users out of.
+     * @return a map from user name to the corresponding User object
      */
-    private Map<String, User> getUsers(Topologies topologies, Cluster cluster) {
-        Map<String, User> userMap = new HashMap<String, User>();
+    private Map<String, User> getUsers(Cluster cluster) {
+        Map<String, User> userMap = new HashMap<>();
         Map<String, Map<String, Double>> userResourcePools = getUserResourcePools();
         LOG.debug("userResourcePools: {}", userResourcePools);
 
-        for (TopologyDetails td : topologies.getTopologies()) {
-
+        for (TopologyDetails td : cluster.getTopologies()) {
             String topologySubmitter = td.getTopologySubmitter();
             //additional safety check to make sure that topologySubmitter is going to be a valid value
             if (topologySubmitter == null || topologySubmitter.equals("")) {
@@ -364,37 +190,23 @@ public class ResourceAwareScheduler implements IScheduler {
             if (!userMap.containsKey(topologySubmitter)) {
                 userMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
             }
-            if (cluster.getUnassignedExecutors(td).size() > 0) {
-                LOG.debug("adding td: {} to pending queue", td.getName());
-                userMap.get(topologySubmitter).addTopologyToPendingQueue(td);
-            } else {
-                LOG.debug("adding td: {} to running queue with existing status: {}", td.getName(), cluster.getStatusMap().get(td.getId()));
-                userMap.get(topologySubmitter).addTopologyToRunningQueue(td);
-                if (cluster.getStatusMap().get(td.getId()) == null || cluster.getStatusMap().get(td.getId()).equals("")) {
-                    cluster.setStatus(td.getId(), "Fully Scheduled");
-                }
-            }
         }
         return userMap;
     }
 
-    private void initialize(Topologies topologies, Cluster cluster) {
-        Map<String, User> userMap = getUsers(topologies, cluster);
-        this.schedulingState = new SchedulingState(userMap, cluster, topologies, this.conf);
-    }
-
     /**
-     * Get resource guarantee configs
+     * Get resource guarantee configs.
      *
      * @return a map that contains resource guarantees of every user of the following format
-     * {userid->{resourceType->amountGuaranteed}}
+     *     {userid->{resourceType->amountGuaranteed}}
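+     *
+     *     <p>A hypothetical example (user name and amounts are illustrative only) of the
+     *     corresponding entry in user-resource-pools.yaml:</p>
+     *     <pre>
+     *     resource.aware.scheduler.user.pools:
+     *         alice:
+     *             cpu: 1000
+     *             memory: 8192.0
+     *     </pre>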
      */
     private Map<String, Map<String, Double>> getUserResourcePools() {
-        Object raw = this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
-        Map<String, Map<String, Double>> ret = new HashMap<String, Map<String, Double>>();
+        Map<String, Map<String, Number>> raw =
+            (Map<String, Map<String, Number>>) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+        Map<String, Map<String, Double>> ret = new HashMap<>();
 
         if (raw != null) {
-            for (Map.Entry<String, Map<String, Number>> userPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
+            for (Map.Entry<String, Map<String, Number>> userPoolEntry : raw.entrySet()) {
                 String user = userPoolEntry.getKey();
                 ret.put(user, new HashMap<String, Double>());
                 for (Map.Entry<String, Number> resourceEntry : userPoolEntry.getValue().entrySet()) {
@@ -404,7 +216,8 @@ public class ResourceAwareScheduler implements IScheduler {
         }
 
         Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
-        Map<String, Map<String, Number>> tmp = (Map<String, Map<String, Number>>) fromFile.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+        Map<String, Map<String, Number>> tmp =
+            (Map<String, Map<String, Number>>) fromFile.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
         if (tmp != null) {
             for (Map.Entry<String, Map<String, Number>> userPoolEntry : tmp.entrySet()) {
                 String user = userPoolEntry.getKey();
@@ -416,27 +229,4 @@ public class ResourceAwareScheduler implements IScheduler {
         }
         return ret;
     }
-
-    private SchedulingState checkpointSchedulingState() {
-        LOG.debug("/*********Checkpoint scheduling state************/");
-        for (User user : this.schedulingState.userMap.values()) {
-            LOG.debug(user.getDetailedInfo());
-        }
-        LOG.debug(ResourceUtils.printScheduling(this.schedulingState.cluster, this.schedulingState.topologies));
-        LOG.debug("nodes:\n{}", this.schedulingState.nodes);
-        LOG.debug("/*********End************/");
-        return new SchedulingState(this.schedulingState);
-    }
-
-    private void restoreCheckpointSchedulingState(SchedulingState schedulingState) {
-        LOG.debug("/*********restoring scheduling state************/");
-        //reseting cluster
-        this.schedulingState = schedulingState;
-        for (User user : this.schedulingState.userMap.values()) {
-            LOG.debug(user.getDetailedInfo());
-        }
-        LOG.debug(ResourceUtils.printScheduling(this.schedulingState.cluster, this.schedulingState.topologies));
-        LOG.debug("nodes:\n{}", this.schedulingState.nodes);
-        LOG.debug("/*********End************/");
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
new file mode 100644
index 0000000..a50dced
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ObjectReader;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class);
+
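+    /**
+     * Extract the per-bolt resource requirements declared in each bolt's JSON conf, filling in
+     * topology-level defaults for anything a component did not set explicitly.
+     */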
+    public static Map<String, Map<String, Double>> getBoltsResources(StormTopology topology,
+                                                                     Map<String, Object> topologyConf) {
+        Map<String, Map<String, Double>> boltResources = new HashMap<>();
+        if (topology.get_bolts() != null) {
+            for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
+                Map<String, Double> topologyResources = parseResources(bolt.getValue().get_common().get_json_conf());
+                checkIntialization(topologyResources, bolt.getValue().toString(), topologyConf);
+                LOG.warn("Turned {} into {}", bolt.getValue().get_common().get_json_conf(), topologyResources);
+                boltResources.put(bolt.getKey(), topologyResources);
+            }
+        }
+        return boltResources;
+    }
+
+    public static Map<String, Map<String, Double>> getSpoutsResources(StormTopology topology,
+                                                                      Map<String, Object> topologyConf) {
+        Map<String, Map<String, Double>> spoutResources = new HashMap<>();
+        if (topology.get_spouts() != null) {
+            for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
+                Map<String, Double> topologyResources = parseResources(spout.getValue().get_common().get_json_conf());
+                checkIntialization(topologyResources, spout.getValue().toString(), topologyConf);
+                spoutResources.put(spout.getKey(), topologyResources);
+            }
+        }
+        return spoutResources;
+    }
+
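+    /**
+     * Ensure a component's resource map has entries for on-heap memory, off-heap memory and CPU,
+     * falling back to the topology-wide defaults in the topology conf for any value the
+     * component did not specify.
+     */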
+    public static void checkIntialization(Map<String, Double> topologyResources, String com,
+                                          Map<String, Object> topologyConf) {
+        checkInitMem(topologyResources, com, topologyConf);
+        checkInitCpu(topologyResources, com, topologyConf);
+    }
+
+    private static void checkInitMem(Map<String, Double> topologyResources, String com,
+                                    Map<String, Object> topologyConf) {
+        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+            Double onHeap = ObjectReader.getDouble(
+                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+            if (onHeap != null) {
+                topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+                debugMessage("ONHEAP", com, topologyConf);
+            }
+        }
+        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+            Double offHeap = ObjectReader.getDouble(
+                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+            if (offHeap != null) {
+                topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
+                debugMessage("OFFHEAP", com, topologyConf);
+            }
+        }
+    }
+
+    private static void checkInitCpu(Map<String, Double> topologyResources, String com,
+                                     Map<String, Object> topologyConf) {
+        if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+            Double cpu = ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
+            if (cpu != null) {
+                topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu);
+                debugMessage("CPU", com, topologyConf);
+            }
+        }
+    }
+
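+    /**
+     * Parse component resource requirements out of a component's JSON conf string. As an
+     * illustrative (hypothetical) example, an input of
+     * {"topology.component.resources.onheap.memory.mb": 128.0, "topology.component.cpu.pcore.percent": 10.0}
+     * would yield a map containing those two entries.
+     */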
+    public static Map<String, Double> parseResources(String input) {
+        Map<String, Double> topologyResources = new HashMap<>();
+        JSONParser parser = new JSONParser();
+        LOG.debug("Input to parseResources {}", input);
+        try {
+            if (input != null) {
+                Object obj = parser.parse(input);
+                JSONObject jsonObject = (JSONObject) obj;
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+                    Double topoMemOnHeap = ObjectReader
+                            .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+                    topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
+                }
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+                    Double topoMemOffHeap = ObjectReader
+                            .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+                    topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
+                }
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+                    Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
+                        null);
+                    topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
+                }
+                LOG.debug("Topology Resources {}", topologyResources);
+            }
+        } catch (ParseException e) {
+            LOG.error("Failed to parse component resources is:" + e.toString(), e);
+            return null;
+        }
+        return topologyResources;
+    }
+
+    private static void debugMessage(String memoryType, String com, Map<String, Object> topologyConf) {
+        if (memoryType.equals("ONHEAP")) {
+            LOG.debug(
+                    "Unable to extract resource requirement for Component {}\n"
+                        + " Resource : Memory Type : On Heap set to default {}",
+                    com, topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB));
+        } else if (memoryType.equals("OFFHEAP")) {
+            LOG.debug(
+                    "Unable to extract resource requirement for Component {}\n"
+                        + " Resource : Memory Type : Off Heap set to default {}",
+                    com, topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB));
+        } else {
+            LOG.debug(
+                    "Unable to extract resource requirement for Component {}\n"
+                        + " Resource : CPU Pcore Percent set to default {}",
+                    com, topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+        }
+    }
+
+    /**
+     * Calculate the sum of a collection of doubles.
+     * @param list collection of doubles
+     * @return the sum of the collection of doubles
+     */
+    public static double sum(Collection<Double> list) {
+        double sum = 0.0;
+        for (Double elem : list) {
+            sum += elem;
+        }
+        return sum;
+    }
+
+    /**
+     * Calculate the average of a collection of doubles.
+     * @param list a collection of doubles
+     * @return the average of the collection of doubles
+     */
+    public static double avg(Collection<Double> list) {
+        return sum(list) / list.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java
new file mode 100644
index 0000000..72f083d
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class serves as a mechanism to return results and messages from a scheduling strategy to the Resource Aware
+ * Scheduler.
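+ *
+ * <p>A strategy typically returns e.g. {@code SchedulingResult.success("Fully Scheduled")} when
+ * everything was assigned, or {@code SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
+ * reason)} when the cluster cannot satisfy the request.</p>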
+ */
+public class SchedulingResult {
+    private static final Logger LOG = LoggerFactory.getLogger(SchedulingResult.class);
+
+    //status of scheduling the topology, e.g. success or failure
+    private final SchedulingStatus status;
+
+    //arbitrary message to be returned when scheduling is done
+    private final String message;
+
+    //error message returned if something went wrong
+    private final String errorMessage;
+
+    private SchedulingResult(SchedulingStatus status, String message, String errorMessage) {
+        this.status = status;
+        this.message = message;
+        this.errorMessage = errorMessage;
+    }
+
+    public static SchedulingResult failure(SchedulingStatus status, String errorMessage) {
+        return new SchedulingResult(status, null, errorMessage);
+    }
+
+    public static SchedulingResult success() {
+        return SchedulingResult.success(null);
+    }
+
+    public static SchedulingResult success(String message) {
+        return new SchedulingResult(SchedulingStatus.SUCCESS, message, null);
+    }
+
+    public SchedulingStatus getStatus() {
+        return this.status;
+    }
+
+    public String getMessage() {
+        return this.message;
+    }
+
+    public String getErrorMessage() {
+        return this.errorMessage;
+    }
+
+    public boolean isSuccess() {
+        return SchedulingStatus.isStatusSuccess(this.status);
+    }
+
+    public boolean isFailure() {
+        return SchedulingStatus.isStatusFailure(this.status);
+    }
+
+    @Override
+    public String toString() {
+        String ret = null;
+        if (isSuccess()) {
+            ret = "Status: " + this.getStatus() + " message: " + this.getMessage();
+        } else {
+            ret = "Status: " + this.getStatus() + " error message: " + this.getErrorMessage();
+        }
+        return ret;
+    }
+}