You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/31 02:07:05 UTC
[06/20] storm git commit: STORM-2497: Let Supervisor enforce memory
and add in support for shared memory regions
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
new file mode 100644
index 0000000..a6ae4cc
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -0,0 +1,1050 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.nimbus.TopologyResources;
+import org.apache.storm.generated.SharedMemory;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Cluster implements ISchedulingState {
+ private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
+
+ public static class SupervisorResources {
+ private final double totalMem;
+ private final double totalCpu;
+ private final double usedMem;
+ private final double usedCpu;
+
+ /**
+ * Constructor for a Supervisor's resources.
+ *
+ * @param totalMem the total mem on the supervisor
+ * @param totalCpu the total CPU on the supervisor
+ * @param usedMem the used mem on the supervisor
+ * @param usedCpu the used CPU on the supervisor
+ */
+ public SupervisorResources(double totalMem, double totalCpu, double usedMem, double usedCpu) {
+ this.totalMem = totalMem;
+ this.totalCpu = totalCpu;
+ this.usedMem = usedMem;
+ this.usedCpu = usedCpu;
+ }
+
+ public double getUsedMem() {
+ return usedMem;
+ }
+
+ public double getUsedCpu() {
+ return usedCpu;
+ }
+
+ public double getTotalMem() {
+ return totalMem;
+ }
+
+ public double getTotalCpu() {
+ return totalCpu;
+ }
+
+ private SupervisorResources add(WorkerResources wr) {
+ return new SupervisorResources(
+ totalMem,
+ totalCpu,
+ usedMem + wr.get_mem_off_heap() + wr.get_mem_on_heap(),
+ usedCpu + wr.get_cpu());
+ }
+
+ public SupervisorResources addMem(Double value) {
+ return new SupervisorResources(totalMem, totalCpu, usedMem + value, usedCpu);
+ }
+ }
+
+ /**
+ * key: supervisor id, value: supervisor details.
+ */
+ private final Map<String, SupervisorDetails> supervisors = new HashMap<>();
+
+ /**
+ * key: rack, value: nodes in that rack.
+ */
+ private final Map<String, List<String>> networkTopography = new HashMap<>();
+
+ /**
+ * key: topologyId, value: topology's current assignments.
+ */
+ private final Map<String, SchedulerAssignmentImpl> assignments = new HashMap<>();
+
+ /**
+ * key: topologyId, value: scheduler's status.
+ */
+ private final Map<String, String> status = new HashMap<>();
+
+ /**
+ * A map from hostname to supervisor ids.
+ */
+ private final Map<String, List<String>> hostToId = new HashMap<>();
+
+ private final Map<String, Object> conf;
+
+ private Set<String> blackListedHosts = new HashSet<>();
+ private INimbus inimbus;
+ private final Topologies topologies;
+ private final Map<String, Double> scheduledCPUCache = new HashMap<>();
+ private final Map<String, Double> scheduledMemoryCache = new HashMap<>();
+
+ /**
+  * Create a cluster with no pre-existing status, blacklist or network topography.
+  *
+  * @param nimbus the INimbus used to resolve host names
+  * @param supervisors supervisor id to supervisor details
+  * @param map topology id to its current assignment
+  * @param topologies the topologies known to this cluster
+  * @param conf the configuration to use
+  */
+ public Cluster(
+     INimbus nimbus,
+     Map<String, SupervisorDetails> supervisors,
+     Map<String, ? extends SchedulerAssignment> map,
+     Topologies topologies,
+     Map<String, Object> conf) {
+     // status, blacklisted hosts and network topography are all left unset
+     this(nimbus, supervisors, map, topologies, conf, null, null, null);
+ }
+
+ /**
+  * Copy constructor. The configuration map is copied; everything else is handed to the
+  * main constructor, which copies the entries into this instance's own collections.
+  *
+  * @param src the cluster to copy
+  */
+ public Cluster(Cluster src) {
+     this(src.inimbus, src.supervisors, src.assignments, src.topologies,
+         new HashMap<>(src.conf), src.status, src.blackListedHosts, src.networkTopography);
+ }
+
+ /**
+  * Testing constructor that copies an existing cluster but swaps in a different set of topologies.
+  *
+  * @param src the original cluster
+  * @param topologies the new topologies to use
+  */
+ @VisibleForTesting
+ public Cluster(Cluster src, Topologies topologies) {
+     this(src.inimbus, src.supervisors, src.assignments, topologies,
+         new HashMap<>(src.conf), src.status, src.blackListedHosts, src.networkTopography);
+ }
+
+ /**
+  * Main constructor that all other constructors delegate to.
+  *
+  * <p>If no network topography is passed in, and a plugin class is configured under
+  * {@link Config#STORM_NETWORK_TOPOGRAPHY_PLUGIN}, the plugin is used to map supervisor hosts to racks.
+  * The status map and the blacklist may be null, in which case they start out empty.
+  */
+ private Cluster(
+     INimbus nimbus,
+     Map<String, SupervisorDetails> supervisors,
+     Map<String, ? extends SchedulerAssignment> assignments,
+     Topologies topologies,
+     Map<String, Object> conf,
+     Map<String, String> status,
+     Set<String> blackListedHosts,
+     Map<String, List<String>> networkTopography) {
+     this.inimbus = nimbus;
+     this.supervisors.putAll(supervisors);
+
+     // index supervisor ids by the host they run on
+     for (Map.Entry<String, SupervisorDetails> supEntry : supervisors.entrySet()) {
+         String host = supEntry.getValue().getHost();
+         hostToId.computeIfAbsent(host, k -> new ArrayList<>()).add(supEntry.getKey());
+     }
+     this.conf = conf;
+     this.topologies = topologies;
+
+     List<String> supervisorHostNames = new ArrayList<>();
+     for (SupervisorDetails sup : supervisors.values()) {
+         supervisorHostNames.add(sup.getHost());
+     }
+
+     //Initialize the network topography
+     boolean topographyProvided = networkTopography != null && !networkTopography.isEmpty();
+     if (topographyProvided) {
+         this.networkTopography.putAll(networkTopography);
+     } else {
+         String clazz = (String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN);
+         if (clazz != null && !clazz.isEmpty()) {
+             DNSToSwitchMapping topographyMapper =
+                 (DNSToSwitchMapping) ReflectionUtils.newInstance(clazz);
+
+             Map<String, String> hostToRack = topographyMapper.resolve(supervisorHostNames);
+             for (Map.Entry<String, String> resolved : hostToRack.entrySet()) {
+                 this.networkTopography
+                     .computeIfAbsent(resolved.getValue(), k -> new ArrayList<>())
+                     .add(resolved.getKey());
+             }
+         }
+     }
+
+     if (status != null) {
+         this.status.putAll(status);
+     }
+
+     if (blackListedHosts != null) {
+         this.blackListedHosts.addAll(blackListedHosts);
+     }
+
+     setAssignments(assignments, true);
+ }
+
+ /**
+  * Hook to verify that the given topology may be modified right now. Implementations that want
+  * to forbid the modification throw an IllegalArgumentException; this implementation allows everything.
+  *
+  * @param topologyId the id of the topology to check
+  */
+ protected void assertValidTopologyForModification(String topologyId) {
+     // Intentionally empty: every topology may be modified.
+ }
+
+ /**
+  * Replace the set of blacklisted hosts with the given one.
+  *
+  * @param hosts the new hosts that are blacklisted.
+  */
+ public void setBlacklistedHosts(Set<String> hosts) {
+     // Passing in our own set is a NOOP; clearing it first would wipe the input as well.
+     if (hosts != blackListedHosts) {
+         blackListedHosts.clear();
+         blackListedHosts.addAll(hosts);
+     }
+ }
+
+ /** Get the topologies this cluster was created with. */
+ @Override
+ public Topologies getTopologies() {
+     return this.topologies;
+ }
+
+ /** Get the set of blacklisted hosts (the internal set itself, not a copy). */
+ @Override
+ public Set<String> getBlacklistedHosts() {
+     return this.blackListedHosts;
+ }
+
+ /**
+  * Add a single host to the blacklist.
+  *
+  * @param host the host to blacklist
+  */
+ public void blacklistHost(String host) {
+     this.blackListedHosts.add(host);
+ }
+
+ @Override
+ public boolean isBlackListed(String supervisorId) {
+ return blackListedHosts.contains(getHost(supervisorId));
+ }
+
+ /** Check whether the given host is blacklisted. */
+ @Override
+ public boolean isBlacklistedHost(String host) {
+     return this.blackListedHosts.contains(host);
+ }
+
+ /** Resolve the host name of a supervisor by delegating to the INimbus. */
+ @Override
+ public String getHost(String supervisorId) {
+     return this.inimbus.getHostName(this.supervisors, supervisorId);
+ }
+
+ /** Get every topology for which {@link #needsScheduling(TopologyDetails)} is true. */
+ @Override
+ public List<TopologyDetails> needsSchedulingTopologies() {
+     List<TopologyDetails> pending = new ArrayList<>();
+     for (TopologyDetails candidate : getTopologies()) {
+         if (!needsScheduling(candidate)) {
+             continue;
+         }
+         pending.add(candidate);
+     }
+     return pending;
+ }
+
+ /**
+  * A topology needs scheduling if it has fewer assigned workers than it asked for,
+  * or if any of its executors are unassigned.
+  */
+ @Override
+ public boolean needsScheduling(TopologyDetails topology) {
+     int wanted = topology.getNumWorkers();
+     int have = this.getAssignedNumWorkers(topology);
+     if (wanted > have) {
+         return true;
+     }
+     return !getUnassignedExecutors(topology).isEmpty();
+ }
+
+ /** Like {@link #needsScheduling(TopologyDetails)} but only looks at unassigned executors. */
+ @Override
+ public boolean needsSchedulingRas(TopologyDetails topology) {
+     return !getUnassignedExecutors(topology).isEmpty();
+ }
+
+ /** Map each executor of the topology that is not yet assigned to its component. */
+ @Override
+ public Map<ExecutorDetails, String> getNeedsSchedulingExecutorToComponents(
+     TopologyDetails topology) {
+     Collection<ExecutorDetails> unassigned = new HashSet<>(topology.getExecutors());
+
+     SchedulerAssignment current = assignments.get(topology.getId());
+     if (current != null) {
+         // drop everything that already has a slot
+         unassigned.removeAll(current.getExecutors());
+     }
+     return topology.selectExecutorToComponent(unassigned);
+ }
+
+ /** Group the executors that still need scheduling by the component they belong to. */
+ @Override
+ public Map<String, List<ExecutorDetails>> getNeedsSchedulingComponentToExecutors(
+     TopologyDetails topology) {
+     Map<String, List<ExecutorDetails>> byComponent = new HashMap<>();
+     for (Map.Entry<ExecutorDetails, String> execAndComp :
+         getNeedsSchedulingExecutorToComponents(topology).entrySet()) {
+         byComponent
+             .computeIfAbsent(execAndComp.getValue(), k -> new ArrayList<>())
+             .add(execAndComp.getKey());
+     }
+     return byComponent;
+ }
+
+ @Override
+ public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {
+ Set<Integer> usedPorts = new HashSet<>();
+
+ for (SchedulerAssignment assignment : assignments.values()) {
+ for (WorkerSlot slot : assignment.getExecutorToSlot().values()) {
+ if (slot.getNodeId().equals(supervisor.getId())) {
+ usedPorts.add(slot.getPort());
+ }
+ }
+ }
+
+ return usedPorts;
+ }
+
+ /** Get the assignable ports of the supervisor that are not currently used. */
+ @Override
+ public Set<Integer> getAvailablePorts(SupervisorDetails supervisor) {
+     Set<Integer> inUse = this.getUsedPorts(supervisor);
+     Set<Integer> free = new HashSet<>(getAssignablePorts(supervisor));
+     free.removeAll(inUse);
+     return free;
+ }
+
+ /** Get all ports of the supervisor, or an empty set if the supervisor is blacklisted. */
+ @Override
+ public Set<Integer> getAssignablePorts(SupervisorDetails supervisor) {
+     return isBlackListed(supervisor.id) ? Collections.emptySet() : supervisor.allPorts;
+ }
+
+ /** Get the available slots of every supervisor in the cluster. */
+ @Override
+ public List<WorkerSlot> getAvailableSlots() {
+     List<WorkerSlot> all = new ArrayList<>();
+     for (SupervisorDetails sup : supervisors.values()) {
+         all.addAll(getAvailableSlots(sup));
+     }
+     return all;
+ }
+
+ @Override
+ public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {
+ Set<Integer> ports = this.getAvailablePorts(supervisor);
+ List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
+
+ for (Integer port : ports) {
+ slots.add(new WorkerSlot(supervisor.getId(), port));
+ }
+
+ return slots;
+ }
+
+ @Override
+ public List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor) {
+ Set<Integer> ports = this.getAssignablePorts(supervisor);
+ List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
+
+ for (Integer port : ports) {
+ slots.add(new WorkerSlot(supervisor.getId(), port));
+ }
+
+ return slots;
+ }
+
+ /** Get the assignable slots of every supervisor in the cluster. */
+ @Override
+ public List<WorkerSlot> getAssignableSlots() {
+     List<WorkerSlot> all = new ArrayList<>();
+     for (SupervisorDetails sup : supervisors.values()) {
+         all.addAll(getAssignableSlots(sup));
+     }
+     return all;
+ }
+
+ /**
+  * Get the executors of the topology that have no slot yet.
+  *
+  * @param topology the topology to inspect, may be null
+  * @return the unassigned executors, empty if {@code topology} is null
+  */
+ @Override
+ public Collection<ExecutorDetails> getUnassignedExecutors(TopologyDetails topology) {
+     if (topology == null) {
+         return new ArrayList<>(0);
+     }
+
+     Collection<ExecutorDetails> unassigned = new HashSet<>(topology.getExecutors());
+     SchedulerAssignment current = getAssignmentById(topology.getId());
+     if (current != null) {
+         unassigned.removeAll(current.getExecutors());
+     }
+     return unassigned;
+ }
+
+ /**
+  * Count the distinct slots the executors of the topology are assigned to.
+  *
+  * @return the number of slots, 0 if the topology is null or has no assignment
+  */
+ @Override
+ public int getAssignedNumWorkers(TopologyDetails topology) {
+     if (topology == null) {
+         return 0;
+     }
+     SchedulerAssignment current = this.getAssignmentById(topology.getId());
+     if (current == null) {
+         return 0;
+     }
+     // de-duplicate: many executors share a slot
+     Set<WorkerSlot> distinctSlots = new HashSet<>(current.getExecutorToSlot().values());
+     return distinctSlots.size();
+ }
+
+ /**
+  * Sum up the resources a worker running the given executors would use.
+  *
+  * <p>Per-executor requirements that are null are skipped. The on heap and worker-level off heap
+  * parts of the shared memory requests are added to the on/off heap totals and also reported
+  * separately as the shared amounts.
+  *
+  * @param td the topology the executors belong to
+  * @param executors the executors in the worker
+  * @return the resources for that worker
+  */
+ private WorkerResources calculateWorkerResources(
+     TopologyDetails td, Collection<ExecutorDetails> executors) {
+     double onHeapMem = 0.0;
+     double offHeapMem = 0.0;
+     double cpu = 0.0;
+     for (ExecutorDetails exec : executors) {
+         Double execOnHeap = td.getOnHeapMemoryRequirement(exec);
+         onHeapMem += (execOnHeap == null) ? 0.0 : execOnHeap;
+
+         Double execOffHeap = td.getOffHeapMemoryRequirement(exec);
+         offHeapMem += (execOffHeap == null) ? 0.0 : execOffHeap;
+
+         Double execCpu = td.getTotalCpuReqTask(exec);
+         cpu += (execCpu == null) ? 0.0 : execCpu;
+     }
+
+     double sharedOn = 0.0;
+     double sharedOff = 0.0;
+     for (SharedMemory region : td.getSharedMemoryRequests(executors)) {
+         double regionOnHeap = region.get_on_heap();
+         double regionOffHeap = region.get_off_heap_worker();
+         onHeapMem += regionOnHeap;
+         sharedOn += regionOnHeap;
+         offHeapMem += regionOffHeap;
+         sharedOff += regionOffHeap;
+     }
+
+     WorkerResources total = new WorkerResources();
+     total.set_cpu(cpu);
+     total.set_mem_on_heap(onHeapMem);
+     total.set_mem_off_heap(offHeapMem);
+     total.set_shared_mem_on_heap(sharedOn);
+     total.set_shared_mem_off_heap(sharedOff);
+     return total;
+ }
+
+ /**
+  * Check whether adding an executor to a slot would stay within the given resource limits.
+  *
+  * <p>Three checks are made, in this order: the executor's CPU against {@code cpuAvailable}, the
+  * memory the executor would add (including shared regions) against {@code memoryAvailable}, and
+  * the resulting on heap memory of the worker against {@code maxHeap}.
+  *
+  * @param ws the slot the executor would be added to
+  * @param exec the executor to add
+  * @param td the topology the executor belongs to
+  * @param maxHeap the largest on heap memory allowed for the worker
+  * @param memoryAvailable the memory that may still be added
+  * @param cpuAvailable the CPU that may still be added
+  * @return true if the executor fits, false otherwise
+  */
+ @Override
+ public boolean wouldFit(
+     WorkerSlot ws,
+     ExecutorDetails exec,
+     TopologyDetails td,
+     double maxHeap,
+     double memoryAvailable,
+     double cpuAvailable) {
+     //NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot.
+     //CPU is simplest because it does not have odd interactions.
+     // A missing CPU requirement counts as 0, same as in calculateWorkerResources, instead of
+     // failing with a NullPointerException on unboxing.
+     Double cpuRequested = td.getTotalCpuReqTask(exec);
+     double cpuNeeded = cpuRequested == null ? 0.0 : cpuRequested;
+     if (cpuNeeded > cpuAvailable) {
+         if (LOG.isTraceEnabled()) {
+             LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
+                 td.getName(),
+                 exec,
+                 ws,
+                 cpuNeeded,
+                 cpuAvailable);
+         }
+         //Not enough CPU no need to try any more
+         return false;
+     }
+
+     double currentTotal = 0.0;
+     double sharedOffHeapNodeAfter = 0.0;
+     Set<ExecutorDetails> wouldBeAssigned = new HashSet<>();
+     wouldBeAssigned.add(exec);
+     SchedulerAssignmentImpl assignment = assignments.get(td.getId());
+     if (assignment != null) {
+         Collection<ExecutorDetails> currentlyAssigned = assignment.getSlotToExecutors().get(ws);
+         if (currentlyAssigned != null) {
+             wouldBeAssigned.addAll(currentlyAssigned);
+             WorkerResources wrCurrent = calculateWorkerResources(td, currentlyAssigned);
+             currentTotal = wrCurrent.get_mem_off_heap() + wrCurrent.get_mem_on_heap();
+         }
+         currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment);
+         sharedOffHeapNodeAfter = calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec);
+     } else {
+         // Nothing of this topology is scheduled yet, so the only node-level shared regions
+         // that would exist afterwards are the ones the new executor itself asks for.
+         for (SharedMemory shared : td.getSharedMemoryRequests(wouldBeAssigned)) {
+             sharedOffHeapNodeAfter += shared.get_off_heap_node();
+         }
+     }
+
+     // This must be computed even when the topology has no assignment yet; otherwise the first
+     // executor of a topology would always pass the memory and heap checks below.
+     WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
+     double afterOnHeap = wrAfter.get_mem_on_heap();
+     double afterTotal = wrAfter.get_mem_off_heap() + afterOnHeap + sharedOffHeapNodeAfter;
+
+     double memoryAdded = afterTotal - currentTotal;
+     if (memoryAdded > memoryAvailable) {
+         if (LOG.isTraceEnabled()) {
+             LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}",
+                 td.getName(),
+                 exec,
+                 ws,
+                 memoryAdded,
+                 memoryAvailable);
+         }
+         return false;
+     }
+     if (afterOnHeap > maxHeap) {
+         if (LOG.isTraceEnabled()) {
+             LOG.trace("Could not schedule {}:{} on {} HEAP would be too large {} > {}",
+                 td.getName(),
+                 exec,
+                 ws,
+                 afterOnHeap,
+                 maxHeap);
+         }
+         return false;
+     }
+     return true;
+ }
+
+ /**
+ * Assign the slot to the executors for this topology.
+ *
+ * @throws RuntimeException if the specified slot is already occupied.
+ */
+ public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetails> executors) {
+ assertValidTopologyForModification(topologyId);
+ if (isSlotOccupied(slot)) {
+ throw new RuntimeException(
+ "slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
+ }
+
+ TopologyDetails td = topologies.getById(topologyId);
+ if (td == null) {
+ throw new IllegalArgumentException(
+ "Trying to schedule for topo "
+ + topologyId
+ + " but that is not a known topology "
+ + topologies.getAllIds());
+ }
+ WorkerResources resources = calculateWorkerResources(td, executors);
+ SchedulerAssignmentImpl assignment = assignments.get(topologyId);
+ if (assignment == null) {
+ assignment = new SchedulerAssignmentImpl(topologyId);
+ assignments.put(topologyId, assignment);
+ } else {
+ for (ExecutorDetails executor : executors) {
+ if (assignment.isExecutorAssigned(executor)) {
+ throw new RuntimeException(
+ "Attempting to assign executor: "
+ + executor
+ + " of topology: "
+ + topologyId
+ + " to workerslot: "
+ + slot
+ + ". The executor is already assigned to workerslot: "
+ + assignment.getExecutorToSlot().get(executor)
+ + ". The executor must unassigned before it can be assigned to another slot!");
+ }
+ }
+ }
+
+ assignment.assign(slot, executors, resources);
+ String nodeId = slot.getNodeId();
+ assignment.setTotalSharedOffHeapMemory(
+ nodeId, calculateSharedOffHeapMemory(nodeId, assignment));
+ scheduledCPUCache.remove(nodeId);
+ scheduledMemoryCache.remove(nodeId);
+ }
+
+ /**
+  * Assign everything for the given topology, one slot at a time.
+  *
+  * @param assignment the new assignment to make
+  * @param ignoreSingleExceptions if true a RuntimeException thrown while assigning one slot is
+  *     dropped and the remaining slots are still processed; if false it is rethrown
+  */
+ public void assign(SchedulerAssignment assignment, boolean ignoreSingleExceptions) {
+     String topoId = assignment.getTopologyId();
+     assertValidTopologyForModification(topoId);
+     Map<WorkerSlot, Collection<ExecutorDetails>> slotToExecs = assignment.getSlotToExecutors();
+     for (Entry<WorkerSlot, Collection<ExecutorDetails>> slotAndExecs : slotToExecs.entrySet()) {
+         WorkerSlot slot = slotAndExecs.getKey();
+         Collection<ExecutorDetails> execs = slotAndExecs.getValue();
+         try {
+             assign(slot, topoId, execs);
+         } catch (RuntimeException e) {
+             if (ignoreSingleExceptions) {
+                 // best effort: skip this slot and carry on with the others
+                 continue;
+             }
+             throw e;
+         }
+     }
+ }
+
+ /**
+ * Calculate the amount of shared off heap memory on a given nodes with the given assignment.
+ *
+ * @param nodeId the id of the node
+ * @param assignment the current assignment
+ * @return the amount of shared off heap memory for that node in MB
+ */
+ private double calculateSharedOffHeapMemory(String nodeId, SchedulerAssignmentImpl assignment) {
+ return calculateSharedOffHeapMemory(nodeId, assignment, null);
+ }
+
+ /**
+  * Calculate the node-level shared off heap memory for the executors of an assignment that are
+  * on the given node, optionally pretending one more executor is on the node too.
+  *
+  * @param nodeId the id of the node
+  * @param assignment the current assignment
+  * @param extra an additional executor to count as being on the node, or null for none
+  * @return the sum of the node-level off heap amounts of the matching shared memory requests
+  */
+ private double calculateSharedOffHeapMemory(
+     String nodeId, SchedulerAssignmentImpl assignment, ExecutorDetails extra) {
+     TopologyDetails td = topologies.getById(assignment.getTopologyId());
+
+     // collect every executor of this assignment that lives on the node
+     Set<ExecutorDetails> onNode = new HashSet<>();
+     assignment.getSlotToExecutors().forEach((slot, execs) -> {
+         if (nodeId.equals(slot.getNodeId())) {
+             onNode.addAll(execs);
+         }
+     });
+     if (extra != null) {
+         onNode.add(extra);
+     }
+
+     //Now check for overlap on the node
+     double total = 0.0;
+     for (SharedMemory region : td.getSharedMemoryRequests(onNode)) {
+         total += region.get_off_heap_node();
+     }
+     return total;
+ }
+
+ /**
+ * Free the specified slot.
+ *
+ * @param slot the slot to free
+ */
+ public void freeSlot(WorkerSlot slot) {
+ // remove the slot from the existing assignments
+ for (SchedulerAssignmentImpl assignment : assignments.values()) {
+ if (assignment.isSlotOccupied(slot)) {
+ assertValidTopologyForModification(assignment.getTopologyId());
+ assignment.unassignBySlot(slot);
+
+ String nodeId = slot.getNodeId();
+ assignment.setTotalSharedOffHeapMemory(
+ nodeId, calculateSharedOffHeapMemory(nodeId, assignment));
+ scheduledCPUCache.remove(nodeId);
+ scheduledMemoryCache.remove(nodeId);
+ }
+ }
+ }
+
+ /**
+  * Free each of the given slots.
+  *
+  * @param slots multiple slots to free, null is treated as nothing to free
+  */
+ public void freeSlots(Collection<WorkerSlot> slots) {
+     if (slots == null) {
+         return;
+     }
+     for (WorkerSlot toFree : slots) {
+         freeSlot(toFree);
+     }
+ }
+
+ /** Check whether any assignment occupies the given slot. */
+ @Override
+ public boolean isSlotOccupied(WorkerSlot slot) {
+     return assignments.values().stream().anyMatch(a -> a.isSlotOccupied(slot));
+ }
+
+ /** Get the assignment of a topology, or null if it has none. */
+ @Override
+ public SchedulerAssignment getAssignmentById(String topologyId) {
+     // Map.get already yields null for an unknown id
+     return assignments.get(topologyId);
+ }
+
+ /** Get the slots of a topology's assignment, or an empty set if it has no assignment. */
+ @Override
+ public Collection<WorkerSlot> getUsedSlotsByTopologyId(String topologyId) {
+     SchedulerAssignmentImpl topoAssignment = assignments.get(topologyId);
+     return topoAssignment == null ? Collections.emptySet() : topoAssignment.getSlots();
+ }
+
+ /** Look up a supervisor by its node id, null if unknown. */
+ @Override
+ public SupervisorDetails getSupervisorById(String nodeId) {
+     return this.supervisors.get(nodeId);
+ }
+
+ /** Get the distinct slots that executors of any topology are assigned to. */
+ @Override
+ public Collection<WorkerSlot> getUsedSlots() {
+     Set<WorkerSlot> used = new HashSet<>();
+     assignments.values().forEach(a -> used.addAll(a.getExecutorToSlot().values()));
+     return used;
+ }
+
+ /** Get the supervisors registered for a host, an empty list if the host is unknown. */
+ @Override
+ public List<SupervisorDetails> getSupervisorsByHost(String host) {
+     List<SupervisorDetails> onHost = new ArrayList<>();
+     List<String> nodeIds = this.hostToId.get(host);
+     if (nodeIds == null) {
+         return onHost;
+     }
+     for (String nodeId : nodeIds) {
+         onHost.add(this.getSupervisorById(nodeId));
+     }
+     return onHost;
+ }
+
+ @Override
+ public Map<String, SchedulerAssignment> getAssignments() {
+ return new HashMap<String, SchedulerAssignment>(assignments);
+ }
+
+ /**
+  * Replace all current assignments with the given ones.
+  *
+  * <p>Both the new and the existing topologies are checked with
+  * {@link #assertValidTopologyForModification(String)} before anything is changed.
+  *
+  * @param newAssignments topology id to the assignment to install; passing this cluster's own
+  *     internal map is a NOOP
+  * @param ignoreSingleExceptions if true, slots that cannot be assigned are skipped instead of
+  *     failing the whole call
+  */
+ public void setAssignments(
+     Map<String, ? extends SchedulerAssignment> newAssignments, boolean ignoreSingleExceptions) {
+     if (newAssignments == assignments) {
+         //NOOP
+         return;
+     }
+     for (SchedulerAssignment assignment : newAssignments.values()) {
+         assertValidTopologyForModification(assignment.getTopologyId());
+     }
+     for (SchedulerAssignment assignment : assignments.values()) {
+         assertValidTopologyForModification(assignment.getTopologyId());
+     }
+     assignments.clear();
+     // assign() only invalidates the cache entries of nodes it touches. A node that had workers
+     // before but gets none in newAssignments would otherwise keep reporting its old totals.
+     scheduledCPUCache.clear();
+     scheduledMemoryCache.clear();
+     for (SchedulerAssignment assignment : newAssignments.values()) {
+         assign(assignment, ignoreSingleExceptions);
+     }
+ }
+
+ /** Get the supervisor id to supervisor details map (the internal map itself, not a copy). */
+ @Override
+ public Map<String, SupervisorDetails> getSupervisors() {
+     return supervisors;
+ }
+
+ /** Get the sum of the total CPU of all supervisors. */
+ @Override
+ public double getClusterTotalCpuResource() {
+     double clusterCpu = 0.0;
+     for (SupervisorDetails supervisor : this.supervisors.values()) {
+         clusterCpu += supervisor.getTotalCPU();
+     }
+     return clusterCpu;
+ }
+
+ /** Get the sum of the total memory of all supervisors. */
+ @Override
+ public double getClusterTotalMemoryResource() {
+     double clusterMem = 0.0;
+     for (SupervisorDetails supervisor : this.supervisors.values()) {
+         clusterMem += supervisor.getTotalMemory();
+     }
+     return clusterMem;
+ }
+
+ /** Get the rack to nodes map (the internal map itself, not a copy). */
+ @Override
+ public Map<String, List<String>> getNetworkTopography() {
+     return this.networkTopography;
+ }
+
+ @VisibleForTesting
+ public void setNetworkTopography(Map<String, List<String>> networkTopography) {
+ this.networkTopography.clear();
+ this.networkTopography.putAll(networkTopography);
+ }
+
+ /**
+ * Get heap memory usage for a worker's main process and logwriter process.
+ * @param topConf - the topology config
+ * @return the assigned memory (in MB)
+ */
+ public static Double getAssignedMemoryForSlot(final Map<String, Object> topConf) {
+ Double totalWorkerMemory = 0.0;
+ final Integer topologyWorkerDefaultMemoryAllocation = 768;
+
+ List<String> topologyWorkerGcChildopts = ConfigUtils.getValueAsList(
+ Config.TOPOLOGY_WORKER_GC_CHILDOPTS, topConf);
+ List<String> workerGcChildopts = ConfigUtils.getValueAsList(
+ Config.WORKER_GC_CHILDOPTS, topConf);
+ Double memGcChildopts = null;
+ memGcChildopts = Utils.parseJvmHeapMemByChildOpts(
+ topologyWorkerGcChildopts, null);
+ if (memGcChildopts == null) {
+ memGcChildopts = Utils.parseJvmHeapMemByChildOpts(
+ workerGcChildopts, null);
+ }
+
+ List<String> topologyWorkerChildopts = ConfigUtils.getValueAsList(
+ Config.TOPOLOGY_WORKER_CHILDOPTS, topConf);
+ Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(
+ topologyWorkerChildopts, null);
+
+ List<String> workerChildopts = ConfigUtils.getValueAsList(
+ Config.WORKER_CHILDOPTS, topConf);
+ Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(
+ workerChildopts, null);
+
+ if (memGcChildopts != null) {
+ totalWorkerMemory += memGcChildopts;
+ } else if (memTopologyWorkerChildopts != null) {
+ totalWorkerMemory += memTopologyWorkerChildopts;
+ } else if (memWorkerChildopts != null) {
+ totalWorkerMemory += memWorkerChildopts;
+ } else {
+ Object workerHeapMemoryMb = topConf.get(
+ Config.WORKER_HEAP_MEMORY_MB);
+ totalWorkerMemory += ObjectReader.getInt(
+ workerHeapMemoryMb, topologyWorkerDefaultMemoryAllocation);
+ }
+
+ List<String> topoWorkerLwChildopts = ConfigUtils.getValueAsList(
+ Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, topConf);
+ if (topoWorkerLwChildopts != null) {
+ totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts(
+ topoWorkerLwChildopts, 0.0);
+ }
+ return totalWorkerMemory;
+ }
+
+ /**
+  * Set (and log at info level) the scheduler status for a topology.
+  *
+  * @param topologyId the id of the topology
+  * @param statusMessage the status to record
+  */
+ public void setStatus(String topologyId, String statusMessage) {
+     assertValidTopologyForModification(topologyId);
+     LOG.info("STATUS - {} {}", topologyId, statusMessage);
+     this.status.put(topologyId, statusMessage);
+ }
+
+ /**
+  * Set the scheduler status for a topology only if it does not have one yet.
+  *
+  * @param topologyId the id of the topology
+  * @param statusMessage the status to record
+  */
+ public void setStatusIfAbsent(String topologyId, String statusMessage) {
+     assertValidTopologyForModification(topologyId);
+     this.status.putIfAbsent(topologyId, statusMessage);
+ }
+
+ /** Get the topology id to status map (the internal map itself, not a copy). */
+ @Override
+ public Map<String, String> getStatusMap() {
+     return this.status;
+ }
+
+ /**
+  * Get the scheduler status of a topology.
+  *
+  * @param topoId the id of the topology
+  * @return the status, or null if none was set
+  */
+ public String getStatus(String topoId) {
+     return this.status.get(topoId);
+ }
+
+ /**
+  * Replace the scheduler status of all topologies. Both the topologies in the new map and the
+  * ones that currently have a status are checked for modification first.
+  *
+  * @param statusMap topology id to status; passing this cluster's own status map is a NOOP
+  */
+ public void setStatusMap(Map<String, String> statusMap) {
+     if (statusMap != this.status) {
+         statusMap.keySet().forEach(this::assertValidTopologyForModification);
+         this.status.keySet().forEach(this::assertValidTopologyForModification);
+         this.status.clear();
+         this.status.putAll(statusMap);
+     }
+ }
+
+ /**
+  * Build the resources of every known topology from its details and its assignment.
+  * The assignment passed along is null for topologies that have none.
+  */
+ @Override
+ public Map<String, TopologyResources> getTopologyResourcesMap() {
+     Map<String, TopologyResources> byTopology = new HashMap<>(assignments.size());
+     for (TopologyDetails details : topologies.getTopologies()) {
+         String id = details.getId();
+         byTopology.put(id, new TopologyResources(details, assignments.get(id)));
+     }
+     return byTopology;
+ }
+
+ /**
+  * Get the total and used resources of each supervisor.
+  *
+  * <p>Every known supervisor starts with its total memory and CPU and nothing used. The
+  * scheduled resources of each worker and each assignment's per-node shared off heap memory are
+  * then added to the used amounts. Nodes that show up in an assignment but are not known
+  * supervisors get an entry with totals of 0.
+  *
+  * @return supervisor id to its resources
+  */
+ @Override
+ public Map<String, SupervisorResources> getSupervisorsResourcesMap() {
+     Map<String, SupervisorResources> ret = new HashMap<>();
+     for (SupervisorDetails sd : supervisors.values()) {
+         // The second argument is the total CPU; it used to be filled with the total memory.
+         ret.put(sd.getId(), new SupervisorResources(sd.getTotalMemory(), sd.getTotalCPU(), 0, 0));
+     }
+     for (SchedulerAssignmentImpl assignment : assignments.values()) {
+         for (Entry<WorkerSlot, WorkerResources> entry :
+             assignment.getScheduledResources().entrySet()) {
+             String id = entry.getKey().getNodeId();
+             SupervisorResources sr = ret.get(id);
+             if (sr == null) {
+                 sr = new SupervisorResources(0, 0, 0, 0);
+             }
+             sr = sr.add(entry.getValue());
+             ret.put(id, sr);
+         }
+         Map<String, Double> nodeIdToSharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapMemory();
+         if (nodeIdToSharedOffHeap != null) {
+             for (Entry<String, Double> entry : nodeIdToSharedOffHeap.entrySet()) {
+                 String id = entry.getKey();
+                 SupervisorResources sr = ret.get(id);
+                 if (sr == null) {
+                     sr = new SupervisorResources(0, 0, 0, 0);
+                 }
+                 sr = sr.addMem(entry.getValue());
+                 ret.put(id, sr);
+             }
+         }
+     }
+     return ret;
+ }
+
+ /** Get the scheduled resources of every slot, grouped by topology id. */
+ @Override
+ public Map<String, Map<WorkerSlot, WorkerResources>> getWorkerResourcesMap() {
+     HashMap<String, Map<WorkerSlot, WorkerResources>> byTopology = new HashMap<>();
+     assignments.forEach(
+         (topoId, topoAssignment) -> byTopology.put(topoId, topoAssignment.getScheduledResources()));
+     return byTopology;
+ }
+
+ /** Get the scheduled resources of a slot, or null if no assignment has resources for it. */
+ @Override
+ public WorkerResources getWorkerResources(WorkerSlot ws) {
+     for (SchedulerAssignmentImpl topoAssignment : assignments.values()) {
+         WorkerResources found = topoAssignment.getScheduledResources().get(ws);
+         if (found != null) {
+             return found;
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Get the memory scheduled on a node: the on and off heap memory of every worker on the node
+  * plus each assignment's shared off heap memory for the node. The result is cached per node.
+  */
+ @Override
+ public double getScheduledMemoryForNode(String nodeId) {
+     Double cached = scheduledMemoryCache.get(nodeId);
+     if (cached != null) {
+         return cached;
+     }
+
+     double scheduledMem = 0.0;
+     for (SchedulerAssignmentImpl topoAssignment : assignments.values()) {
+         for (Entry<WorkerSlot, WorkerResources> slotAndRes :
+             topoAssignment.getScheduledResources().entrySet()) {
+             if (!nodeId.equals(slotAndRes.getKey().getNodeId())) {
+                 continue;
+             }
+             WorkerResources res = slotAndRes.getValue();
+             scheduledMem += res.get_mem_off_heap() + res.get_mem_on_heap();
+         }
+         Double nodeShared = topoAssignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId);
+         if (nodeShared != null) {
+             scheduledMem += nodeShared;
+         }
+     }
+     scheduledMemoryCache.put(nodeId, scheduledMem);
+     return scheduledMem;
+ }
+
+ /**
+  * Get the CPU scheduled on a node, i.e. the sum of the CPU of every worker on the node.
+  * The result is cached per node.
+  */
+ @Override
+ public double getScheduledCpuForNode(String nodeId) {
+     Double cached = scheduledCPUCache.get(nodeId);
+     if (cached != null) {
+         return cached;
+     }
+
+     double scheduledCpu = 0.0;
+     for (SchedulerAssignmentImpl topoAssignment : assignments.values()) {
+         for (Entry<WorkerSlot, WorkerResources> slotAndRes :
+             topoAssignment.getScheduledResources().entrySet()) {
+             if (!nodeId.equals(slotAndRes.getKey().getNodeId())) {
+                 continue;
+             }
+             scheduledCpu += slotAndRes.getValue().get_cpu();
+         }
+     }
+     scheduledCPUCache.put(nodeId, scheduledCpu);
+     return scheduledCpu;
+ }
+
+ /** Get the INimbus this cluster was created with. */
+ public INimbus getINimbus() {
+     return inimbus;
+ }
+
+ /** Get the configuration this cluster was created with. */
+ @Override
+ public Map<String, Object> getConf() {
+     return conf;
+ }
+
+ /**
+  * Unassign everything for the given topology id by freeing all the slots it uses.
+  *
+  * @param topoId the id of the topology to unassign
+  */
+ public void unassign(String topoId) {
+     assertValidTopologyForModification(topoId);
+     Collection<WorkerSlot> usedByTopology = getUsedSlotsByTopologyId(topoId);
+     freeSlots(usedByTopology);
+ }
+
+ /**
+ * Update the assignments and status from the other cluster.
+ *
+ * @param other the cluster to get the assignments and status from
+ */
+ public void updateFrom(Cluster other) {
+ for (SchedulerAssignment assignment : other.getAssignments().values()) {
+ assertValidTopologyForModification(assignment.getTopologyId());
+ }
+ setAssignments(other.getAssignments(), false);
+ setStatusMap(other.getStatusMap());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Component.java b/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
new file mode 100644
index 0000000..6a0de72
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.storm.generated.ComponentType;
+
+public class Component {
+ private final String id;
+ private final List<ExecutorDetails> execs;
+ private final ComponentType type;
+ private final Set<String> parents = new HashSet<>();
+ private final Set<String> children = new HashSet<>();
+
+ /**
+ * Create a new component.
+ * @param type the type of component this is
+ * @param compId the id of the component
+ * @param execs the executors for this component.
+ */
+ public Component(ComponentType type, String compId, List<ExecutorDetails> execs) {
+ this.type = type;
+ this.id = compId;
+ this.execs = execs;
+ }
+
+ /**
+ * Add a child link.
+ *
+ * @param child a child that consumes from this
+ */
+ public void addChild(Component child) {
+ children.add(child.getId());
+ child.parents.add(id);
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public List<ExecutorDetails> getExecs() {
+ return execs;
+ }
+
+ public ComponentType getType() {
+ return type;
+ }
+
+ public Set<String> getParents() {
+ return parents;
+ }
+
+ public Set<String> getChildren() {
+ return children;
+ }
+
+ @Override
+ public String toString() {
+ return "{id: "
+ + getId()
+ + " Parents: "
+ + getParents()
+ + " Children: "
+ + getChildren()
+ + " Execs: "
+ + getExecs()
+ + "}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
index 3908416..5ef2ce5 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
@@ -70,8 +70,7 @@ public class DefaultScheduler implements IScheduler {
}
public static void defaultSchedule(Topologies topologies, Cluster cluster) {
- List<TopologyDetails> needsSchedulingTopologies = cluster.needsSchedulingTopologies(topologies);
- for (TopologyDetails topology : needsSchedulingTopologies) {
+ for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
Set<ExecutorDetails> allExecutors = topology.getExecutors();
@@ -89,12 +88,10 @@ public class DefaultScheduler implements IScheduler {
badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse);
}
if (badSlots != null) {
- cluster.freeSlots(badSlots);
+ cluster.freeSlots(badSlots);
}
- Map<String, TopologyDetails> _topologies = new HashMap<String, TopologyDetails>();
- _topologies.put(topology.getId(), topology);
- EvenScheduler.scheduleTopologiesEvenly(new Topologies(_topologies), cluster);
+ EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
index 3ad2648..d4dfd94 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
@@ -15,8 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.scheduler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -32,11 +36,10 @@ import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
-
public class EvenScheduler implements IScheduler {
private static final Logger LOG = LoggerFactory.getLogger(EvenScheduler.class);
+ @VisibleForTesting
public static List<WorkerSlot> sortSlots(List<WorkerSlot> availableSlots) {
//For example, we have a three nodes(supervisor1, supervisor2, supervisor3) cluster:
//slots before sort:
@@ -51,15 +54,15 @@ public class EvenScheduler implements IScheduler {
if (availableSlots != null && availableSlots.size() > 0) {
// group by node
- Map<String, List<WorkerSlot>> slotGroups = new TreeMap<String, List<WorkerSlot>>();
+ Map<String, List<WorkerSlot>> slotGroups = new TreeMap<>();
for (WorkerSlot slot : availableSlots) {
String node = slot.getNodeId();
List<WorkerSlot> slots = null;
- if(slotGroups.containsKey(node)){
- slots = slotGroups.get(node);
- }else{
- slots = new ArrayList<WorkerSlot>();
- slotGroups.put(node, slots);
+ if (slotGroups.containsKey(node)) {
+ slots = slotGroups.get(node);
+ } else {
+ slots = new ArrayList<WorkerSlot>();
+ slotGroups.put(node, slots);
}
slots.add(slot);
}
@@ -112,7 +115,8 @@ public class EvenScheduler implements IScheduler {
}
//allow requesting slots number bigger than available slots
- int toIndex = (totalSlotsToUse - aliveAssigned.size()) > sortedList.size() ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size());
+ int toIndex = (totalSlotsToUse - aliveAssigned.size())
+ > sortedList.size() ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size());
List<WorkerSlot> reassignSlots = sortedList.subList(0, toIndex);
Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
@@ -145,8 +149,7 @@ public class EvenScheduler implements IScheduler {
}
public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
- List<TopologyDetails> needsSchedulingTopologies = cluster.needsSchedulingTopologies(topologies);
- for (TopologyDetails topology : needsSchedulingTopologies) {
+ for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
String topologyId = topology.getId();
Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java b/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java
new file mode 100644
index 0000000..db2fed9
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+public interface INimbus {
+ void prepare(Map<String, Object> topoConf, String schedulerLocalDir);
+
+ /**
+ * Returns all slots that are available for the next round of scheduling. A slot is available for scheduling
+ * if it is free and can be assigned to, or if it is used and can be reassigned.
+ */
+ Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors,
+ Topologies topologies,
+ Set<String> topologiesMissingAssignments);
+
+ /**
+ * this is called after the assignment is changed in ZK.
+ */
+ void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId);
+
+ /**
+ * Get the host name for a node id; existingSupervisors is a map from node id to supervisor details.
+ */
+ String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId);
+
+ IScheduler getForcedScheduler();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
new file mode 100644
index 0000000..2f1de1c
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.Map;
+
+public interface IScheduler {
+
+ void prepare(Map<String, Object> conf);
+
+ /**
+ * Set assignments for the topologies which need scheduling. The new assignments are available
+ * through `cluster.getAssignments()`.
+ *
+ * @param topologies all the topologies in the cluster, some of which need scheduling. The Topologies
+ * object here only contains static information about topologies. Information like assignments and
+ * slots is all in the `cluster` object.
+ * @param cluster the cluster these topologies are running in. `cluster` contains everything a user
+ * needs to develop new scheduling logic, e.g. supervisor information, available slots, current
+ * assignments for all the topologies, etc. A user can set the new assignment for topologies using
+ * `cluster.setAssignmentById()`.
+ */
+ void schedule(Topologies topologies, Cluster cluster);
+
+ /**
+ * This function returns the scheduler's configuration.
+ *
+ * @return The scheduler's configuration.
+ */
+ Map<String, Object> config();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
new file mode 100644
index 0000000..95e0c86
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.daemon.nimbus.TopologyResources;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.scheduler.Cluster.SupervisorResources;
+
+/** An interface that provides access to the current scheduling state. */
+public interface ISchedulingState {
+
+ /**
+ * Get all of the topologies.
+ * @return all of the topologies that are a part of the cluster.
+ */
+ Topologies getTopologies();
+
+ /**
+ * Get all of the topologies that need scheduling.
+ * @return all of the topologies that are not fully scheduled.
+ */
+ List<TopologyDetails> needsSchedulingTopologies();
+
+ /**
+ * Does the topology need scheduling?
+ *
+ * <p>A topology needs scheduling if one of the following conditions holds:
+ *
+ * <ul>
+ * <li>The topology is assigned slots but is squeezed, i.e. the topology is assigned
+ * fewer slots than desired.
+ * <li>There are unassigned executors in this topology
+ * </ul>
+ */
+ boolean needsScheduling(TopologyDetails topology);
+
+ /**
+ * Like {@link #needsScheduling(TopologyDetails)} but does not take into account the number of
+ * workers requested. This is because the number of workers is ignored in RAS
+ *
+ * @param topology the topology to check
+ * @return true if the topology needs scheduling else false.
+ */
+ boolean needsSchedulingRas(TopologyDetails topology);
+
+ /**
+ * Get all of the hosts that are blacklisted.
+ * @return all of the hosts that are blacklisted
+ */
+ Set<String> getBlacklistedHosts();
+
+ /**
+ * Check if a given supervisor is on a blacklisted host.
+ *
+ * @param supervisorId the id of the supervisor
+ * @return true if it is else false
+ */
+ boolean isBlackListed(String supervisorId);
+
+ /**
+ * Check if a given host is blacklisted.
+ *
+ * @param host the name of the host
+ * @return true if it is else false.
+ */
+ boolean isBlacklistedHost(String host);
+
+ /**
+ * Map a supervisor to a given host.
+ *
+ * @param supervisorId the id of the supervisor
+ * @return the actual host name the supervisor is on
+ */
+ String getHost(String supervisorId);
+
+ /**
+ * get the unassigned executors of the topology.
+ *
+ * @param topology the topology to check
+ * @return the unassigned executors of the topology.
+ */
+ Collection<ExecutorDetails> getUnassignedExecutors(TopologyDetails topology);
+
+ /**
+ * @param topology the topology this is for
+ * @return an executor -> component-id map which needs scheduling in this topology.
+ */
+ Map<ExecutorDetails, String> getNeedsSchedulingExecutorToComponents(TopologyDetails topology);
+
+ /**
+ * @param topology the topology this is for
+ * @return a component-id -> executors map which needs scheduling in this topology.
+ */
+ Map<String, List<ExecutorDetails>> getNeedsSchedulingComponentToExecutors(
+ TopologyDetails topology);
+
+ /** Get all the used ports of this supervisor. */
+ Set<Integer> getUsedPorts(SupervisorDetails supervisor);
+
+ /** Return the available ports of this supervisor. */
+ Set<Integer> getAvailablePorts(SupervisorDetails supervisor);
+
+ /**
+ * Get the ports that are not blacklisted.
+ *
+ * @param supervisor the supervisor
+ * @return the ports that are not blacklisted
+ */
+ Set<Integer> getAssignablePorts(SupervisorDetails supervisor);
+
+ /** Return all the available slots on this supervisor. */
+ List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor);
+
+ /** Get all the available worker slots in the cluster. */
+ List<WorkerSlot> getAvailableSlots();
+
+ /**
+ * Return all non-blacklisted slots on this supervisor.
+ *
+ * @param supervisor the supervisor
+ * @return the non-blacklisted slots
+ */
+ List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor);
+
+ /** Get all non-blacklisted slots in the cluster. */
+ List<WorkerSlot> getAssignableSlots();
+
+ /** Get all currently occupied slots. */
+ Collection<WorkerSlot> getUsedSlots();
+
+ /**
+ * Check if a slot is occupied or not.
+ * @param slot the slot to be checked.
+ * @return true if the specified slot is occupied.
+ */
+ boolean isSlotOccupied(WorkerSlot slot);
+
+ /**
+ * Get the number of workers assigned to a topology.
+ * @param topology the topology this is for
+ * @return the number of workers assigned to this topology.
+ */
+ int getAssignedNumWorkers(TopologyDetails topology);
+
+ /**
+ * Would scheduling exec on ws fit? That is, with a heap <= maxHeap, total memory added <=
+ * memoryAvailable, and cpu added <= cpuAvailable.
+ *
+ * @param ws the slot to put it in
+ * @param exec the executor to investigate
+ * @param td the topology details for this executor
+ * @param maxHeap the maximum heap size for ws
+ * @param memoryAvailable the amount of memory available
+ * @param cpuAvailable the amount of CPU available
+ * @return true if it fits else false
+ */
+ boolean wouldFit(
+ WorkerSlot ws,
+ ExecutorDetails exec,
+ TopologyDetails td,
+ double maxHeap,
+ double memoryAvailable,
+ double cpuAvailable);
+
+ /** get the current assignment for the topology. */
+ SchedulerAssignment getAssignmentById(String topologyId);
+
+ /** get slots used by a topology. */
+ Collection<WorkerSlot> getUsedSlotsByTopologyId(String topologyId);
+
+ /** Get a specific supervisor with the <code>nodeId</code>. */
+ SupervisorDetails getSupervisorById(String nodeId);
+
+ /**
+ * Get all the supervisors on the specified <code>host</code>.
+ *
+ * @param host hostname of the supervisor
+ * @return the <code>SupervisorDetails</code> object.
+ */
+ List<SupervisorDetails> getSupervisorsByHost(String host);
+
+ /** Get all the assignments. */
+ Map<String, SchedulerAssignment> getAssignments();
+
+ /** Get all the supervisors. */
+ Map<String, SupervisorDetails> getSupervisors();
+
+ /** Get the total amount of CPU resources in cluster. */
+ double getClusterTotalCpuResource();
+
+ /** Get the total amount of memory resources in cluster. */
+ double getClusterTotalMemoryResource();
+
+ /** Get the network topography (rackId -> nodes in the rack). */
+ Map<String, List<String>> getNetworkTopography();
+
+ /** Get all topology scheduler statuses. */
+ Map<String, String> getStatusMap();
+
+ /**
+ * Get the amount of resources used by topologies. Used for displaying resource information on the
+ * UI.
+ *
+ * @return a map that contains multiple topologies and the resources the topology requested and
+ * assigned. Key: topology id Value: an array that describes the resources the topology
+ * requested and assigned in the following format: {requestedMemOnHeap, requestedMemOffHeap,
+ * requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu}
+ */
+ Map<String, TopologyResources> getTopologyResourcesMap();
+
+ /**
+ * Get the amount of used and free resources on a supervisor. Used for displaying resource
+ * information on the UI
+ *
+ * @return a map where the key is the supervisor id and the value is a map that represents
+ * resource usage for a supervisor in the following format: {totalMem, totalCpu, usedMem,
+ * usedCpu}
+ */
+ Map<String, SupervisorResources> getSupervisorsResourcesMap();
+
+ /**
+ * Gets the reference to the full topology->worker resource map.
+ *
+ * @return map of topology -> map of worker slot ->resources for that worker
+ */
+ Map<String, Map<WorkerSlot, WorkerResources>> getWorkerResourcesMap();
+
+ /**
+ * Get the resources for a given slot.
+ *
+ * @param ws the slot
+ * @return the resources currently assigned
+ */
+ WorkerResources getWorkerResources(WorkerSlot ws);
+
+ /**
+ * Get the total memory currently scheduled on a node.
+ *
+ * @param nodeId the id of the node
+ * @return the total memory currently scheduled on the node
+ */
+ double getScheduledMemoryForNode(String nodeId);
+
+ /**
+ * Get the total cpu currently scheduled on a node.
+ *
+ * @param nodeId the id of the node
+ * @return the total cpu currently scheduled on the node
+ */
+ double getScheduledCpuForNode(String nodeId);
+
+ /** Get the nimbus configuration. */
+ Map<String, Object> getConf();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java
new file mode 100644
index 0000000..4ecf28e
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.Collection;
+import java.util.Map;
+
+public interface ISupervisor {
+ void prepare(Map<String, Object> topoConf, String schedulerLocalDir);
+
+ // for mesos, this is {hostname}-{topologyid}
+ /**
+ * The id used for writing metadata into ZK.
+ */
+ String getSupervisorId();
+
+ /**
+ * The id used in assignments. This combined with confirmAssigned decides what
+ * this supervisor is responsible for. The combination of this and getSupervisorId
+ * allows Nimbus to assign to a single machine and have multiple supervisors
+ * on that machine execute the assignment. This is important for achieving resource isolation.
+ */
+ String getAssignmentId();
+
+ Object getMetadata();
+
+ boolean confirmAssigned(int port);
+
+ // calls this before actually killing the worker locally...
+ // sends a "task finished" update
+ void killedWorker(int port);
+
+ void assigned(Collection<Integer> ports);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java
new file mode 100644
index 0000000..ee8ef00
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+/**
+ * A Cluster that only allows modification to a single topology.
+ */
+public class SingleTopologyCluster extends Cluster {
+ private final String allowedId;
+
+ /**
+ * Create a new cluster that only allows modifications to a single topology.
+ *
+ * @param other the current cluster to base this off of
+ * @param topologyId the topology that is allowed to be modified.
+ */
+ public SingleTopologyCluster(Cluster other, String topologyId) {
+ super(other);
+ allowedId = topologyId;
+ }
+
+ @Override
+ protected void assertValidTopologyForModification(String topologyId) {
+ //allowedId is still null while the super constructor runs, so it can assign what it needs/etc.
+ if (allowedId != null && !allowedId.equals(topologyId)) {
+ throw new IllegalArgumentException(
+ "Only " + allowedId + " is allowed to be modified at this time.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java b/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java
new file mode 100644
index 0000000..16d230a
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+
+public class Topologies implements Iterable<TopologyDetails> {
+ Map<String, TopologyDetails> topologies;
+ Map<String, String> nameToId;
+ Map<String, Map<String, Component>> allComponents;
+
+ private static Map<String, TopologyDetails> mkMap(TopologyDetails[] details) {
+ Map<String, TopologyDetails> ret = new HashMap<>();
+ for (TopologyDetails td : details) {
+ if (ret.put(td.getId(), td) != null) {
+ throw new IllegalArgumentException(
+ "Cannot have multiple topologies with the id " + td.getId());
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Create a new Topologies from a list of TopologyDetails.
+ * @param details the list of details to use.
+ * @throws IllegalArgumentException if duplicate topology ids are found.
+ */
+ public Topologies(TopologyDetails... details) {
+ this(mkMap(details));
+ }
+
+ /**
+ * Create a new Topologies from a map of id to topology.
+ * @param topologies a map of topology id to topology details.
+ */
+ public Topologies(Map<String, TopologyDetails> topologies) {
+ if (topologies == null) {
+ topologies = Collections.emptyMap();
+ }
+ this.topologies = new HashMap<>(topologies);
+ this.nameToId = new HashMap<>(topologies.size());
+
+ for (Map.Entry<String, TopologyDetails> entry : topologies.entrySet()) {
+ TopologyDetails topology = entry.getValue();
+ this.nameToId.put(topology.getName(), entry.getKey());
+ }
+ }
+
+ /**
+ * Copy constructor.
+ */
+ public Topologies(Topologies src) {
+ this(src.topologies);
+ }
+
+ public Collection<String> getAllIds() {
+ return topologies.keySet();
+ }
+
+ /**
+ * Get a topology given an ID.
+ * @param topologyId the id of the topology to get
+ * @return the topology or null if it is not found.
+ */
+ public TopologyDetails getById(String topologyId) {
+ return topologies.get(topologyId);
+ }
+
+ /**
+ * Get a topology given a topology name. Nimbus prevents multiple topologies
+ * from having the same name, so this assumes it is true.
+ * @param topologyName the name of the topology to look for
+ * @return the topology with the given name, or null if it is not found.
+ */
+ public TopologyDetails getByName(String topologyName) {
+ String topologyId = this.nameToId.get(topologyName);
+
+ if (topologyId == null) {
+ return null;
+ } else {
+ return this.getById(topologyId);
+ }
+ }
+
+ public Collection<TopologyDetails> getTopologies() {
+ return topologies.values();
+ }
+
+ /**
+ * Get all topologies submitted/owned by a given user.
+ * @param user the name of the user
+ * @return all of the topologies submitted by this user.
+ */
+ public Collection<TopologyDetails> getTopologiesOwnedBy(String user) {
+ HashSet<TopologyDetails> ret = new HashSet<>();
+ for (TopologyDetails td : this) {
+ if (user.equals(td.getTopologySubmitter())) {
+ ret.add(td);
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder ret = new StringBuilder();
+ ret.append("Topologies:\n");
+ for (TopologyDetails td : this.getTopologies()) {
+ ret.append(td.toString()).append("\n");
+ }
+ return ret.toString();
+ }
+
+ @Override
+ public Iterator<TopologyDetails> iterator() {
+ return topologies.values().iterator();
+ }
+}