You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/31 02:07:05 UTC

[06/20] storm git commit: STORM-2497: Let Supervisor enforce memory and add in support for shared memory regions

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
new file mode 100644
index 0000000..a6ae4cc
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -0,0 +1,1050 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.nimbus.TopologyResources;
+import org.apache.storm.generated.SharedMemory;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Cluster implements ISchedulingState {
+    private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
+
+    public static class SupervisorResources {
+        private final double totalMem;
+        private final double totalCpu;
+        private final double usedMem;
+        private final double usedCpu;
+
+        /**
+         * Constructor for a Supervisor's resources.
+         *
+         * @param totalMem the total mem on the supervisor
+         * @param totalCpu the total CPU on the supervisor
+         * @param usedMem the used mem on the supervisor
+         * @param usedCpu the used CPU on the supervisor
+         */
+        public SupervisorResources(double totalMem, double totalCpu, double usedMem, double usedCpu) {
+            this.totalMem = totalMem;
+            this.totalCpu = totalCpu;
+            this.usedMem = usedMem;
+            this.usedCpu = usedCpu;
+        }
+
+        public double getUsedMem() {
+            return usedMem;
+        }
+
+        public double getUsedCpu() {
+            return usedCpu;
+        }
+
+        public double getTotalMem() {
+            return totalMem;
+        }
+
+        public double getTotalCpu() {
+            return totalCpu;
+        }
+
+        private SupervisorResources add(WorkerResources wr) {
+            return new SupervisorResources(
+                totalMem,
+                totalCpu,
+                usedMem + wr.get_mem_off_heap() + wr.get_mem_on_heap(),
+                usedCpu + wr.get_cpu());
+        }
+
+        public SupervisorResources addMem(Double value) {
+            return new SupervisorResources(totalMem, totalCpu, usedMem + value, usedCpu);
+        }
+    }
+
+    /**
+     * key: supervisor id, value: supervisor details.
+     */
+    private final Map<String, SupervisorDetails> supervisors = new HashMap<>();
+
+    /**
+     * key: rack, value: host names of the nodes in that rack.
+     */
+    private final Map<String, List<String>> networkTopography = new HashMap<>();
+
+    /**
+     * key: topologyId, value: topology's current assignments.
+     */
+    private final Map<String, SchedulerAssignmentImpl> assignments = new HashMap<>();
+
+    /**
+     * key topologyId, Value: scheduler's status.
+     */
+    private final Map<String, String> status = new HashMap<>();
+
+    /**
+     * A map from hostname to the ids of the supervisors running on that host (built in the constructor).
+     */
+    private final Map<String, List<String>> hostToId = new HashMap<>();
+
+    // Configuration given to the constructor; e.g. read there for the network topography plugin.
+    private final Map<String, Object> conf;
+
+    // Hosts whose supervisors expose no assignable ports (see getAssignablePorts).
+    private Set<String> blackListedHosts = new HashSet<>();
+    // Used to resolve a supervisor id to its host name (see getHost).
+    private INimbus inimbus;
+    // Topologies known to this cluster; assign() rejects topology ids not found here.
+    private final Topologies topologies;
+    // Per-node caches keyed by node (supervisor) id. Entries are dropped whenever a slot on that
+    // node is assigned or freed. NOTE(review): they are populated by code outside this view.
+    private final Map<String, Double> scheduledCPUCache = new HashMap<>();
+    private final Map<String, Double> scheduledMemoryCache = new HashMap<>();
+
+    /**
+     * Create a cluster with no scheduler status and no blacklisted hosts. The network topography is
+     * computed from the plugin configured under {@link Config#STORM_NETWORK_TOPOGRAPHY_PLUGIN}, if any.
+     *
+     * @param nimbus used to resolve supervisor ids to host names
+     * @param supervisors key: supervisor id, value: supervisor details
+     * @param map key: topology id, value: an existing assignment to apply to this cluster;
+     *     slots that cannot be assigned are skipped instead of failing construction
+     * @param topologies the topologies known to this cluster
+     * @param conf the configuration (read for the network topography plugin)
+     */
+    public Cluster(
+        INimbus nimbus,
+        Map<String, SupervisorDetails> supervisors,
+        Map<String, ? extends SchedulerAssignment> map,
+        Topologies topologies,
+        Map<String, Object> conf) {
+        this(nimbus, supervisors, map, topologies, conf, null, null, null);
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * <p>Produces a cluster over the same topologies as {@code src}. Supervisors, assignments,
+     * status, blacklisted hosts and network topography are copied from {@code src}; the conf map
+     * is copied shallowly.
+     *
+     * @param src the cluster to copy
+     */
+    public Cluster(Cluster src) {
+        // Same as replacing the topologies with the ones src already has.
+        this(src, src.topologies);
+    }
+
+    /**
+     * Testing Constructor that takes an existing cluster and replaces the topologies in it.
+     *
+     * <p>Supervisors, assignments, status, blacklisted hosts and network topography come from
+     * {@code src}; the conf map is copied shallowly. Assignments of topologies that are not part of
+     * the new {@code topologies} cannot be applied and are silently skipped.
+     *
+     * @param src the original cluster
+     * @param topologies the new topologies to use
+     */
+    @VisibleForTesting
+    public Cluster(Cluster src, Topologies topologies) {
+        this(
+            src.inimbus,
+            src.supervisors,
+            src.assignments,
+            topologies,
+            new HashMap<>(src.conf),
+            src.status,
+            src.blackListedHosts,
+            src.networkTopography);
+    }
+
+    /**
+     * Shared constructor behind the public and copy constructors.
+     *
+     * @param nimbus used to resolve supervisor ids to host names
+     * @param supervisors key: supervisor id, value: supervisor details
+     * @param assignments assignments to apply; slots that fail are skipped
+     * @param topologies the topologies known to this cluster
+     * @param conf the configuration (read for the network topography plugin)
+     * @param status initial scheduler status per topology, or null for none
+     * @param blackListedHosts initial blacklisted hosts, or null for none
+     * @param networkTopography rack to host names; when null or empty it is computed from the
+     *     configured topography plugin instead
+     */
+    private Cluster(
+        INimbus nimbus,
+        Map<String, SupervisorDetails> supervisors,
+        Map<String, ? extends SchedulerAssignment> assignments,
+        Topologies topologies,
+        Map<String, Object> conf,
+        Map<String, String> status,
+        Set<String> blackListedHosts,
+        Map<String, List<String>> networkTopography) {
+        this.inimbus = nimbus;
+        this.supervisors.putAll(supervisors);
+        this.conf = conf;
+        this.topologies = topologies;
+
+        // One pass over the supervisors: index their ids by host and collect the host names
+        // (in iteration order) for the topography plugin.
+        List<String> supervisorHostNames = new ArrayList<>(supervisors.size());
+        for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
+            String host = entry.getValue().getHost();
+            hostToId.computeIfAbsent(host, h -> new ArrayList<>()).add(entry.getKey());
+            supervisorHostNames.add(host);
+        }
+
+        //Initialize the network topography
+        if (networkTopography == null || networkTopography.isEmpty()) {
+            String clazz = (String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN);
+            if (clazz != null && !clazz.isEmpty()) {
+                DNSToSwitchMapping topographyMapper =
+                    (DNSToSwitchMapping) ReflectionUtils.newInstance(clazz);
+
+                Map<String, String> resolvedSupervisors = topographyMapper.resolve(supervisorHostNames);
+                for (Map.Entry<String, String> entry : resolvedSupervisors.entrySet()) {
+                    // key: host name, value: rack
+                    this.networkTopography
+                        .computeIfAbsent(entry.getValue(), rack -> new ArrayList<>())
+                        .add(entry.getKey());
+                }
+            }
+        } else {
+            // Deep copy so that a copied Cluster does not share (and mutate) the per-rack host
+            // lists of the cluster it was copied from.
+            for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
+                List<String> hosts = entry.getValue();
+                this.networkTopography.put(entry.getKey(), hosts == null ? null : new ArrayList<>(hosts));
+            }
+        }
+
+        if (status != null) {
+            this.status.putAll(status);
+        }
+
+        if (blackListedHosts != null) {
+            this.blackListedHosts.addAll(blackListedHosts);
+        }
+
+        setAssignments(assignments, true);
+    }
+
+    /**
+     * Check if the given topology is allowed for modification right now. If not throw an
+     * IllegalArgumentException else go on.
+     *
+     * <p>This base implementation accepts every topology. It is called before assigning, freeing
+     * slots, replacing assignments and updating status in this class. It is also reached from the
+     * constructor (via setAssignments), so an override runs before the subclass's own fields are
+     * initialized.
+     *
+     * @param topologyId the id of the topology to check
+     */
+    protected void assertValidTopologyForModification(String topologyId) {
+        //NOOP
+    }
+
+    /**
+     * Replace the set of blacklisted hosts with the given one.
+     *
+     * @param hosts the new hosts that are blacklisted. If this is the very set instance the
+     *     cluster already holds, nothing happens (clearing first would wipe out the input).
+     */
+    public void setBlacklistedHosts(Set<String> hosts) {
+        if (hosts != blackListedHosts) {
+            blackListedHosts.clear();
+            blackListedHosts.addAll(hosts);
+        }
+    }
+
+    /** The topologies this cluster was built with. */
+    @Override
+    public Topologies getTopologies() {
+        return this.topologies;
+    }
+
+    /** The live (mutable) set of blacklisted hosts held by this cluster. */
+    @Override
+    public Set<String> getBlacklistedHosts() {
+        return this.blackListedHosts;
+    }
+
+    /**
+     * Add a single host to the blacklist.
+     *
+     * @param host the host to blacklist
+     */
+    public void blacklistHost(String host) {
+        this.blackListedHosts.add(host);
+    }
+
+    @Override
+    public boolean isBlackListed(String supervisorId) {
+        return blackListedHosts.contains(getHost(supervisorId));
+    }
+
+    /** Whether the given host name is blacklisted. */
+    @Override
+    public boolean isBlacklistedHost(String host) {
+        return this.blackListedHosts.contains(host);
+    }
+
+    /** Resolve a supervisor id to its host name by asking INimbus. */
+    @Override
+    public String getHost(String supervisorId) {
+        return this.inimbus.getHostName(this.supervisors, supervisorId);
+    }
+
+    /** Every topology for which {@link #needsScheduling(TopologyDetails)} holds, in iteration order. */
+    @Override
+    public List<TopologyDetails> needsSchedulingTopologies() {
+        List<TopologyDetails> pending = new ArrayList<>();
+        for (TopologyDetails candidate : getTopologies()) {
+            if (!needsScheduling(candidate)) {
+                continue;
+            }
+            pending.add(candidate);
+        }
+        return pending;
+    }
+
+    /**
+     * A topology needs scheduling when it has fewer workers than requested, or when some of its
+     * executors are not assigned yet.
+     */
+    @Override
+    public boolean needsScheduling(TopologyDetails topology) {
+        if (topology.getNumWorkers() > getAssignedNumWorkers(topology)) {
+            return true;
+        }
+        return !getUnassignedExecutors(topology).isEmpty();
+    }
+
+    /** Resource-aware variant: only unassigned executors matter, not the worker count. */
+    @Override
+    public boolean needsSchedulingRas(TopologyDetails topology) {
+        return !getUnassignedExecutors(topology).isEmpty();
+    }
+
+    /** Map each not-yet-assigned executor of the topology to its component id. */
+    @Override
+    public Map<ExecutorDetails, String> getNeedsSchedulingExecutorToComponents(
+        TopologyDetails topology) {
+        Set<ExecutorDetails> unscheduled = new HashSet<>(topology.getExecutors());
+        SchedulerAssignment existing = assignments.get(topology.getId());
+        if (existing != null) {
+            unscheduled.removeAll(existing.getExecutors());
+        }
+        return topology.selectExecutorToComponent(unscheduled);
+    }
+
+    /**
+     * Group the not-yet-assigned executors of the topology by component id.
+     *
+     * @param topology the topology to inspect
+     * @return key: component id, value: that component's unassigned executors
+     */
+    @Override
+    public Map<String, List<ExecutorDetails>> getNeedsSchedulingComponentToExecutors(
+        TopologyDetails topology) {
+        Map<String, List<ExecutorDetails>> byComponent = new HashMap<>();
+        getNeedsSchedulingExecutorToComponents(topology).forEach(
+            (executor, component) ->
+                byComponent.computeIfAbsent(component, c -> new ArrayList<>()).add(executor));
+        return byComponent;
+    }
+
+    @Override
+    public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {
+        Set<Integer> usedPorts = new HashSet<>();
+
+        for (SchedulerAssignment assignment : assignments.values()) {
+            for (WorkerSlot slot : assignment.getExecutorToSlot().values()) {
+                if (slot.getNodeId().equals(supervisor.getId())) {
+                    usedPorts.add(slot.getPort());
+                }
+            }
+        }
+
+        return usedPorts;
+    }
+
+    /** Assignable ports of the supervisor minus the ones already in use. */
+    @Override
+    public Set<Integer> getAvailablePorts(SupervisorDetails supervisor) {
+        Set<Integer> taken = getUsedPorts(supervisor);
+        Set<Integer> free = new HashSet<>();
+        free.addAll(getAssignablePorts(supervisor));
+        free.removeAll(taken);
+        return free;
+    }
+
+    /**
+     * All ports of the supervisor, or none when its host is blacklisted. The returned set is the
+     * supervisor's own port set, not a copy.
+     */
+    @Override
+    public Set<Integer> getAssignablePorts(SupervisorDetails supervisor) {
+        if (!isBlackListed(supervisor.id)) {
+            return supervisor.allPorts;
+        }
+        return Collections.emptySet();
+    }
+
+    /** Free slots across every supervisor of the cluster. */
+    @Override
+    public List<WorkerSlot> getAvailableSlots() {
+        List<WorkerSlot> freeSlots = new ArrayList<>();
+        for (SupervisorDetails sd : supervisors.values()) {
+            freeSlots.addAll(getAvailableSlots(sd));
+        }
+        return freeSlots;
+    }
+
+    @Override
+    public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {
+        Set<Integer> ports = this.getAvailablePorts(supervisor);
+        List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
+
+        for (Integer port : ports) {
+            slots.add(new WorkerSlot(supervisor.getId(), port));
+        }
+
+        return slots;
+    }
+
+    @Override
+    public List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor) {
+        Set<Integer> ports = this.getAssignablePorts(supervisor);
+        List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
+
+        for (Integer port : ports) {
+            slots.add(new WorkerSlot(supervisor.getId(), port));
+        }
+
+        return slots;
+    }
+
+    /** Assignable slots across every supervisor of the cluster. */
+    @Override
+    public List<WorkerSlot> getAssignableSlots() {
+        List<WorkerSlot> candidates = new ArrayList<>();
+        for (SupervisorDetails sd : supervisors.values()) {
+            candidates.addAll(getAssignableSlots(sd));
+        }
+        return candidates;
+    }
+
+    /**
+     * Executors of the topology that have no slot yet.
+     *
+     * @param topology the topology, may be null
+     * @return the unassigned executors; an empty list for a null topology
+     */
+    @Override
+    public Collection<ExecutorDetails> getUnassignedExecutors(TopologyDetails topology) {
+        if (topology == null) {
+            return new ArrayList<ExecutorDetails>(0);
+        }
+        Set<ExecutorDetails> unassigned = new HashSet<>(topology.getExecutors());
+        SchedulerAssignment existing = getAssignmentById(topology.getId());
+        if (existing != null) {
+            unassigned.removeAll(existing.getExecutors());
+        }
+        return unassigned;
+    }
+
+    /** Number of distinct slots the topology's executors occupy; 0 for null or unassigned topologies. */
+    @Override
+    public int getAssignedNumWorkers(TopologyDetails topology) {
+        if (topology == null) {
+            return 0;
+        }
+        SchedulerAssignment existing = getAssignmentById(topology.getId());
+        if (existing == null) {
+            return 0;
+        }
+        return new HashSet<WorkerSlot>(existing.getExecutorToSlot().values()).size();
+    }
+
+    /**
+     * Sum the resources one worker would need to run the given executors of a topology.
+     *
+     * <p>Per-executor on-heap, off-heap and CPU requests are added up; a null request counts as
+     * nothing. Shared memory requested by these executors is then added on top. Shared on-heap
+     * memory goes into both the on-heap and shared-on-heap totals. Per-worker shared off-heap memory
+     * goes into both the off-heap and shared-off-heap totals. Node-level shared off-heap memory is
+     * not part of this result.
+     *
+     * @param td the topology the executors belong to
+     * @param executors the executors that would share one worker
+     * @return the combined worker resources
+     */
+    private WorkerResources calculateWorkerResources(
+        TopologyDetails td, Collection<ExecutorDetails> executors) {
+        double totalOnHeap = 0.0;
+        double totalOffHeap = 0.0;
+        double totalCpu = 0.0;
+        for (ExecutorDetails executor : executors) {
+            Double onHeapReq = td.getOnHeapMemoryRequirement(executor);
+            Double offHeapReq = td.getOffHeapMemoryRequirement(executor);
+            Double cpuReq = td.getTotalCpuReqTask(executor);
+            if (onHeapReq != null) {
+                totalOnHeap += onHeapReq;
+            }
+            if (offHeapReq != null) {
+                totalOffHeap += offHeapReq;
+            }
+            if (cpuReq != null) {
+                totalCpu += cpuReq;
+            }
+        }
+
+        double sharedOnHeap = 0.0;
+        double sharedOffHeap = 0.0;
+        for (SharedMemory region : td.getSharedMemoryRequests(executors)) {
+            double regionOnHeap = region.get_on_heap();
+            double regionOffHeapWorker = region.get_off_heap_worker();
+            totalOnHeap += regionOnHeap;
+            sharedOnHeap += regionOnHeap;
+            totalOffHeap += regionOffHeapWorker;
+            sharedOffHeap += regionOffHeapWorker;
+        }
+
+        WorkerResources combined = new WorkerResources();
+        combined.set_cpu(totalCpu);
+        combined.set_mem_on_heap(totalOnHeap);
+        combined.set_mem_off_heap(totalOffHeap);
+        combined.set_shared_mem_on_heap(sharedOnHeap);
+        combined.set_shared_mem_off_heap(sharedOffHeap);
+        return combined;
+    }
+
+    /**
+     * Check whether adding one executor to the given slot stays within the available CPU and
+     * memory, and within the maximum worker heap.
+     *
+     * <p>Memory is compared as the increase in the slot's total (on-heap + off-heap, including
+     * node-level shared off-heap memory) caused by adding {@code exec}. The heap check compares the
+     * worker's on-heap total after the addition against {@code maxHeap}.
+     *
+     * @param ws the slot the executor would go into
+     * @param exec the executor to place
+     * @param td the topology the executor belongs to
+     * @param maxHeap the largest on-heap size a worker may have
+     * @param memoryAvailable memory still free on the node
+     * @param cpuAvailable CPU still free on the node
+     * @return true if the executor fits
+     */
+    @Override
+    public boolean wouldFit(
+        WorkerSlot ws,
+        ExecutorDetails exec,
+        TopologyDetails td,
+        double maxHeap,
+        double memoryAvailable,
+        double cpuAvailable) {
+        //NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot.
+        //CPU is simplest because it does not have odd interactions.
+        // The request is a boxed Double that may be null; treat a missing request as 0 (as
+        // calculateWorkerResources does) instead of throwing a NullPointerException on unboxing.
+        Double cpuRequest = td.getTotalCpuReqTask(exec);
+        double cpuNeeded = cpuRequest == null ? 0.0 : cpuRequest;
+        if (cpuNeeded > cpuAvailable) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
+                    td.getName(),
+                    exec,
+                    ws,
+                    cpuNeeded,
+                    cpuAvailable);
+            }
+            //Not enough CPU no need to try any more
+            return false;
+        }
+
+        double currentTotal = 0.0;
+        double afterSharedOffHeap;
+        Set<ExecutorDetails> wouldBeAssigned = new HashSet<>();
+        wouldBeAssigned.add(exec);
+        SchedulerAssignmentImpl assignment = assignments.get(td.getId());
+        if (assignment != null) {
+            Collection<ExecutorDetails> currentlyAssigned = assignment.getSlotToExecutors().get(ws);
+            if (currentlyAssigned != null) {
+                wouldBeAssigned.addAll(currentlyAssigned);
+                WorkerResources wrCurrent = calculateWorkerResources(td, currentlyAssigned);
+                currentTotal = wrCurrent.get_mem_off_heap() + wrCurrent.get_mem_on_heap();
+            }
+            currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment);
+            afterSharedOffHeap = calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec);
+        } else {
+            // Nothing of this topology is scheduled yet, so the only node-level shared memory
+            // on the node would be what this executor itself requests.
+            afterSharedOffHeap = 0.0;
+            for (SharedMemory shared : td.getSharedMemoryRequests(Collections.singleton(exec))) {
+                afterSharedOffHeap += shared.get_off_heap_node();
+            }
+        }
+
+        // Always compute the footprint after the assignment. Previously this was skipped when the
+        // topology had no assignment yet, so the memory and heap checks below always passed.
+        WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
+        double afterOnHeap = wrAfter.get_mem_on_heap();
+        double afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
+        afterTotal += afterSharedOffHeap;
+
+        double memoryAdded = afterTotal - currentTotal;
+        if (memoryAdded > memoryAvailable) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}",
+                    td.getName(),
+                    exec,
+                    ws,
+                    memoryAdded,
+                    memoryAvailable);
+            }
+            return false;
+        }
+        if (afterOnHeap > maxHeap) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Could not schedule {}:{} on {} HEAP would be too large {} > {}",
+                    td.getName(),
+                    exec,
+                    ws,
+                    afterOnHeap,
+                    maxHeap);
+            }
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Assign the slot to the executors for this topology.
+     *
+     * @throws RuntimeException if the specified slot is already occupied.
+     */
+    public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetails> executors) {
+        assertValidTopologyForModification(topologyId);
+        if (isSlotOccupied(slot)) {
+            throw new RuntimeException(
+                "slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
+        }
+
+        TopologyDetails td = topologies.getById(topologyId);
+        if (td == null) {
+            throw new IllegalArgumentException(
+                "Trying to schedule for topo "
+                    + topologyId
+                    + " but that is not a known topology "
+                    + topologies.getAllIds());
+        }
+        WorkerResources resources = calculateWorkerResources(td, executors);
+        SchedulerAssignmentImpl assignment = assignments.get(topologyId);
+        if (assignment == null) {
+            assignment = new SchedulerAssignmentImpl(topologyId);
+            assignments.put(topologyId, assignment);
+        } else {
+            for (ExecutorDetails executor : executors) {
+                if (assignment.isExecutorAssigned(executor)) {
+                    throw new RuntimeException(
+                        "Attempting to assign executor: "
+                            + executor
+                            + " of topology: "
+                            + topologyId
+                            + " to workerslot: "
+                            + slot
+                            + ". The executor is already assigned to workerslot: "
+                            + assignment.getExecutorToSlot().get(executor)
+                            + ". The executor must unassigned before it can be assigned to another slot!");
+                }
+            }
+        }
+
+        assignment.assign(slot, executors, resources);
+        String nodeId = slot.getNodeId();
+        assignment.setTotalSharedOffHeapMemory(
+            nodeId, calculateSharedOffHeapMemory(nodeId, assignment));
+        scheduledCPUCache.remove(nodeId);
+        scheduledMemoryCache.remove(nodeId);
+    }
+
+    /**
+     * Assign everything for the given topology, one slot at a time.
+     *
+     * @param assignment the new assignment to make
+     * @param ignoreSingleExceptions if true, a slot whose assignment throws a RuntimeException is
+     *     skipped and the remaining slots are still assigned; if false the exception propagates
+     */
+    public void assign(SchedulerAssignment assignment, boolean ignoreSingleExceptions) {
+        String topoId = assignment.getTopologyId();
+        assertValidTopologyForModification(topoId);
+        for (Entry<WorkerSlot, Collection<ExecutorDetails>> slotAndExecutors :
+            assignment.getSlotToExecutors().entrySet()) {
+            try {
+                assign(slotAndExecutors.getKey(), topoId, slotAndExecutors.getValue());
+            } catch (RuntimeException e) {
+                if (ignoreSingleExceptions) {
+                    // Best effort: drop this slot and keep going with the others.
+                    continue;
+                }
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Calculate the amount of shared off heap memory on a given nodes with the given assignment.
+     *
+     * @param nodeId the id of the node
+     * @param assignment the current assignment
+     * @return the amount of shared off heap memory for that node in MB
+     */
+    private double calculateSharedOffHeapMemory(String nodeId, SchedulerAssignmentImpl assignment) {
+        return calculateSharedOffHeapMemory(nodeId, assignment, null);
+    }
+
+    private double calculateSharedOffHeapMemory(
+        String nodeId, SchedulerAssignmentImpl assignment, ExecutorDetails extra) {
+        TopologyDetails td = topologies.getById(assignment.getTopologyId());
+        Set<ExecutorDetails> executorsOnNode = new HashSet<>();
+        for (Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
+            assignment.getSlotToExecutors().entrySet()) {
+            if (nodeId.equals(entry.getKey().getNodeId())) {
+                executorsOnNode.addAll(entry.getValue());
+            }
+        }
+        if (extra != null) {
+            executorsOnNode.add(extra);
+        }
+        double memorySharedWithinNode = 0.0;
+        //Now check for overlap on the node
+        for (SharedMemory shared : td.getSharedMemoryRequests(executorsOnNode)) {
+            memorySharedWithinNode += shared.get_off_heap_node();
+        }
+        return memorySharedWithinNode;
+    }
+
+    /**
+     * Free the specified slot.
+     *
+     * @param slot the slot to free
+     */
+    public void freeSlot(WorkerSlot slot) {
+        // remove the slot from the existing assignments
+        for (SchedulerAssignmentImpl assignment : assignments.values()) {
+            if (assignment.isSlotOccupied(slot)) {
+                assertValidTopologyForModification(assignment.getTopologyId());
+                assignment.unassignBySlot(slot);
+
+                String nodeId = slot.getNodeId();
+                assignment.setTotalSharedOffHeapMemory(
+                    nodeId, calculateSharedOffHeapMemory(nodeId, assignment));
+                scheduledCPUCache.remove(nodeId);
+                scheduledMemoryCache.remove(nodeId);
+            }
+        }
+    }
+
+    /**
+     * Free each of the given slots; a null collection is ignored.
+     *
+     * @param slots multiple slots to free
+     */
+    public void freeSlots(Collection<WorkerSlot> slots) {
+        if (slots == null) {
+            return;
+        }
+        for (WorkerSlot toFree : slots) {
+            freeSlot(toFree);
+        }
+    }
+
+    /** Whether any topology's assignment currently uses the slot. */
+    @Override
+    public boolean isSlotOccupied(WorkerSlot slot) {
+        return assignments.values().stream()
+            .anyMatch(topoAssignment -> topoAssignment.isSlotOccupied(slot));
+    }
+
+    /**
+     * Look up the current assignment of a topology.
+     *
+     * @param topologyId the topology id
+     * @return its assignment, or null if the topology has none
+     */
+    @Override
+    public SchedulerAssignment getAssignmentById(String topologyId) {
+        // A single lookup is enough: null values are never stored in the assignments map.
+        return assignments.get(topologyId);
+    }
+
+    /** Slots used by the topology, or an empty set if it has no assignment. */
+    @Override
+    public Collection<WorkerSlot> getUsedSlotsByTopologyId(String topologyId) {
+        SchedulerAssignmentImpl topoAssignment = assignments.get(topologyId);
+        if (topoAssignment != null) {
+            return topoAssignment.getSlots();
+        }
+        return Collections.emptySet();
+    }
+
+    /** The supervisor with the given id, or null if unknown. */
+    @Override
+    public SupervisorDetails getSupervisorById(String nodeId) {
+        return this.supervisors.get(nodeId);
+    }
+
+    /** Every slot that hosts at least one executor of any topology. */
+    @Override
+    public Collection<WorkerSlot> getUsedSlots() {
+        Set<WorkerSlot> occupied = new HashSet<>();
+        for (SchedulerAssignmentImpl topoAssignment : assignments.values()) {
+            occupied.addAll(topoAssignment.getExecutorToSlot().values());
+        }
+        return occupied;
+    }
+
+    /** Supervisors running on the given host; an empty list if there are none. */
+    @Override
+    public List<SupervisorDetails> getSupervisorsByHost(String host) {
+        List<SupervisorDetails> onHost = new ArrayList<>();
+        List<String> ids = this.hostToId.get(host);
+        if (ids == null) {
+            return onHost;
+        }
+        for (String id : ids) {
+            onHost.add(this.getSupervisorById(id));
+        }
+        return onHost;
+    }
+
+    @Override
+    public Map<String, SchedulerAssignment> getAssignments() {
+        return new HashMap<String, SchedulerAssignment>(assignments);
+    }
+
+    /**
+     * Set assignments for cluster.
+     *
+     * <p>All current assignments are discarded, together with the per-node CPU/memory caches
+     * derived from them. Each entry of {@code newAssignments} is then applied through
+     * {@link #assign(SchedulerAssignment, boolean)}. Passing the cluster's own internal map is a no-op.
+     *
+     * @param newAssignments key: topology id, value: the assignment to apply
+     * @param ignoreSingleExceptions if true, slots that cannot be assigned are skipped instead of
+     *     aborting the update
+     */
+    public void setAssignments(
+        Map<String, ? extends SchedulerAssignment> newAssignments, boolean ignoreSingleExceptions) {
+        if (newAssignments == assignments) {
+            //NOOP
+            return;
+        }
+        // Validate both the incoming and the outgoing topologies before changing any state.
+        for (SchedulerAssignment assignment : newAssignments.values()) {
+            assertValidTopologyForModification(assignment.getTopologyId());
+        }
+        for (SchedulerAssignment assignment : assignments.values()) {
+            assertValidTopologyForModification(assignment.getTopologyId());
+        }
+        assignments.clear();
+        // The caches describe the assignments just discarded. assign() only invalidates the
+        // nodes it touches, so without this a node that gets no new assignment keeps stale values.
+        scheduledCPUCache.clear();
+        scheduledMemoryCache.clear();
+        for (SchedulerAssignment assignment : newAssignments.values()) {
+            assign(assignment, ignoreSingleExceptions);
+        }
+    }
+
+    /** The live map of supervisor id to supervisor details (not a copy). */
+    @Override
+    public Map<String, SupervisorDetails> getSupervisors() {
+        return supervisors;
+    }
+
+    /** Sum of the total CPU of every supervisor in the cluster. */
+    @Override
+    public double getClusterTotalCpuResource() {
+        double clusterCpu = 0.0;
+        for (SupervisorDetails supervisor : this.supervisors.values()) {
+            clusterCpu += supervisor.getTotalCPU();
+        }
+        return clusterCpu;
+    }
+
+    /** Sum of the total memory of every supervisor in the cluster. */
+    @Override
+    public double getClusterTotalMemoryResource() {
+        double clusterMemory = 0.0;
+        for (SupervisorDetails supervisor : this.supervisors.values()) {
+            clusterMemory += supervisor.getTotalMemory();
+        }
+        return clusterMemory;
+    }
+
+    /** The live map of rack to the host names in that rack (not a copy). */
+    @Override
+    public Map<String, List<String>> getNetworkTopography() {
+        return this.networkTopography;
+    }
+
+    @VisibleForTesting
+    public void setNetworkTopography(Map<String, List<String>> networkTopography) {
+        this.networkTopography.clear();
+        this.networkTopography.putAll(networkTopography);
+    }
+
+    /**
+     * Get heap memory usage for a worker's main process and logwriter process.
+     * @param topConf - the topology config
+     * @return the assigned memory (in MB)
+     */
+    public static Double getAssignedMemoryForSlot(final Map<String, Object> topConf) {
+        Double totalWorkerMemory = 0.0;
+        final Integer topologyWorkerDefaultMemoryAllocation = 768;
+
+        List<String> topologyWorkerGcChildopts = ConfigUtils.getValueAsList(
+            Config.TOPOLOGY_WORKER_GC_CHILDOPTS, topConf);
+        List<String> workerGcChildopts = ConfigUtils.getValueAsList(
+            Config.WORKER_GC_CHILDOPTS, topConf);
+        Double memGcChildopts = null;
+        memGcChildopts = Utils.parseJvmHeapMemByChildOpts(
+            topologyWorkerGcChildopts, null);
+        if (memGcChildopts == null) {
+            memGcChildopts = Utils.parseJvmHeapMemByChildOpts(
+                workerGcChildopts, null);
+        }
+
+        List<String> topologyWorkerChildopts = ConfigUtils.getValueAsList(
+            Config.TOPOLOGY_WORKER_CHILDOPTS, topConf);
+        Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(
+            topologyWorkerChildopts, null);
+
+        List<String> workerChildopts = ConfigUtils.getValueAsList(
+            Config.WORKER_CHILDOPTS, topConf);
+        Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(
+            workerChildopts, null);
+
+        if (memGcChildopts != null) {
+            totalWorkerMemory += memGcChildopts;
+        } else if (memTopologyWorkerChildopts != null) {
+            totalWorkerMemory += memTopologyWorkerChildopts;
+        } else if (memWorkerChildopts != null) {
+            totalWorkerMemory += memWorkerChildopts;
+        } else {
+            Object workerHeapMemoryMb = topConf.get(
+                Config.WORKER_HEAP_MEMORY_MB);
+            totalWorkerMemory += ObjectReader.getInt(
+                workerHeapMemoryMb, topologyWorkerDefaultMemoryAllocation);
+        }
+
+        List<String> topoWorkerLwChildopts = ConfigUtils.getValueAsList(
+            Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, topConf);
+        if (topoWorkerLwChildopts != null) {
+            totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts(
+                topoWorkerLwChildopts, 0.0);
+        }
+        return totalWorkerMemory;
+    }
+
+    /**
+     * Record (and log at INFO) the scheduler status message for a topology, replacing any previous one.
+     *
+     * @param topologyId the topology id
+     * @param statusMessage the new status message
+     */
+    public void setStatus(String topologyId, String statusMessage) {
+        assertValidTopologyForModification(topologyId);
+        LOG.info("STATUS - {} {}", topologyId, statusMessage);
+        this.status.put(topologyId, statusMessage);
+    }
+
+    /**
+     * Record a scheduler status message for a topology only when it has no (non-null) status yet.
+     * Unlike setStatus this does not log anything.
+     *
+     * @param topologyId the id of the topology; checked with assertValidTopologyForModification
+     * @param statusMessage the status message to record if none is present
+     */
+    public void setStatusIfAbsent(String topologyId, String statusMessage) {
+        assertValidTopologyForModification(topologyId);
+        this.status.putIfAbsent(topologyId, statusMessage);
+    }
+
+    /**
+     * Get the scheduler status map. This is the internal map itself, not a copy, so passing it
+     * back to setStatusMap is recognized by identity and does nothing.
+     *
+     * @return the live topology id to status message map
+     */
+    @Override
+    public Map<String, String> getStatusMap() {
+        return this.status;
+    }
+
+    /**
+     * Get the scheduler status of a single topology.
+     *
+     * @param topoId the id of the topology
+     * @return the recorded status message, or null when there is none
+     */
+    public String getStatus(String topoId) {
+        return this.status.get(topoId);
+    }
+
+    /**
+     * Replace the whole scheduler status map with the contents of {@code statusMap}.
+     *
+     * <p>Every topology in the new map and every topology that currently has a status is checked
+     * with assertValidTopologyForModification before anything changes. Passing in the map returned
+     * by getStatusMap() (the same instance) is a no-op.
+     *
+     * @param statusMap the new topology id to status message mapping
+     */
+    public void setStatusMap(Map<String, String> statusMap) {
+        if (statusMap == this.status) {
+            return; //This is a NOOP
+        }
+        for (String topologyId : statusMap.keySet()) {
+            assertValidTopologyForModification(topologyId);
+        }
+        for (String topologyId : status.keySet()) {
+            assertValidTopologyForModification(topologyId);
+        }
+        // Snapshot before clearing: statusMap may be a view backed by this.status (for example an
+        // unmodifiable wrapper of it), and clear() would then empty it before putAll() copies it.
+        Map<String, String> newStatus = new HashMap<>(statusMap);
+        this.status.clear();
+        this.status.putAll(newStatus);
+    }
+
+    /**
+     * Build a new map from topology id to the {@link TopologyResources} of every topology in this
+     * cluster. Each entry combines the topology details with the assignment looked up for that id
+     * (null when the topology has no assignment).
+     *
+     * @return a new map of topology id to its resources
+     */
+    @Override
+    public Map<String, TopologyResources> getTopologyResourcesMap() {
+        Map<String, TopologyResources> resourcesByTopo = new HashMap<>(assignments.size());
+        for (TopologyDetails details : topologies.getTopologies()) {
+            String id = details.getId();
+            resourcesByTopo.put(id, new TopologyResources(details, assignments.get(id)));
+        }
+        return resourcesByTopo;
+    }
+
+    /**
+     * Get the total and used resources of every supervisor, for display on the UI.
+     *
+     * <p>Totals come from each known supervisor. Used resources are accumulated from the
+     * per-worker resources of every assignment, plus any shared off-heap memory an assignment
+     * reports for a node. A node that appears in an assignment but not among the supervisors
+     * starts from an all-zero entry.
+     *
+     * @return a new map from supervisor (node) id to its resource summary
+     */
+    @Override
+    public Map<String, SupervisorResources> getSupervisorsResourcesMap() {
+        Map<String, SupervisorResources> ret = new HashMap<>();
+        for (SupervisorDetails sd : supervisors.values()) {
+            // SupervisorResources takes (totalMem, totalCpu, usedMem, usedCpu); the CPU total must
+            // come from the supervisor's CPU capacity, not from its memory.
+            ret.put(sd.getId(), new SupervisorResources(sd.getTotalMemory(), sd.getTotalCPU(), 0, 0));
+        }
+        for (SchedulerAssignmentImpl assignment : assignments.values()) {
+            for (Entry<WorkerSlot, WorkerResources> entry :
+                assignment.getScheduledResources().entrySet()) {
+                String id = entry.getKey().getNodeId();
+                SupervisorResources sr = ret.get(id);
+                if (sr == null) {
+                    sr = new SupervisorResources(0, 0, 0, 0);
+                }
+                sr = sr.add(entry.getValue());
+                ret.put(id, sr);
+            }
+            Map<String, Double> nodeIdToSharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapMemory();
+            if (nodeIdToSharedOffHeap != null) {
+                for (Entry<String, Double> entry : nodeIdToSharedOffHeap.entrySet()) {
+                    String id = entry.getKey();
+                    SupervisorResources sr = ret.get(id);
+                    if (sr == null) {
+                        sr = new SupervisorResources(0, 0, 0, 0);
+                    }
+                    sr = sr.addMem(entry.getValue());
+                    ret.put(id, sr);
+                }
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Get, for every assigned topology, its map of worker slot to scheduled worker resources.
+     * The outer map is newly built; the inner maps are exactly what each assignment's
+     * {@code getScheduledResources()} returns and are not copied here.
+     *
+     * @return a new map of topology id to that topology's slot-to-resources map
+     */
+    @Override
+    public Map<String, Map<WorkerSlot, WorkerResources>> getWorkerResourcesMap() {
+        Map<String, Map<WorkerSlot, WorkerResources>> byTopology = new HashMap<>();
+        assignments.forEach((topoId, assignment) ->
+            byTopology.put(topoId, assignment.getScheduledResources()));
+        return byTopology;
+    }
+
+    /**
+     * Look up the resources scheduled for a single worker slot.
+     *
+     * @param ws the slot to look up
+     * @return the resources from the first assignment that schedules {@code ws}, or null if none does
+     */
+    @Override
+    public WorkerResources getWorkerResources(WorkerSlot ws) {
+        for (SchedulerAssignmentImpl assignment : assignments.values()) {
+            WorkerResources found = assignment.getScheduledResources().get(ws);
+            if (found != null) {
+                return found;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Get the total memory scheduled on a node: on-heap plus off-heap memory of every worker
+     * placed on the node, plus any shared off-heap memory each assignment reports for it.
+     * The result is memoized in {@code scheduledMemoryCache}.
+     *
+     * @param nodeId the id of the node
+     * @return the total memory scheduled on the node
+     */
+    @Override
+    public double getScheduledMemoryForNode(String nodeId) {
+        Double ret = scheduledMemoryCache.get(nodeId);
+        if (ret != null) {
+            return ret;
+        }
+        double totalMemory = 0.0;
+        for (SchedulerAssignmentImpl assignment : assignments.values()) {
+            for (Entry<WorkerSlot, WorkerResources> entry :
+                assignment.getScheduledResources().entrySet()) {
+                if (nodeId.equals(entry.getKey().getNodeId())) {
+                    WorkerResources resources = entry.getValue();
+                    totalMemory += resources.get_mem_off_heap() + resources.get_mem_on_heap();
+                }
+            }
+            // The shared off-heap map may be null (getSupervisorsResourcesMap guards for it too).
+            Map<String, Double> nodeIdToSharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapMemory();
+            if (nodeIdToSharedOffHeap != null) {
+                Double sharedOffHeap = nodeIdToSharedOffHeap.get(nodeId);
+                if (sharedOffHeap != null) {
+                    totalMemory += sharedOffHeap;
+                }
+            }
+        }
+        scheduledMemoryCache.put(nodeId, totalMemory);
+        return totalMemory;
+    }
+
+    /**
+     * Get the total CPU scheduled on a node, summed over every worker placed on it by any
+     * assignment. The result is memoized in {@code scheduledCPUCache}.
+     *
+     * @param nodeId the id of the node
+     * @return the total CPU scheduled on the node
+     */
+    @Override
+    public double getScheduledCpuForNode(String nodeId) {
+        Double cached = scheduledCPUCache.get(nodeId);
+        if (cached == null) {
+            double cpuSum = 0.0;
+            for (SchedulerAssignmentImpl assignment : assignments.values()) {
+                for (Entry<WorkerSlot, WorkerResources> slotAndResources
+                    : assignment.getScheduledResources().entrySet()) {
+                    if (!nodeId.equals(slotAndResources.getKey().getNodeId())) {
+                        continue;
+                    }
+                    cpuSum += slotAndResources.getValue().get_cpu();
+                }
+            }
+            scheduledCPUCache.put(nodeId, cpuSum);
+            cached = cpuSum;
+        }
+        return cached;
+    }
+
+    /**
+     * Get the {@link INimbus} held by this cluster.
+     *
+     * @return the INimbus instance
+     */
+    public INimbus getINimbus() {
+        return inimbus;
+    }
+
+    /**
+     * Get the configuration held by this cluster.
+     *
+     * @return the stored configuration map (the reference itself, not a copy)
+     */
+    @Override
+    public Map<String, Object> getConf() {
+        return conf;
+    }
+
+    /**
+     * Unassign everything for the given topology id by freeing every slot it currently uses.
+     *
+     * @param topoId the id of the topology to unassign; checked with
+     *     assertValidTopologyForModification first
+     */
+    public void unassign(String topoId) {
+        assertValidTopologyForModification(topoId);
+        freeSlots(getUsedSlotsByTopologyId(topoId));
+    }
+
+    /**
+     * Copy the assignments and the scheduler status map of another cluster into this one.
+     *
+     * <p>Every topology assigned in {@code other} is checked with
+     * assertValidTopologyForModification before any state in this cluster is changed.
+     *
+     * @param other the cluster to take the assignments and status from
+     */
+    public void updateFrom(Cluster other) {
+        Map<String, SchedulerAssignment> otherAssignments = other.getAssignments();
+        otherAssignments.values().forEach(
+            assignment -> assertValidTopologyForModification(assignment.getTopologyId()));
+        setAssignments(otherAssignments, false);
+        setStatusMap(other.getStatusMap());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Component.java b/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
new file mode 100644
index 0000000..6a0de72
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.storm.generated.ComponentType;
+
+/**
+ * A component of a topology: its id, its {@link ComponentType}, its executors, and the ids of the
+ * components it is linked to as parents (components it consumes from) and children (components
+ * that consume from it).
+ */
+public class Component {
+    private final String id;
+    private final List<ExecutorDetails> execs;
+    private final ComponentType type;
+    // Ids of linked components; both sets are maintained together by addChild.
+    private final Set<String> parents = new HashSet<>();
+    private final Set<String> children = new HashSet<>();
+
+    /**
+     * Create a new component with no parents or children.
+     *
+     * @param type the type of component this is
+     * @param compId the id of the component
+     * @param execs the executors for this component (stored as given, not copied).
+     */
+    public Component(ComponentType type, String compId, List<ExecutorDetails> execs) {
+        this.type = type;
+        this.id = compId;
+        this.execs = execs;
+    }
+
+    /**
+     * Add a child link. The link is recorded on both sides: {@code child} is added to this
+     * component's children and this component is added to {@code child}'s parents.
+     *
+     * @param child a child that consumes from this
+     */
+    public void addChild(Component child) {
+        children.add(child.getId());
+        child.parents.add(id);
+    }
+
+    /** Get the id of this component. */
+    public String getId() {
+        return id;
+    }
+
+    /** Get the executors of this component (the list given to the constructor, not a copy). */
+    public List<ExecutorDetails> getExecs() {
+        return execs;
+    }
+
+    /** Get the type of this component. */
+    public ComponentType getType() {
+        return type;
+    }
+
+    /** Get the ids of this component's parents; this is the live, mutable set. */
+    public Set<String> getParents() {
+        return parents;
+    }
+
+    /** Get the ids of this component's children; this is the live, mutable set. */
+    public Set<String> getChildren() {
+        return children;
+    }
+
+    @Override
+    public String toString() {
+        return "{id: "
+            + getId()
+            + " Parents: "
+            + getParents()
+            + " Children: "
+            + getChildren()
+            + " Execs: "
+            + getExecs()
+            + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
index 3908416..5ef2ce5 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
@@ -70,8 +70,7 @@ public class DefaultScheduler implements IScheduler {
     }
 
     public static void defaultSchedule(Topologies topologies, Cluster cluster) {
-        List<TopologyDetails> needsSchedulingTopologies = cluster.needsSchedulingTopologies(topologies);
-        for (TopologyDetails topology : needsSchedulingTopologies) {
+        for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
             List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
             Set<ExecutorDetails> allExecutors = topology.getExecutors();
 
@@ -89,12 +88,10 @@ public class DefaultScheduler implements IScheduler {
                 badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse);                
             }
             if (badSlots != null) {
-                cluster.freeSlots(badSlots);                
+                cluster.freeSlots(badSlots);
             }
 
-            Map<String, TopologyDetails> _topologies = new HashMap<String, TopologyDetails>();
-            _topologies.put(topology.getId(), topology);
-            EvenScheduler.scheduleTopologiesEvenly(new Topologies(_topologies), cluster);
+            EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
index 3ad2648..d4dfd94 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
@@ -15,8 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -32,11 +36,10 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
-
 public class EvenScheduler implements IScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(EvenScheduler.class);
 
+    @VisibleForTesting
     public static List<WorkerSlot> sortSlots(List<WorkerSlot> availableSlots) {
         //For example, we have a three nodes(supervisor1, supervisor2, supervisor3) cluster:
         //slots before sort:
@@ -51,15 +54,15 @@ public class EvenScheduler implements IScheduler {
 
         if (availableSlots != null && availableSlots.size() > 0) {
             // group by node
-            Map<String, List<WorkerSlot>> slotGroups = new TreeMap<String, List<WorkerSlot>>();
+            Map<String, List<WorkerSlot>> slotGroups = new TreeMap<>();
             for (WorkerSlot slot : availableSlots) {
                 String node = slot.getNodeId();
                 List<WorkerSlot> slots = null;
-                if(slotGroups.containsKey(node)){
-                   slots = slotGroups.get(node);
-                }else{
-                   slots = new ArrayList<WorkerSlot>();
-                   slotGroups.put(node, slots);
+                if (slotGroups.containsKey(node)) {
+                    slots = slotGroups.get(node);
+                } else {
+                    slots = new ArrayList<WorkerSlot>();
+                    slotGroups.put(node, slots);
                 }
                 slots.add(slot);
             }
@@ -112,7 +115,8 @@ public class EvenScheduler implements IScheduler {
         }
 
         //allow requesting slots number bigger than available slots
-        int toIndex = (totalSlotsToUse - aliveAssigned.size()) > sortedList.size() ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size());
+        int toIndex = (totalSlotsToUse - aliveAssigned.size())
+            > sortedList.size() ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size());
         List<WorkerSlot> reassignSlots = sortedList.subList(0, toIndex);
 
         Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
@@ -145,8 +149,7 @@ public class EvenScheduler implements IScheduler {
     }
 
     public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
-        List<TopologyDetails> needsSchedulingTopologies = cluster.needsSchedulingTopologies(topologies);
-        for (TopologyDetails topology : needsSchedulingTopologies) {
+        for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
             String topologyId = topology.getId();
             Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
             Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java b/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java
new file mode 100644
index 0000000..db2fed9
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+public interface INimbus {
+    /**
+     * Prepare this instance before it is used.
+     *
+     * @param topoConf the configuration
+     * @param schedulerLocalDir the local directory for the scheduler
+     */
+    void prepare(Map<String, Object> topoConf, String schedulerLocalDir);
+
+    /**
+     * Returns all slots that are available for the next round of scheduling. A slot is available for scheduling
+     * if it is free and can be assigned to, or if it is used and can be reassigned.
+     *
+     * @param existingSupervisors the supervisors currently known
+     * @param topologies the topologies
+     * @param topologiesMissingAssignments the topologies that are missing assignments
+     * @return the slots available for scheduling
+     */
+    Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors,
+                                                          Topologies topologies,
+                                                          Set<String> topologiesMissingAssignments);
+
+    /**
+     * This is called after the assignment is changed in ZK.
+     *
+     * @param topologies the topologies
+     * @param newSlotsByTopologyId the newly assigned slots, keyed by topology id
+     */
+    void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId);
+
+    /**
+     * Get the host name of a node.
+     *
+     * @param existingSupervisors map from node id to supervisor details
+     * @param nodeId the id of the node to look up
+     * @return the host name for {@code nodeId}
+     */
+    String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId);
+
+    /**
+     * Get the scheduler that this INimbus forces to be used.
+     * NOTE(review): whether null means "no forced scheduler" is not established here; confirm
+     * against implementations.
+     *
+     * @return the forced scheduler
+     */
+    IScheduler getForcedScheduler();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
new file mode 100644
index 0000000..2f1de1c
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.Map;
+
+public interface IScheduler {
+
+    /**
+     * Prepare the scheduler before it is used.
+     *
+     * @param conf the configuration
+     */
+    void prepare(Map<String, Object> conf);
+
+    /**
+     * Set assignments for the topologies which need scheduling. The new assignments are available
+     * through {@code cluster.getAssignments()}.
+     *
+     * @param topologies all the topologies in the cluster, some of which need scheduling. The
+     *     Topologies object here only contains static information about topologies. Information
+     *     like assignments and slots is all in the {@code cluster} object.
+     * @param cluster the cluster these topologies are running in. {@code cluster} contains
+     *     everything a user needs to develop new scheduling logic, e.g. supervisor information,
+     *     available slots, current assignments for all the topologies, etc. Users can set the new
+     *     assignment for topologies using {@code cluster.setAssignmentById()}.
+     */
+    void schedule(Topologies topologies, Cluster cluster);
+
+    /**
+     * This function returns the scheduler's configuration.
+     *
+     * @return The scheduler's configuration.
+     */
+    Map<String, Object> config();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
new file mode 100644
index 0000000..95e0c86
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.daemon.nimbus.TopologyResources;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.scheduler.Cluster.SupervisorResources;
+
+/** An interface that provides access to the current scheduling state. */
+public interface ISchedulingState {
+
+    /**
+     * Get all of the topologies.
+     *
+     * @return all of the topologies that are a part of the cluster.
+     */
+    Topologies getTopologies();
+
+    /**
+     * Get all of the topologies that need scheduling.
+     *
+     * @return all of the topologies that are not fully scheduled.
+     */
+    List<TopologyDetails> needsSchedulingTopologies();
+
+    /**
+     * Does the topology need scheduling?
+     *
+     * <p>A topology needs scheduling if one of the following conditions holds:
+     *
+     * <ul>
+     *   <li>The topology is assigned slots but is squeezed, i.e. it is assigned fewer slots than
+     *       desired.
+     *   <li>There are unassigned executors in this topology.
+     * </ul>
+     *
+     * @param topology the topology to check
+     * @return true if the topology needs scheduling else false.
+     */
+    boolean needsScheduling(TopologyDetails topology);
+
+    /**
+     * Like {@link #needsScheduling(TopologyDetails)} but does not take into account the number of
+     * workers requested. This is because the number of workers is ignored in RAS.
+     *
+     * @param topology the topology to check
+     * @return true if the topology needs scheduling else false.
+     */
+    boolean needsSchedulingRas(TopologyDetails topology);
+
+    /**
+     * Get all of the hosts that are blacklisted.
+     *
+     * @return all of the hosts that are blacklisted
+     */
+    Set<String> getBlacklistedHosts();
+
+    /**
+     * Check if a given supervisor is on a blacklisted host.
+     *
+     * @param supervisorId the id of the supervisor
+     * @return true if it is else false
+     */
+    boolean isBlackListed(String supervisorId);
+
+    /**
+     * Check if a given host is blacklisted.
+     *
+     * @param host the name of the host
+     * @return true if it is else false.
+     */
+    boolean isBlacklistedHost(String host);
+
+    /**
+     * Get the host a supervisor is on.
+     *
+     * @param supervisorId the id of the supervisor
+     * @return the actual host name the supervisor is on
+     */
+    String getHost(String supervisorId);
+
+    /**
+     * Get the unassigned executors of the topology.
+     *
+     * @param topology the topology to check
+     * @return the unassigned executors of the topology.
+     */
+    Collection<ExecutorDetails> getUnassignedExecutors(TopologyDetails topology);
+
+    /**
+     * Get the executors of a topology that need scheduling, mapped to their component ids.
+     *
+     * @param topology the topology this is for
+     * @return an {@code executor -> component-id} map which needs scheduling in this topology.
+     */
+    Map<ExecutorDetails, String> getNeedsSchedulingExecutorToComponents(TopologyDetails topology);
+
+    /**
+     * Get the executors of a topology that need scheduling, grouped by component id.
+     *
+     * @param topology the topology this is for
+     * @return a {@code component-id -> executors} map which needs scheduling in this topology.
+     */
+    Map<String, List<ExecutorDetails>> getNeedsSchedulingComponentToExecutors(
+        TopologyDetails topology);
+
+    /** Get all the used ports of this supervisor. */
+    Set<Integer> getUsedPorts(SupervisorDetails supervisor);
+
+    /** Return the available ports of this supervisor. */
+    Set<Integer> getAvailablePorts(SupervisorDetails supervisor);
+
+    /**
+     * Get the ports that are not blacklisted.
+     *
+     * @param supervisor the supervisor
+     * @return the ports that are not blacklisted
+     */
+    Set<Integer> getAssignablePorts(SupervisorDetails supervisor);
+
+    /** Return all the available slots on this supervisor. */
+    List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor);
+
+    /** Get all the available worker slots in the cluster. */
+    List<WorkerSlot> getAvailableSlots();
+
+    /**
+     * Return all non-blacklisted slots on this supervisor.
+     *
+     * @param supervisor the supervisor
+     * @return the non-blacklisted slots
+     */
+    List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor);
+
+    /** Get all non-blacklisted slots in the cluster. */
+    List<WorkerSlot> getAssignableSlots();
+
+    /** Get all currently occupied slots. */
+    Collection<WorkerSlot> getUsedSlots();
+
+    /**
+     * Check if a slot is occupied or not.
+     *
+     * @param slot the slot to be checked.
+     * @return true if the specified slot is occupied.
+     */
+    boolean isSlotOccupied(WorkerSlot slot);
+
+    /**
+     * Get the number of workers assigned to a topology.
+     *
+     * @param topology the topology this is for
+     * @return the number of workers assigned to this topology.
+     */
+    int getAssignedNumWorkers(TopologyDetails topology);
+
+    /**
+     * Would scheduling exec on ws fit? That is, would the heap stay {@code <= maxHeap}, the total
+     * memory added be {@code <= memoryAvailable}, and the cpu added be {@code <= cpuAvailable}?
+     *
+     * @param ws the slot to put it in
+     * @param exec the executor to investigate
+     * @param td the topology details for this executor
+     * @param maxHeap the maximum heap size for ws
+     * @param memoryAvailable the amount of memory available
+     * @param cpuAvailable the amount of CPU available
+     * @return true if it fits else false
+     */
+    boolean wouldFit(
+        WorkerSlot ws,
+        ExecutorDetails exec,
+        TopologyDetails td,
+        double maxHeap,
+        double memoryAvailable,
+        double cpuAvailable);
+
+    /** Get the current assignment for the topology. */
+    SchedulerAssignment getAssignmentById(String topologyId);
+
+    /** Get the slots used by a topology. */
+    Collection<WorkerSlot> getUsedSlotsByTopologyId(String topologyId);
+
+    /** Get a specific supervisor with the <code>nodeId</code>. */
+    SupervisorDetails getSupervisorById(String nodeId);
+
+    /**
+     * Get all the supervisors on the specified <code>host</code>.
+     *
+     * @param host hostname of the supervisor
+     * @return the <code>SupervisorDetails</code> objects for that host.
+     */
+    List<SupervisorDetails> getSupervisorsByHost(String host);
+
+    /** Get all the assignments. */
+    Map<String, SchedulerAssignment> getAssignments();
+
+    /** Get all the supervisors. */
+    Map<String, SupervisorDetails> getSupervisors();
+
+    /** Get the total amount of CPU resources in cluster. */
+    double getClusterTotalCpuResource();
+
+    /** Get the total amount of memory resources in cluster. */
+    double getClusterTotalMemoryResource();
+
+    /** Get the network topography ({@code rackId -> nodes in the rack}). */
+    Map<String, List<String>> getNetworkTopography();
+
+    /** Get all topology scheduler statuses. */
+    Map<String, String> getStatusMap();
+
+    /**
+     * Get the amount of resources used by topologies. Used for displaying resource information on the
+     * UI.
+     *
+     * @return a map from topology id to the {@link TopologyResources} for that topology, which
+     *     describes the resources the topology requested and was assigned.
+     */
+    Map<String, TopologyResources> getTopologyResourcesMap();
+
+    /**
+     * Get the amount of used and free resources on a supervisor. Used for displaying resource
+     * information on the UI.
+     *
+     * @return a map from supervisor id to a {@link SupervisorResources} summary of that supervisor's
+     *     total memory, total cpu, used memory and used cpu.
+     */
+    Map<String, SupervisorResources> getSupervisorsResourcesMap();
+
+    /**
+     * Get the full {@code topology -> worker resources} map.
+     *
+     * @return map of {@code topology -> map of worker slot -> resources for that worker}
+     */
+    Map<String, Map<WorkerSlot, WorkerResources>> getWorkerResourcesMap();
+
+    /**
+     * Get the resources for a given slot.
+     *
+     * @param ws the slot
+     * @return the resources currently assigned
+     */
+    WorkerResources getWorkerResources(WorkerSlot ws);
+
+    /**
+     * Get the total memory currently scheduled on a node.
+     *
+     * @param nodeId the id of the node
+     * @return the total memory currently scheduled on the node
+     */
+    double getScheduledMemoryForNode(String nodeId);
+
+    /**
+     * Get the total cpu currently scheduled on a node.
+     *
+     * @param nodeId the id of the node
+     * @return the total cpu currently scheduled on the node
+     */
+    double getScheduledCpuForNode(String nodeId);
+
+    /** Get the nimbus configuration. */
+    Map<String, Object> getConf();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java
new file mode 100644
index 0000000..4ecf28e
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.Collection;
+import java.util.Map;
+
+public interface ISupervisor {
+    /**
+     * Prepare this supervisor before it is used.
+     *
+     * @param topoConf the configuration
+     * @param schedulerLocalDir the local directory for the scheduler
+     */
+    void prepare(Map<String, Object> topoConf, String schedulerLocalDir);
+
+    /**
+     * The id used for writing metadata into ZK.
+     *
+     * <p>For mesos, this is {@code {hostname}-{topologyid}}.
+     */
+    String getSupervisorId();
+
+    /**
+     * The id used in assignments. This combined with confirmAssigned decides what
+     * this supervisor is responsible for. The combination of this and getSupervisorId
+     * allows Nimbus to assign to a single machine and have multiple supervisors
+     * on that machine execute the assignment. This is important for achieving resource isolation.
+     */
+    String getAssignmentId();
+
+    /**
+     * Get the metadata of this supervisor.
+     * NOTE(review): the expected type and use of this object are not established here.
+     */
+    Object getMetadata();
+
+    /**
+     * Used together with getAssignmentId to decide whether this supervisor is responsible for
+     * the given port.
+     *
+     * @param port the assigned port
+     * @return true if this supervisor is responsible for the port
+     */
+    boolean confirmAssigned(int port);
+
+    /**
+     * Called before actually killing the worker locally; sends a "task finished" update.
+     *
+     * @param port the port of the worker being killed
+     */
+    void killedWorker(int port);
+
+    /**
+     * NOTE(review): presumably notifies this supervisor of the ports it has been assigned; confirm.
+     *
+     * @param ports the ports
+     */
+    void assigned(Collection<Integer> ports);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java
new file mode 100644
index 0000000..ee8ef00
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+/**
+ * A Cluster that only allows modification to a single topology.
+ *
+ * <p>It overrides {@link Cluster#assertValidTopologyForModification(String)} so that any topology
+ * id other than the one given at construction time is rejected.
+ */
+public class SingleTopologyCluster extends Cluster {
+    // The only topology id that may be modified. It is null only while super(other) is running.
+    private final String allowedId;
+
+    /**
+     * Create a new cluster that only allows modifications to a single topology.
+     *
+     * @param other the current cluster to base this off of.
+     * @param topologyId the topology that is allowed to be modified; must not be null.
+     * @throws NullPointerException if {@code topologyId} is null.
+     */
+    public SingleTopologyCluster(Cluster other, String topologyId) {
+        super(other);
+        // A null id would leave allowedId null, which assertValidTopologyForModification treats as
+        // "no restriction". That would silently turn this into an unrestricted copy, so fail fast.
+        allowedId = java.util.Objects.requireNonNull(topologyId, "topologyId");
+    }
+
+    /**
+     * Reject modification of any topology other than the one this cluster was created for.
+     *
+     * @param topologyId the id of the topology about to be modified.
+     * @throws IllegalArgumentException if {@code topologyId} is not the allowed topology.
+     */
+    @Override
+    protected void assertValidTopologyForModification(String topologyId) {
+        // allowedId is still null while the superclass constructor runs, because it may call this
+        // override before our field is assigned. Copying the other cluster's state must not be
+        // rejected, so no restriction applies during that window.
+        if (allowedId != null && !allowedId.equals(topologyId)) {
+            throw new IllegalArgumentException(
+                "Only " + allowedId + " is allowed to be modified at this time.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java b/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java
new file mode 100644
index 0000000..16d230a
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A set of {@link TopologyDetails} indexed by topology id, with a secondary index from topology
+ * name to topology id.
+ */
+public class Topologies implements Iterable<TopologyDetails> {
+    // Topology id -> details. Package-private, so other classes in this package may read it.
+    Map<String, TopologyDetails> topologies;
+    // Topology name -> topology id, built from the details' names at construction time.
+    Map<String, String> nameToId;
+    // NOTE(review): never assigned or read in this class. It is kept because it is package-private
+    // and may be referenced elsewhere in the package; confirm, and remove it if it is dead.
+    Map<String, Map<String, Component>> allComponents;
+
+    /**
+     * Index the given details by topology id.
+     *
+     * @param details the topologies to index.
+     * @return a new mutable map of topology id to details.
+     * @throws IllegalArgumentException if two of the details share the same id.
+     */
+    private static Map<String, TopologyDetails> mkMap(TopologyDetails[] details) {
+        Map<String, TopologyDetails> ret = new HashMap<>();
+        for (TopologyDetails td : details) {
+            // put() returns the previous mapping, so a non-null result means a duplicate id.
+            if (ret.put(td.getId(), td) != null) {
+                throw new IllegalArgumentException(
+                    "Cannot have multiple topologies with the id " + td.getId());
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Create a new Topologies from a list of TopologyDetails.
+     *
+     * @param details the list of details to use.
+     * @throws IllegalArgumentException if duplicate topology ids are found.
+     */
+    public Topologies(TopologyDetails... details) {
+        this(mkMap(details));
+    }
+
+    /**
+     * Create a new Topologies from a map of id to topology.
+     *
+     * <p>The map is copied, so later changes to it are not seen by this instance. If several
+     * topologies share a name, which one {@link #getByName(String)} finds is unspecified.
+     *
+     * @param topologies a map of topology id to topology details; {@code null} is treated as empty.
+     */
+    public Topologies(Map<String, TopologyDetails> topologies) {
+        Map<String, TopologyDetails> source =
+            topologies == null ? Collections.<String, TopologyDetails>emptyMap() : topologies;
+        this.topologies = new HashMap<>(source);
+        this.nameToId = new HashMap<>(source.size());
+
+        for (Map.Entry<String, TopologyDetails> entry : source.entrySet()) {
+            this.nameToId.put(entry.getValue().getName(), entry.getKey());
+        }
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * <p>The id map is copied, but the {@link TopologyDetails} instances are shared with
+     * {@code src}.
+     *
+     * @param src the instance to copy.
+     */
+    public Topologies(Topologies src) {
+        this(src.topologies);
+    }
+
+    /**
+     * Get the ids of all topologies.
+     *
+     * @return a live view of the internal id key set, not a copy.
+     */
+    public Collection<String> getAllIds() {
+        return topologies.keySet();
+    }
+
+    /**
+     * Get a topology given an ID.
+     *
+     * @param topologyId the id of the topology to get.
+     * @return the topology, or null if it is not found.
+     */
+    public TopologyDetails getById(String topologyId) {
+        return topologies.get(topologyId);
+    }
+
+    /**
+     * Get a topology given a topology name. Nimbus prevents multiple topologies
+     * from having the same name, so this assumes it is true.
+     *
+     * @param topologyName the name of the topology to look for.
+     * @return the topology with the given name, or null if there is none.
+     */
+    public TopologyDetails getByName(String topologyName) {
+        String topologyId = this.nameToId.get(topologyName);
+        return topologyId == null ? null : this.getById(topologyId);
+    }
+
+    /**
+     * Get the details of all topologies.
+     *
+     * @return a live view of the internal map's values, not a copy.
+     */
+    public Collection<TopologyDetails> getTopologies() {
+        return topologies.values();
+    }
+
+    /**
+     * Get all topologies submitted/owned by a given user.
+     *
+     * @param user the name of the user; must not be null.
+     * @return all of the topologies submitted by this user, as a new set.
+     * @throws NullPointerException if {@code user} is null.
+     */
+    public Collection<TopologyDetails> getTopologiesOwnedBy(String user) {
+        HashSet<TopologyDetails> ret = new HashSet<>();
+        for (TopologyDetails td : this) {
+            if (user.equals(td.getTopologySubmitter())) {
+                ret.add(td);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Render a header line followed by each topology's {@code toString()} on its own line.
+     *
+     * @return the textual representation.
+     */
+    @Override
+    public String toString() {
+        StringBuilder ret = new StringBuilder();
+        ret.append("Topologies:\n");
+        for (TopologyDetails td : this.getTopologies()) {
+            ret.append(td.toString()).append("\n");
+        }
+        return ret.toString();
+    }
+
+    /**
+     * Iterate over the details of all topologies, in no particular order.
+     *
+     * @return an iterator backed by the internal map's values.
+     */
+    @Override
+    public Iterator<TopologyDetails> iterator() {
+        return topologies.values().iterator();
+    }
+}