You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/31 02:07:04 UTC
[05/20] storm git commit: STORM-2497: Let Supervisor enforce memory
and add in support for shared memory regions
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
new file mode 100644
index 0000000..575463b
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ComponentType;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.SharedMemory;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduler-side description of a single topology: its id, configuration, structure, the
+ * executor-to-component mapping, and the per-executor resource requests derived from them.
+ */
+public class TopologyDetails {
+    private final String topologyId;
+    private final Map<String, Object> topologyConf;
+    private final StormTopology topology;
+    private final Map<ExecutorDetails, String> executorToComponent;
+    private final int numWorkers;
+    // executor -> (resource config key -> requested amount).
+    // Populated by initResourceList() only when the topology is non-null; otherwise it stays null
+    // until addResourcesForExec() is called.
+    private Map<ExecutorDetails, Map<String, Double>> resourceList;
+    //Max heap size for a worker used by topology
+    private Double topologyWorkerMaxHeapSize;
+    //topology priority
+    private Integer topologyPriority;
+    //when topology was launched, in seconds (compared against Time.currentTimeSecs() in getUpTime)
+    private final int launchTime;
+    private final String owner;
+    private final String topoName;
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyDetails.class);
+
+    public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, int numWorkers, String owner) {
+        this(topologyId, topologyConf, topology, numWorkers, null, 0, owner);
+    }
+
+    public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology,
+                           int numWorkers, Map<ExecutorDetails, String> executorToComponents, String owner) {
+        this(topologyId, topologyConf, topology, numWorkers, executorToComponents, 0, owner);
+    }
+
+    /**
+     * Create the details for a topology.
+     *
+     * @param topologyId the id of the topology
+     * @param topologyConf the topology configuration; must contain the worker max heap size and priority
+     * @param topology the topology structure; may be null (used by tests), in which case no
+     *     per-executor resources are computed
+     * @param numWorkers the number of workers requested
+     * @param executorToComponents executor to component id mapping; may be null
+     * @param launchTime when the topology was launched, in seconds
+     * @param owner the user that submitted the topology
+     */
+    public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, int numWorkers,
+                           Map<ExecutorDetails, String> executorToComponents, int launchTime, String owner) {
+        this.owner = owner;
+        this.topologyId = topologyId;
+        this.topologyConf = topologyConf;
+        this.topology = topology;
+        this.numWorkers = numWorkers;
+        this.executorToComponent = new HashMap<>(0);
+        if (executorToComponents != null) {
+            this.executorToComponent.putAll(executorToComponents);
+        }
+        if (topology != null) {
+            initResourceList();
+        }
+        initConfigs();
+        this.launchTime = launchTime;
+        this.topoName = (String) topologyConf.get(Config.TOPOLOGY_NAME);
+    }
+
+    public String getId() {
+        return topologyId;
+    }
+
+    public String getName() {
+        return topoName;
+    }
+
+    public Map<String, Object> getConf() {
+        return topologyConf;
+    }
+
+    public int getNumWorkers() {
+        return numWorkers;
+    }
+
+    public StormTopology getTopology() {
+        return topology;
+    }
+
+    public Map<ExecutorDetails, String> getExecutorToComponent() {
+        return executorToComponent;
+    }
+
+    /**
+     * Select the component ids for a subset of executors.
+     *
+     * @param executors the executors to look up
+     * @return executor to component id, for those executors that belong to this topology
+     */
+    public Map<ExecutorDetails, String> selectExecutorToComponent(
+        Collection<ExecutorDetails> executors) {
+        Map<ExecutorDetails, String> ret = new HashMap<>(executors.size());
+        for (ExecutorDetails executor : executors) {
+            String compId = executorToComponent.get(executor);
+            if (compId != null) {
+                ret.put(executor, compId);
+            }
+        }
+
+        return ret;
+    }
+
+    public Set<ExecutorDetails> getExecutors() {
+        return executorToComponent.keySet();
+    }
+
+    /**
+     * Build resourceList from the per-component json_conf of every bolt and spout, then give any
+     * remaining executor (e.g. system/__acker executors) the topology-wide default resources.
+     */
+    private void initResourceList() {
+        this.resourceList = new HashMap<>();
+        // Extract bolt resource info
+        if (topology.get_bolts() != null) {
+            for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
+                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
+                addComponentResources(bolt.getKey(), bolt.getValue().get_common());
+            }
+        }
+        // Extract spout resource info
+        if (topology.get_spouts() != null) {
+            for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
+                addComponentResources(spout.getKey(), spout.getValue().get_common());
+            }
+        } else {
+            LOG.warn("Topology {} does not seem to have any spouts!", topologyId);
+        }
+        //schedule tasks that are not part of components returned from topology.get_spout or
+        // topology.getbolt (AKA sys tasks most specifically __acker tasks)
+        for (ExecutorDetails exec : getExecutors()) {
+            if (!resourceList.containsKey(exec)) {
+                LOG.debug(
+                    "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and "
+                        + "CPU requirement as {}",
+                    getExecutorToComponent().get(exec),
+                    exec,
+                    topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+                    topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+                    topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+                addDefaultResforExec(exec);
+            }
+        }
+    }
+
+    /**
+     * Parse one component's resource requests and record them for every executor of that component.
+     * All executors of the component share the same resource map instance.
+     *
+     * @param componentId the bolt or spout id
+     * @param common the component's common definition holding its json_conf
+     */
+    private void addComponentResources(String componentId, ComponentCommon common) {
+        Map<String, Double> componentResources = ResourceUtils.parseResources(common.get_json_conf());
+        ResourceUtils.checkIntialization(componentResources, componentId, topologyConf);
+        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
+            if (componentId.equals(entry.getValue())) {
+                resourceList.put(entry.getKey(), componentResources);
+            }
+        }
+    }
+
+    private List<ExecutorDetails> componentToExecs(String comp) {
+        List<ExecutorDetails> execs = new ArrayList<>();
+        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
+            // comp is a map key from the topology, so it is non-null; the mapped value might not be.
+            if (comp.equals(entry.getValue())) {
+                execs.add(entry.getKey());
+            }
+        }
+        return execs;
+    }
+
+    private Set<String> getInputsTo(ComponentCommon comp) {
+        Set<String> ret = new HashSet<>();
+        for (GlobalStreamId globalId : comp.get_inputs().keySet()) {
+            ret.add(globalId.get_componentId());
+        }
+        return ret;
+    }
+
+    /**
+     * Returns a representation of the non-system components of the topology graph. Each Component
+     * object in the returning map is populated with the list of its parents, children and execs
+     * assigned to that component. Edges to or from system components are omitted.
+     *
+     * @return a map of components
+     */
+    public Map<String, Component> getComponents() {
+        Map<String, Component> ret = new HashMap<>();
+
+        Map<String, SpoutSpec> spouts = topology.get_spouts();
+        Map<String, Bolt> bolts = topology.get_bolts();
+        //Add in all of the components
+        if (spouts != null) {
+            for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
+                String compId = entry.getKey();
+                if (!Utils.isSystemId(compId)) {
+                    Component comp = new Component(ComponentType.SPOUT, compId, componentToExecs(compId));
+                    ret.put(compId, comp);
+                }
+            }
+        }
+        if (bolts != null) {
+            for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
+                String compId = entry.getKey();
+                if (!Utils.isSystemId(compId)) {
+                    Component comp = new Component(ComponentType.BOLT, compId, componentToExecs(compId));
+                    ret.put(compId, comp);
+                }
+            }
+        }
+
+        //Link the components together
+        if (spouts != null) {
+            for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
+                linkToParents(ret, entry.getKey(), entry.getValue().get_common());
+            }
+        }
+
+        if (bolts != null) {
+            for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
+                linkToParents(ret, entry.getKey(), entry.getValue().get_common());
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Register childId as a child of each of its input components.
+     * System components were filtered out of {@code components}, so either end of an edge may be
+     * absent; such edges are skipped instead of dereferencing null.
+     */
+    private void linkToParents(Map<String, Component> components, String childId, ComponentCommon common) {
+        Component child = components.get(childId);
+        if (child == null) {
+            return;
+        }
+        for (String parentId : getInputsTo(common)) {
+            Component parent = components.get(parentId);
+            if (parent != null) {
+                parent.addChild(child);
+            }
+        }
+    }
+
+    /**
+     * Gets the on heap memory requirement for a certain task within a topology.
+     * @param exec the executor the inquiry is concerning.
+     * @return Double the amount of on heap memory requirement for this exec in topology topoId,
+     *     or null if the executor is unknown.
+     */
+    public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
+        Double ret = null;
+        if (hasExecInTopo(exec)) {
+            ret = resourceList
+                .get(exec)
+                .get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        }
+        return ret;
+    }
+
+    /**
+     * Gets the off heap memory requirement for a certain task within a topology.
+     * @param exec the executor the inquiry is concerning.
+     * @return Double the amount of off heap memory requirement for this exec in topology topoId,
+     *     or null if the executor is unknown.
+     */
+    public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
+        Double ret = null;
+        if (hasExecInTopo(exec)) {
+            ret = resourceList
+                .get(exec)
+                .get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+        }
+        return ret;
+    }
+
+    /**
+     * Gets the total memory requirement for a task.
+     * @param exec the executor the inquiry is concerning.
+     * @return Double the total (on heap + off heap) memory requirement for this exec in topology
+     *     topoId, or null if the executor is unknown.
+     */
+    public Double getTotalMemReqTask(ExecutorDetails exec) {
+        if (hasExecInTopo(exec)) {
+            return getOffHeapMemoryRequirement(exec)
+                + getOnHeapMemoryRequirement(exec);
+        }
+        return null;
+    }
+
+    /**
+     * Gets the total memory resource list for a set of tasks that is part of a topology.
+     * @return Map<ExecutorDetails, Double> a map of the total memory requirement for all tasks in
+     *     topology topoId; empty if no resources have been recorded.
+     */
+    public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
+        Map<ExecutorDetails, Double> ret = new HashMap<>();
+        if (resourceList != null) {
+            for (ExecutorDetails exec : resourceList.keySet()) {
+                ret.put(exec, getTotalMemReqTask(exec));
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Collect the shared memory regions requested by the components of the given executors.
+     *
+     * @param executors the executors to consider; executors not in this topology are ignored
+     * @return the shared memory requests; empty if the topology is null or declares none
+     */
+    public Set<SharedMemory> getSharedMemoryRequests(Collection<ExecutorDetails> executors) {
+        Set<String> components = new HashSet<>();
+        for (ExecutorDetails exec : executors) {
+            String component = executorToComponent.get(exec);
+            if (component != null) {
+                components.add(component);
+            }
+        }
+        Set<SharedMemory> ret = new HashSet<>();
+        if (topology != null) {
+            //topology being null is used for tests We probably should fix that at some point,
+            // but it is not trivial to do...
+            Map<String, Set<String>> compToSharedName = topology.get_component_to_shared_memory();
+            Map<String, SharedMemory> sharedMemory = topology.get_shared_memory();
+            if (compToSharedName != null && sharedMemory != null) {
+                for (String component : components) {
+                    Set<String> sharedNames = compToSharedName.get(component);
+                    if (sharedNames != null) {
+                        for (String name : sharedNames) {
+                            SharedMemory request = sharedMemory.get(name);
+                            // Skip names with no matching declaration rather than adding null.
+                            if (request != null) {
+                                ret.add(request);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Get the total CPU requirement for executor.
+     * @return Double the total amount of cpu requirement for executor, or null if it is unknown
+     */
+    public Double getTotalCpuReqTask(ExecutorDetails exec) {
+        if (hasExecInTopo(exec)) {
+            return resourceList
+                .get(exec)
+                .get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+        }
+        return null;
+    }
+
+    /**
+     * Note: The public API relevant to resource aware scheduling is unstable as of May 2015.
+     * We reserve the right to change them.
+     *
+     * @return the total on-heap memory requested for this topology
+     */
+    public double getTotalRequestedMemOnHeap() {
+        return getRequestedSharedOnHeap() + getRequestedNonSharedOnHeap();
+    }
+
+    /**
+     * Sum of on-heap memory across all shared memory regions declared by the topology.
+     *
+     * @return the shared on-heap memory requested; 0.0 if the topology is null or declares none
+     */
+    public double getRequestedSharedOnHeap() {
+        double ret = 0.0;
+        if (topology != null && topology.is_set_shared_memory()) {
+            for (SharedMemory req : topology.get_shared_memory().values()) {
+                ret += req.get_on_heap();
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Sum of the per-executor on-heap memory requests.
+     *
+     * @return the non-shared on-heap memory requested
+     */
+    public double getRequestedNonSharedOnHeap() {
+        double totalMemOnHeap = 0.0;
+        for (ExecutorDetails exec : this.getExecutors()) {
+            Double execMem = getOnHeapMemoryRequirement(exec);
+            if (execMem != null) {
+                totalMemOnHeap += execMem;
+            }
+        }
+        return totalMemOnHeap;
+    }
+
+    /**
+     * Note: The public API relevant to resource aware scheduling is unstable as of May 2015.
+     * We reserve the right to change them.
+     *
+     * @return the total off-heap memory requested for this topology
+     */
+    public double getTotalRequestedMemOffHeap() {
+        return getRequestedNonSharedOffHeap() + getRequestedSharedOffHeap();
+    }
+
+    /**
+     * Sum of the per-executor off-heap memory requests.
+     *
+     * @return the non-shared off-heap memory requested
+     */
+    public double getRequestedNonSharedOffHeap() {
+        double totalMemOffHeap = 0.0;
+        for (ExecutorDetails exec : this.getExecutors()) {
+            Double execMem = getOffHeapMemoryRequirement(exec);
+            if (execMem != null) {
+                totalMemOffHeap += execMem;
+            }
+        }
+        return totalMemOffHeap;
+    }
+
+    /**
+     * Sum of per-worker and per-node off-heap memory across all shared memory regions.
+     *
+     * @return the shared off-heap memory requested; 0.0 if the topology is null or declares none
+     */
+    public double getRequestedSharedOffHeap() {
+        double ret = 0.0;
+        if (topology != null && topology.is_set_shared_memory()) {
+            for (SharedMemory req : topology.get_shared_memory().values()) {
+                ret += req.get_off_heap_worker() + req.get_off_heap_node();
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Note: The public API relevant to resource aware scheduling is unstable as of May 2015.
+     * We reserve the right to change them.
+     *
+     * @return the total cpu requested for this topology
+     */
+    public double getTotalRequestedCpu() {
+        double totalCpu = 0.0;
+        for (ExecutorDetails exec : this.getExecutors()) {
+            Double execCpu = getTotalCpuReqTask(exec);
+            if (execCpu != null) {
+                totalCpu += execCpu;
+            }
+        }
+        return totalCpu;
+    }
+
+    /**
+     * Get the resource requirements for an executor.
+     * @param exec the executor to look up
+     * @return a map containing the resource requirements for this exec, or null if it is unknown
+     */
+    public Map<String, Double> getTaskResourceReqList(ExecutorDetails exec) {
+        if (hasExecInTopo(exec)) {
+            return resourceList.get(exec);
+        }
+        return null;
+    }
+
+    /**
+     * Checks if an executor is part of this topology.
+     * @return Boolean whether or not a certain ExecutorDetail is included in the resourceList.
+     */
+    public boolean hasExecInTopo(ExecutorDetails exec) {
+        return resourceList != null && resourceList.containsKey(exec);
+    }
+
+    /**
+     * Add resource requirements for an executor. Existing entries are never overwritten.
+     *
+     * @param exec the executor
+     * @param resourceList resource config key to requested amount
+     */
+    public void addResourcesForExec(ExecutorDetails exec, Map<String, Double> resourceList) {
+        if (hasExecInTopo(exec)) {
+            LOG.warn("Executor {} already exists...ResourceList: {}", exec, getTaskResourceReqList(exec));
+            return;
+        }
+        // resourceList is only created eagerly when the topology is non-null.
+        if (this.resourceList == null) {
+            this.resourceList = new HashMap<>();
+        }
+        this.resourceList.put(exec, resourceList);
+    }
+
+    /**
+     * Add the topology-wide default resource requirements for an executor.
+     */
+    private void addDefaultResforExec(ExecutorDetails exec) {
+        Double topologyComponentCpuPcorePercent =
+            ObjectReader.getDouble(
+                topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
+        Double topologyComponentResourcesOffheapMemoryMb =
+            ObjectReader.getDouble(
+                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+        Double topologyComponentResourcesOnheapMemoryMb =
+            ObjectReader.getDouble(
+                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+
+        assert topologyComponentCpuPcorePercent != null;
+        assert topologyComponentResourcesOffheapMemoryMb != null;
+        assert topologyComponentResourcesOnheapMemoryMb != null;
+
+        Map<String, Double> defaultResourceList = new HashMap<>();
+        defaultResourceList.put(
+            Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topologyComponentCpuPcorePercent);
+        defaultResourceList.put(
+            Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
+            topologyComponentResourcesOffheapMemoryMb);
+        defaultResourceList.put(
+            Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
+            topologyComponentResourcesOnheapMemoryMb);
+        LOG.debug(
+            "Scheduling Executor: {} with memory requirement as onHeap: {} - offHeap: {} "
+                + "and CPU requirement: {}",
+            exec,
+            topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+            topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+            topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+        addResourcesForExec(exec, defaultResourceList);
+    }
+
+    /**
+     * Initializes the worker max heap size and priority from the topology configuration.
+     */
+    private void initConfigs() {
+        this.topologyWorkerMaxHeapSize =
+            ObjectReader.getDouble(
+                topologyConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB), null);
+        this.topologyPriority =
+            ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRIORITY), null);
+
+        assert this.topologyWorkerMaxHeapSize != null;
+        assert this.topologyPriority != null;
+    }
+
+    /**
+     * Get the max heap size for a worker used by this topology.
+     * @return the worker max heap size
+     */
+    public Double getTopologyWorkerMaxHeapSize() {
+        return topologyWorkerMaxHeapSize;
+    }
+
+    /**
+     * Get the user that submitted this topology.
+     */
+    public String getTopologySubmitter() {
+        return owner;
+    }
+
+    /**
+     * Get the priority of this topology.
+     */
+    public int getTopologyPriority() {
+        return topologyPriority;
+    }
+
+    /**
+     * Get the timestamp, in seconds, of when this topology was launched.
+     */
+    public int getLaunchTime() {
+        return launchTime;
+    }
+
+    /**
+     * Get how long, in seconds, this topology has been executing.
+     */
+    public int getUpTime() {
+        return Time.currentTimeSecs() - launchTime;
+    }
+
+    @Override
+    public String toString() {
+        return "Name: "
+            + getName()
+            + " id: "
+            + getId()
+            + " Priority: "
+            + getTopologyPriority()
+            + " Uptime: "
+            + getUpTime()
+            + " CPU: "
+            + getTotalRequestedCpu()
+            + " Memory: "
+            + (getTotalRequestedMemOffHeap() + getTotalRequestedMemOnHeap());
+    }
+
+    @Override
+    public int hashCode() {
+        return topologyId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof TopologyDetails)) {
+            return false;
+        }
+        return (topologyId.equals(((TopologyDetails) o).getId()));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
new file mode 100644
index 0000000..5603541
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a single node in the cluster, tracking which topologies use which of its worker
+ * slots and delegating the actual assignment bookkeeping to the {@link Cluster}.
+ */
+public class RAS_Node {
+    private static final Logger LOG = LoggerFactory.getLogger(RAS_Node.class);
+
+    //A map consisting of all workers on the node.
+    //The key of the map is the worker id and the value is the corresponding workerslot object
+    private Map<String, WorkerSlot> slots = new HashMap<>();
+
+    // A map describing which topologies are using which slots on this node. The format of the map is the following:
+    // {TopologyId -> {WorkerId -> {Executors}}}
+    // Invariant maintained by free(): a topology id is only present while it uses at least one slot.
+    private Map<String, Map<String, Collection<ExecutorDetails>>> topIdToUsedSlots = new HashMap<>();
+
+    // Snapshotted at construction; 0.0 when the node is not alive.
+    private final double totalMemory;
+    private final double totalCpu;
+    private final String nodeId;
+    private String hostname;
+    private boolean isAlive;
+    private SupervisorDetails sup;
+    private final Cluster cluster;
+    // Slots that were free when this node was created; RAS only schedules into these.
+    private final Set<WorkerSlot> originallyFreeSlots;
+
+    /**
+     * Create a node view.
+     *
+     * @param nodeId the node id (same as the supervisor id)
+     * @param sup the supervisor on the node; null means the node is treated as dead
+     * @param cluster the cluster the node belongs to
+     * @param workerIdToWorker worker id to slot for every slot on the node; may be null
+     * @param assignmentMap current assignments as {topologyId -> {workerId -> executors}}; may be null
+     */
+    public RAS_Node(
+        String nodeId,
+        SupervisorDetails sup,
+        Cluster cluster,
+        Map<String, WorkerSlot> workerIdToWorker,
+        Map<String, Map<String, Collection<ExecutorDetails>>> assignmentMap) {
+        //Node ID and supervisor ID are the same.
+        this.nodeId = nodeId;
+        if (sup == null) {
+            isAlive = false;
+        } else {
+            isAlive = !cluster.isBlackListed(this.nodeId);
+        }
+
+        this.cluster = cluster;
+
+        // initialize slots for this node
+        if (workerIdToWorker != null) {
+            slots = workerIdToWorker;
+        }
+
+        //initialize assignment map
+        if (assignmentMap != null) {
+            topIdToUsedSlots = assignmentMap;
+        }
+
+        //check if node is alive
+        if (isAlive && sup != null) {
+            hostname = sup.getHost();
+            this.sup = sup;
+        }
+
+        totalMemory = isAlive ? getTotalMemoryResources() : 0.0;
+        totalCpu = isAlive ? getTotalCpuResources() : 0.0;
+        HashSet<String> freeById = new HashSet<>(slots.keySet());
+        if (assignmentMap != null) {
+            for (Map<String, Collection<ExecutorDetails>> assignment : assignmentMap.values()) {
+                freeById.removeAll(assignment.keySet());
+            }
+        }
+        originallyFreeSlots = new HashSet<>();
+        for (WorkerSlot slot : slots.values()) {
+            if (freeById.contains(slot.getId())) {
+                originallyFreeSlots.add(slot);
+            }
+        }
+    }
+
+    public String getId() {
+        return nodeId;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    private Collection<WorkerSlot> workerIdsToWorkers(Collection<String> workerIds) {
+        Collection<WorkerSlot> ret = new LinkedList<WorkerSlot>();
+        for (String workerId : workerIds) {
+            ret.add(slots.get(workerId));
+        }
+        return ret;
+    }
+
+    /**
+     * Get the ids of the slots on this node that no topology is using.
+     *
+     * @return free slot ids; empty if the node is not alive
+     */
+    public Collection<String> getFreeSlotsId() {
+        if (!isAlive) {
+            return new HashSet<String>();
+        }
+        Set<String> ret = new HashSet<>();
+        ret.addAll(slots.keySet());
+        // Remove against each topology's keySet (O(1) lookups) instead of a combined LinkedList,
+        // which made HashSet.removeAll quadratic.
+        for (Map<String, Collection<ExecutorDetails>> assignment : topIdToUsedSlots.values()) {
+            ret.removeAll(assignment.keySet());
+        }
+        return ret;
+    }
+
+    /**
+     * Get the slots the given topology may be scheduled into: the slots it already uses plus the
+     * free slots, restricted to slots that were free when this node was created.
+     */
+    public Collection<WorkerSlot> getSlotsAvailbleTo(TopologyDetails td) {
+        //Try to reuse a slot if possible....
+        HashSet<WorkerSlot> ret = new HashSet<>();
+        Map<String, Collection<ExecutorDetails>> assigned = topIdToUsedSlots.get(td.getId());
+        if (assigned != null) {
+            ret.addAll(workerIdsToWorkers(assigned.keySet()));
+        }
+        ret.addAll(getFreeSlots());
+        ret.retainAll(
+            originallyFreeSlots); //RAS does not let you move things or modify existing assignments
+        return ret;
+    }
+
+    public Collection<WorkerSlot> getFreeSlots() {
+        return workerIdsToWorkers(getFreeSlotsId());
+    }
+
+    private Collection<String> getUsedSlotsId() {
+        Collection<String> ret = new LinkedList<String>();
+        for (Map<String, Collection<ExecutorDetails>> entry : topIdToUsedSlots.values()) {
+            ret.addAll(entry.keySet());
+        }
+        return ret;
+    }
+
+    public Collection<WorkerSlot> getUsedSlots() {
+        return workerIdsToWorkers(getUsedSlotsId());
+    }
+
+    /**
+     * Get the slots on this node used by one topology.
+     *
+     * @param topId the topology id
+     * @return the used slots, or null if the topology uses no slots on this node
+     */
+    public Collection<WorkerSlot> getUsedSlots(String topId) {
+        Collection<WorkerSlot> ret = null;
+        if (topIdToUsedSlots.get(topId) != null) {
+            ret = workerIdsToWorkers(topIdToUsedSlots.get(topId).keySet());
+        }
+        return ret;
+    }
+
+    public boolean isAlive() {
+        return isAlive;
+    }
+
+    /** Get a collection of the topology ids currently running on this node. */
+    public Collection<String> getRunningTopologies() {
+        return topIdToUsedSlots.keySet();
+    }
+
+    public boolean isTotallyFree() {
+        return getUsedSlots().isEmpty();
+    }
+
+    public int totalSlotsFree() {
+        return getFreeSlots().size();
+    }
+
+    public int totalSlotsUsed() {
+        return getUsedSlots().size();
+    }
+
+    /**
+     * Count the slots on this node used by one topology.
+     *
+     * @param topId the topology id
+     * @return the number of used slots; 0 if the topology does not run on this node
+     */
+    public int totalSlotsUsed(String topId) {
+        // getUsedSlots(String) returns null for topologies not on this node.
+        Collection<WorkerSlot> used = getUsedSlots(topId);
+        return used == null ? 0 : used.size();
+    }
+
+    public int totalSlots() {
+        return slots.size();
+    }
+
+    /** Free all slots on this node. This will update the Cluster too. */
+    public void freeAllSlots() {
+        if (!isAlive) {
+            LOG.warn("Freeing all slots on a dead node {} ", nodeId);
+        }
+        cluster.freeSlots(slots.values());
+        //clearing assignments
+        topIdToUsedSlots.clear();
+    }
+
+    /**
+     * frees a single executor.
+     *
+     * @param exec is the executor to free
+     * @param topo the topology the executor is a part of
+     */
+    public void freeSingleExecutor(ExecutorDetails exec, TopologyDetails topo) {
+        Map<String, Collection<ExecutorDetails>> usedSlots = topIdToUsedSlots.get(topo.getId());
+        if (usedSlots == null) {
+            throw new IllegalArgumentException("Topology " + topo + " is not assigned");
+        }
+        WorkerSlot ws = null;
+        Set<ExecutorDetails> updatedAssignment = new HashSet<>();
+        for (Entry<String, Collection<ExecutorDetails>> entry : usedSlots.entrySet()) {
+            if (entry.getValue().contains(exec)) {
+                ws = slots.get(entry.getKey());
+                updatedAssignment.addAll(entry.getValue());
+                updatedAssignment.remove(exec);
+                break;
+            }
+        }
+
+        if (ws == null) {
+            throw new IllegalArgumentException(
+                "Executor " + exec + " is not assinged on this node to " + topo);
+        }
+        // Free the whole slot, then reassign the executors that should stay on it.
+        free(ws);
+        if (!updatedAssignment.isEmpty()) {
+            assign(ws, topo, updatedAssignment);
+        }
+    }
+
+    /**
+     * Frees a single slot in this node.
+     *
+     * @param ws the slot to free
+     * @throws IllegalArgumentException if the slot is not on this node or is already free
+     */
+    public void free(WorkerSlot ws) {
+        LOG.debug("freeing WorkerSlot {} on node {}", ws, hostname);
+        if (!slots.containsKey(ws.getId())) {
+            throw new IllegalArgumentException(
+                "Tried to free a slot " + ws + " that was not" + " part of this node " + nodeId);
+        }
+
+        // Resolve the owner from this node's own bookkeeping. Going through
+        // cluster.getTopologies() wrongly reported a used slot as free when its topology was
+        // not in that set.
+        String topoId = findTopologyIdUsingWorker(ws);
+        if (topoId == null) {
+            throw new IllegalArgumentException("Tried to free a slot " + ws + " that was already free!");
+        }
+
+        //free slot
+        cluster.freeSlot(ws);
+        //cleanup internal assignments
+        Map<String, Collection<ExecutorDetails>> topoSlots = topIdToUsedSlots.get(topoId);
+        topoSlots.remove(ws.getId());
+        if (topoSlots.isEmpty()) {
+            // Drop the empty entry so getRunningTopologies() stops reporting this topology.
+            topIdToUsedSlots.remove(topoId);
+        }
+    }
+
+    /**
+     * Find which topology is running on a worker slot.
+     *
+     * @return the id of the topology using the worker slot, or null if the slot is free
+     */
+    private String findTopologyIdUsingWorker(WorkerSlot ws) {
+        for (Entry<String, Map<String, Collection<ExecutorDetails>>> entry :
+            topIdToUsedSlots.entrySet()) {
+            if (entry.getValue().containsKey(ws.getId())) {
+                return entry.getKey();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Assigns a worker to a node.
+     *
+     * @param target the worker slot to assign the executors; if null, an arbitrary free slot is used
+     * @param td the topology the executors are from
+     * @param executors executors to assign to the specified worker slot
+     * @throws IllegalStateException if the node is dead or full, or target is not free
+     */
+    public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors) {
+        if (!isAlive) {
+            throw new IllegalStateException("Trying to adding to a dead node " + nodeId);
+        }
+        Collection<WorkerSlot> freeSlots = getFreeSlots();
+        if (freeSlots.isEmpty()) {
+            throw new IllegalStateException("Trying to assign to a full node " + nodeId);
+        }
+        if (executors.size() == 0) {
+            // NOTE(review): despite the "(Ignored)" wording, execution continues and the empty
+            // assignment is still recorded below — confirm whether an early return is intended.
+            LOG.warn("Trying to assign nothing from {} to {} (Ignored)", td.getId(), nodeId);
+        }
+        if (target == null) {
+            target = freeSlots.iterator().next();
+        }
+        if (!freeSlots.contains(target)) {
+            throw new IllegalStateException(
+                "Trying to assign already used slot " + target.getPort() + " on node " + nodeId);
+        }
+        LOG.debug("target slot: {}", target);
+
+        cluster.assign(target, td.getId(), executors);
+
+        //assigning internally
+        Map<String, Collection<ExecutorDetails>> topoSlots = topIdToUsedSlots.get(td.getId());
+        if (topoSlots == null) {
+            topoSlots = new HashMap<String, Collection<ExecutorDetails>>();
+            topIdToUsedSlots.put(td.getId(), topoSlots);
+        }
+        Collection<ExecutorDetails> slotExecs = topoSlots.get(target.getId());
+        if (slotExecs == null) {
+            slotExecs = new LinkedList<ExecutorDetails>();
+            topoSlots.put(target.getId(), slotExecs);
+        }
+        slotExecs.addAll(executors);
+    }
+
+    /**
+     * Assign one executor to a slot. If the slot is already used by the same topology, the slot
+     * is freed and reassigned with its existing executors plus exec.
+     *
+     * @throws IllegalArgumentException if ws is used by a different topology
+     */
+    public void assignSingleExecutor(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
+        if (!isAlive) {
+            throw new IllegalStateException("Trying to adding to a dead node " + nodeId);
+        }
+        Collection<WorkerSlot> freeSlots = getFreeSlots();
+        Set<ExecutorDetails> toAssign = new HashSet<>();
+        toAssign.add(exec);
+        if (!freeSlots.contains(ws)) {
+            Map<String, Collection<ExecutorDetails>> usedSlots = topIdToUsedSlots.get(td.getId());
+            if (usedSlots == null) {
+                throw new IllegalArgumentException(
+                    "Slot " + ws + " is not availble to schedue " + exec + " on");
+            }
+            Collection<ExecutorDetails> alreadyHere = usedSlots.get(ws.getId());
+            if (alreadyHere == null) {
+                throw new IllegalArgumentException(
+                    "Slot " + ws + " is not availble to schedue " + exec + " on");
+            }
+            toAssign.addAll(alreadyHere);
+            free(ws);
+        }
+        assign(ws, td, toAssign);
+    }
+
+    /**
+     * Would scheduling exec in ws fit with the current resource constraints.
+     *
+     * @param ws the slot to possibly put exec in
+     * @param exec the executor to possibly place in ws
+     * @param td the topology exec is a part of
+     * @return true if it would fit else false
+     */
+    public boolean wouldFit(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
+        if (!nodeId.equals(ws.getNodeId())) {
+            throw new IllegalStateException("Slot " + ws + " is not a part of this node " + nodeId);
+        }
+        return isAlive
+            && cluster.wouldFit(
+            ws,
+            exec,
+            td,
+            td.getTopologyWorkerMaxHeapSize(),
+            getAvailableMemoryResources(),
+            getAvailableCpuResources());
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other instanceof RAS_Node) {
+            return nodeId.equals(((RAS_Node) other).nodeId);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return nodeId.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "{Node: "
+            + ((sup == null) ? "null (possibly down)" : sup.getHost())
+            + ", Avail [ Mem: "
+            + getAvailableMemoryResources()
+            + ", CPU: "
+            + getAvailableCpuResources()
+            + ", Slots: "
+            + this.getFreeSlots()
+            + "] Total [ Mem: "
+            + ((sup == null) ? "N/A" : this.getTotalMemoryResources())
+            + ", CPU: "
+            + ((sup == null) ? "N/A" : this.getTotalCpuResources())
+            + ", Slots: "
+            + this.slots.values()
+            + " ]}";
+    }
+
+    public static int countFreeSlotsAlive(Collection<RAS_Node> nodes) {
+        int total = 0;
+        for (RAS_Node n : nodes) {
+            if (n.isAlive()) {
+                total += n.totalSlotsFree();
+            }
+        }
+        return total;
+    }
+
+    public static int countTotalSlotsAlive(Collection<RAS_Node> nodes) {
+        int total = 0;
+        for (RAS_Node n : nodes) {
+            if (n.isAlive()) {
+                total += n.totalSlots();
+            }
+        }
+        return total;
+    }
+
+    /**
+     * Gets the available memory resources for this node.
+     *
+     * @return the available memory for this node
+     */
+    public Double getAvailableMemoryResources() {
+        double used = cluster.getScheduledMemoryForNode(nodeId);
+        return totalMemory - used;
+    }
+
+    /**
+     * Gets the total memory resources for this node.
+     *
+     * @return the total memory for this node
+     */
+    public Double getTotalMemoryResources() {
+        if (sup != null) {
+            return sup.getTotalMemory();
+        } else {
+            return 0.0;
+        }
+    }
+
+    /**
+     * Gets the available cpu resources for this node.
+     *
+     * @return the available cpu for this node
+     */
+    public double getAvailableCpuResources() {
+        return totalCpu - cluster.getScheduledCpuForNode(nodeId);
+    }
+
+    /**
+     * Gets the total cpu resources for this node.
+     *
+     * @return the total cpu for this node
+     */
+    public Double getTotalCpuResources() {
+        if (sup != null) {
+            return sup.getTotalCPU();
+        } else {
+            return 0.0;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
new file mode 100644
index 0000000..6c70ff1
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a {@link RAS_Node} for every node of a {@link Cluster}, keyed by node id.
+ */
+public class RAS_Nodes {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RAS_Nodes.class);
+
+    //node id -> node wrapper; built once in the constructor
+    private final Map<String, RAS_Node> nodeMap;
+
+    /**
+     * Create node wrappers for everything currently known to the cluster.
+     *
+     * @param cluster the cluster whose supervisors and assignments are read
+     */
+    public RAS_Nodes(Cluster cluster) {
+        this.nodeMap = getAllNodesFrom(cluster);
+    }
+
+    /**
+     * Build a {@link RAS_Node} for every supervisor in the cluster, plus one for every node that still
+     * has assignments but no supervisor in {@code cluster.getSupervisors()}.
+     *
+     * <p>Every supervisor gets a slot map (possibly empty) holding a slot for each of its ports as well as
+     * every slot that already has executors assigned on that node.
+     *
+     * @param cluster the cluster to read supervisors and assignments from
+     * @return a new map of node id to node
+     */
+    public static Map<String, RAS_Node> getAllNodesFrom(Cluster cluster) {
+        //A map of node ids to node objects
+        Map<String, RAS_Node> nodeIdToNode = new HashMap<>();
+        //A map of assignments organized by node with the following format:
+        //{nodeId -> {topologyId -> {workerId -> {execs}}}}
+        Map<String, Map<String, Map<String, Collection<ExecutorDetails>>>> assignmentRelationshipMap = new HashMap<>();
+        //A map of the worker slots on each node: {nodeId -> {workerId -> slot}}
+        Map<String, Map<String, WorkerSlot>> workerIdToWorker = new HashMap<>();
+
+        for (SchedulerAssignment assignment : cluster.getAssignments().values()) {
+            String topId = assignment.getTopologyId();
+            for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
+                assignment.getSlotToExecutors().entrySet()) {
+                WorkerSlot slot = entry.getKey();
+                String nodeId = slot.getNodeId();
+                getOrCreateMap(workerIdToWorker, nodeId).put(slot.getId(), slot);
+                Map<String, Collection<ExecutorDetails>> workerToExecs =
+                    getOrCreateMap(getOrCreateMap(assignmentRelationshipMap, nodeId), topId);
+                Collection<ExecutorDetails> assignedExecs = workerToExecs.get(slot.getId());
+                if (assignedExecs == null) {
+                    assignedExecs = new LinkedList<>();
+                    workerToExecs.put(slot.getId(), assignedExecs);
+                }
+                assignedExecs.addAll(entry.getValue());
+            }
+        }
+
+        for (SupervisorDetails sup : cluster.getSupervisors().values()) {
+            String supId = sup.getId();
+            //Create the slot map before walking the ports so that a supervisor without any ports
+            //(and without assignments) gets an empty map rather than null.
+            Map<String, WorkerSlot> slots = getOrCreateMap(workerIdToWorker, supId);
+            //Initialize a worker slot for every port even if there is no assignment to it
+            for (int port : sup.getAllPorts()) {
+                WorkerSlot worker = new WorkerSlot(supId, port);
+                if (!slots.containsKey(worker.getId())) {
+                    slots.put(worker.getId(), worker);
+                }
+            }
+            nodeIdToNode.put(
+                supId,
+                new RAS_Node(supId, sup, cluster, slots, assignmentRelationshipMap.get(supId)));
+        }
+
+        //Add in supervisors that might have crashed but workers are still alive
+        for (Map.Entry<String, Map<String, Map<String, Collection<ExecutorDetails>>>> entry :
+            assignmentRelationshipMap.entrySet()) {
+            String nodeId = entry.getKey();
+            Map<String, Map<String, Collection<ExecutorDetails>>> assignments = entry.getValue();
+            if (!nodeIdToNode.containsKey(nodeId)) {
+                LOG.info(
+                    "Found an assigned slot(s) on a dead supervisor {} with assignments {}",
+                    nodeId,
+                    assignments);
+                //workerIdToWorker always has an entry here: it was filled alongside assignmentRelationshipMap
+                nodeIdToNode.put(
+                    nodeId, new RAS_Node(nodeId, null, cluster, workerIdToWorker.get(nodeId), assignments));
+            }
+        }
+        return nodeIdToNode;
+    }
+
+    /**
+     * Return the inner map stored under {@code key}, first storing a new empty {@link HashMap} there
+     * when there is none.
+     */
+    private static <K, V> Map<K, V> getOrCreateMap(Map<String, Map<K, V>> outer, String key) {
+        Map<K, V> inner = outer.get(key);
+        if (inner == null) {
+            inner = new HashMap<>();
+            outer.put(key, inner);
+        }
+        return inner;
+    }
+
+    /**
+     * Look up a node by its id.
+     *
+     * @param nodeId the node (supervisor) id
+     * @return the node, or null when no node has that id
+     */
+    public RAS_Node getNodeById(String nodeId) {
+        return this.nodeMap.get(nodeId);
+    }
+
+    /**
+     * Free everything on the given slots.
+     *
+     * @param workerSlots the slots to free
+     */
+    public void freeSlots(Collection<WorkerSlot> workerSlots) {
+        for (RAS_Node node : nodeMap.values()) {
+            //NOTE(review): node.free(ws) is called while iterating getUsedSlots(); this assumes
+            //getUsedSlots() returns a snapshot rather than a live view -- confirm in RAS_Node.
+            for (WorkerSlot ws : node.getUsedSlots()) {
+                if (workerSlots.contains(ws)) {
+                    LOG.debug("freeing ws {} on node {}", ws, node);
+                    node.free(ws);
+                }
+            }
+        }
+    }
+
+    /**
+     * Get all nodes.
+     *
+     * @return a view backed by the internal node map
+     */
+    public Collection<RAS_Node> getNodes() {
+        return this.nodeMap.values();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder ret = new StringBuilder();
+        for (RAS_Node node : nodeMap.values()) {
+            ret.append(node).append("\n");
+        }
+        return ret.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index f92db8a..83231cd 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -18,44 +18,37 @@
package org.apache.storm.scheduler.resource;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SingleTopologyCluster;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.ExecutorDetails;
-import org.apache.storm.scheduler.IScheduler;
-import org.apache.storm.scheduler.Topologies;
-import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.WorkerSlot;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
public class ResourceAwareScheduler implements IScheduler {
-
- // Object that holds the current scheduling state
- private SchedulingState schedulingState;
-
- @SuppressWarnings("rawtypes")
+ private static final Logger LOG = LoggerFactory.getLogger(ResourceAwareScheduler.class);
private Map<String, Object> conf;
-
- private static final Logger LOG = LoggerFactory
- .getLogger(ResourceAwareScheduler.class);
+ private ISchedulingPriorityStrategy schedulingPrioritystrategy;
+ private IEvictionStrategy evictionStrategy;
+ /**
+ * Store the daemon config and instantiate the configured priority and eviction strategies.
+ *
+ * @param conf the daemon configuration; the two strategy class names are read from it
+ */
@Override
public void prepare(Map<String, Object> conf) {
this.conf = conf;
-
+ //The strategies are created here and kept in fields for later schedule() calls. Unlike the
+ //previous lazy creation, nothing here catches a failure from ReflectionUtils.newInstance, so any
+ //exception it throws propagates out of prepare.
+ schedulingPrioritystrategy = (ISchedulingPriorityStrategy) ReflectionUtils.newInstance(
+ (String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
+ evictionStrategy = (IEvictionStrategy) ReflectionUtils.newInstance(
+ (String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
}
@Override
@@ -65,296 +58,129 @@ public class ResourceAwareScheduler implements IScheduler {
+ /**
+ * Schedule the cluster's topologies one at a time, in the order handed out by the priority
+ * strategy, until it returns null or throws.
+ *
+ * @param topologies appears unused in this version; topologies are read from {@code cluster}
+ * @param cluster the cluster to schedule on; statuses and (on success) assignments are written to it
+ */
@Override
public void schedule(Topologies topologies, Cluster cluster) {
- LOG.debug("\n\n\nRerunning ResourceAwareScheduler...");
//initialize data structures
- initialize(topologies, cluster);
- //logs everything that is currently scheduled and the location at which they are scheduled
- LOG.info("Cluster scheduling:\n{}", ResourceUtils.printScheduling(cluster, topologies));
- //logs the resources available/used for every node
- LOG.info("Nodes:\n{}", this.schedulingState.nodes);
- //logs the detailed info about each user
- for (User user : getUserMap().values()) {
- LOG.info(user.getDetailedInfo());
+ for (TopologyDetails td : cluster.getTopologies()) {
+ if (!cluster.needsSchedulingRas(td)) {
+ //cluster forgets about its previous status, so if it is scheduled just leave it.
+ cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
+ }
}
+ //User wrappers are rebuilt from the cluster's topologies on every call
+ Map<String, User> userMap = getUsers(cluster);
- ISchedulingPriorityStrategy schedulingPrioritystrategy = null;
while (true) {
-
- if (schedulingPrioritystrategy == null) {
- try {
- schedulingPrioritystrategy = (ISchedulingPriorityStrategy) ReflectionUtils.newInstance((String) this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
- } catch (RuntimeException ex) {
- LOG.error(String.format("failed to create instance of priority strategy: %s with error: %s! No topologies will be scheduled.",
- this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), ex.getMessage()), ex);
- break;
- }
- }
TopologyDetails td;
try {
- //need to re prepare since scheduling state might have been restored
- schedulingPrioritystrategy.prepare(this.schedulingState);
//Call scheduling priority strategy
- td = schedulingPrioritystrategy.getNextTopologyToSchedule();
+ td = schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap);
} catch (Exception ex) {
- LOG.error(String.format("Exception thrown when running priority strategy %s. No topologies will be scheduled! Error: %s"
- , schedulingPrioritystrategy.getClass().getName(), ex.getMessage()), ex.getStackTrace());
+ LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled!",
+ schedulingPrioritystrategy.getClass().getName(), ex);
break;
}
if (td == null) {
break;
}
- scheduleTopology(td);
-
- LOG.debug("Nodes after scheduling:\n{}", this.schedulingState.nodes);
+ //Re-check here: the priority strategy may hand back a topology that no longer needs scheduling
+ User submitter = userMap.get(td.getTopologySubmitter());
+ if (cluster.needsSchedulingRas(td)) {
+ scheduleTopology(td, cluster, submitter, userMap);
+ } else {
+ LOG.warn("Topology {} is already fully scheduled!", td.getName());
+ cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
+ }
}
-
- //update changes to cluster
- updateChanges(cluster, topologies);
- }
-
- private void updateChanges(Cluster cluster, Topologies topologies) {
- //Cannot simply set this.cluster=schedulingState.cluster since clojure is immutable
- cluster.setAssignments(schedulingState.cluster.getAssignments());
- cluster.setBlacklistedHosts(schedulingState.cluster.getBlacklistedHosts());
- cluster.setStatusMap(schedulingState.cluster.getStatusMap());
- cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap());
- cluster.setTopologyResourcesMap(schedulingState.cluster.getTopologyResourcesMap());
- cluster.setWorkerResourcesMap(schedulingState.cluster.getWorkerResourcesMap());
- //updating resources used by supervisor
- updateSupervisorsResources(cluster, topologies);
}
- public void scheduleTopology(TopologyDetails td) {
- User topologySubmitter = this.schedulingState.userMap.get(td.getTopologySubmitter());
- if (this.schedulingState.cluster.getUnassignedExecutors(td).size() > 0) {
- LOG.debug("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
-
- SchedulingState schedulingState = checkpointSchedulingState();
- IStrategy rasStrategy = null;
+ /**
+ * Try to schedule a single topology. When the strategy reports
+ * {@link SchedulingStatus#FAIL_NOT_ENOUGH_RESOURCES}, the eviction strategy is asked to make space
+ * and scheduling is retried. This repeats until scheduling succeeds or fails for good.
+ *
+ * <p>All work is done on a copy of {@code cluster} ({@code workingState}). Only a successful result is
+ * copied back, via {@code cluster.updateFrom}.
+ *
+ * @param td the topology to schedule
+ * @param cluster the real cluster; receives the assignment on success and a status message in most cases
+ * @param topologySubmitter the owner of {@code td}; marked unsuccessful when scheduling fails
+ * @param userMap all users, handed to the eviction strategy
+ */
+ public void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
+ Map<String, User> userMap) {
+ //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds
+ Cluster workingState = new Cluster(cluster);
+ IStrategy rasStrategy = null;
+ String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
+ try {
+ rasStrategy = (IStrategy) ReflectionUtils.newInstance(strategyConf);
+ rasStrategy.prepare(conf);
+ } catch (RuntimeException e) {
+ LOG.error("failed to create instance of IStrategy: {} Topology {} will not be scheduled.",
+ strategyConf, td.getName(), e);
+ topologySubmitter.markTopoUnsuccess(td);
+ cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
+ + strategyConf + ". Please check logs for details");
+ return;
+ }
+
+ while (true) {
+ // A copy of the cluster that restricts the strategy to only modify a single topology
+ SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
+ SchedulingResult result = null;
try {
- rasStrategy = (IStrategy) ReflectionUtils.newInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
- } catch (RuntimeException e) {
- LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.",
- td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
- topologySubmitter = cleanup(schedulingState, td);
- topologySubmitter.moveTopoFromPendingToInvalid(td);
- this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
- + td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for details");
- return;
+ result = rasStrategy.schedule(toSchedule, td);
+ } catch (Exception ex) {
+ LOG.error("Exception thrown when running strategy {} to schedule topology {}."
+ + " Topology will not be scheduled!", rasStrategy.getClass().getName(), td.getName(), ex);
+ topologySubmitter.markTopoUnsuccess(td);
+ cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy "
+ + rasStrategy.getClass().getName() + ". Please check logs for details");
+ //result is still null here; falling through would reach the null-result branch below, which
+ //logs a misleading "not valid" warning and calls markTopoUnsuccess a second time.
+ return;
}
- IEvictionStrategy evictionStrategy = null;
- while (true) {
- SchedulingResult result = null;
- try {
- // Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
- // Pass in a copy of scheduling state since the scheduling strategy should not be able to be able to make modifications to
- // the state of cluster directly
- rasStrategy.prepare(new SchedulingState(this.schedulingState));
- result = rasStrategy.schedule(td);
- } catch (Exception ex) {
- LOG.error(String.format("Exception thrown when running strategy %s to schedule topology %s. Topology will not be scheduled!"
- , rasStrategy.getClass().getName(), td.getName()), ex);
- topologySubmitter = cleanup(schedulingState, td);
- topologySubmitter.moveTopoFromPendingToInvalid(td);
- this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
- + rasStrategy.getClass().getName() + ". Please check logs for details");
- }
- LOG.debug("scheduling result: {}", result);
- if (result != null && result.isValid()) {
- if (result.isSuccess()) {
+ LOG.debug("scheduling result: {}", result);
+ if (result != null) {
+ if (result.isSuccess()) {
+ try {
+ cluster.updateFrom(toSchedule);
+ cluster.setStatus(td.getId(), "Running - " + result.getMessage());
+ } catch (Exception ex) {
+ LOG.error("Unsuccessful attempting to assign executors to nodes.", ex);
+ topologySubmitter.markTopoUnsuccess(td);
+ cluster.setStatus(td.getId(), "Unsuccessful in scheduling - "
+ + "IllegalStateException thrown when attempting to assign executors to nodes. Please check"
+ + " log for details.");
+ }
+ return;
+ } else {
+ if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
+ boolean madeSpace = false;
try {
- if (mkAssignment(td, result.getSchedulingResultMap())) {
- topologySubmitter.moveTopoFromPendingToRunning(td);
- this.schedulingState.cluster.setStatus(td.getId(), "Running - " + result.getMessage());
- } else {
- topologySubmitter = this.cleanup(schedulingState, td);
- topologySubmitter.moveTopoFromPendingToAttempted(td);
- this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details");
- }
- } catch (IllegalStateException ex) {
- LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes.", ex);
- topologySubmitter = cleanup(schedulingState, td);
- topologySubmitter.moveTopoFromPendingToAttempted(td);
- this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
+ //ask the eviction strategy to free up resources in the working copy, then retry
+ madeSpace = evictionStrategy.makeSpaceForTopo(td, workingState, userMap);
+ } catch (Exception ex) {
+ LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}."
+ + " No evictions will be done!", evictionStrategy.getClass().getName(),
+ td.getName(), ex);
+ topologySubmitter.markTopoUnsuccess(td);
+ return;
}
- break;
- } else {
- if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
- if (evictionStrategy == null) {
- try {
- evictionStrategy = (IEvictionStrategy) ReflectionUtils.newInstance((String) this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
- } catch (RuntimeException e) {
- LOG.error("failed to create instance of eviction strategy: {} with error: {}! No topology eviction will be done.",
- this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY), e.getMessage());
- topologySubmitter.moveTopoFromPendingToAttempted(td);
- break;
- }
- }
- boolean madeSpace = false;
- try {
- //need to re prepare since scheduling state might have been restored
- evictionStrategy.prepare(this.schedulingState);
- madeSpace = evictionStrategy.makeSpaceForTopo(td);
- } catch (Exception ex) {
- LOG.error(String.format("Exception thrown when running eviction strategy %s to schedule topology %s. No evictions will be done! Error: %s"
- , evictionStrategy.getClass().getName(), td.getName(), ex.getClass().getName()), ex);
- topologySubmitter = cleanup(schedulingState, td);
- topologySubmitter.moveTopoFromPendingToAttempted(td);
- break;
- }
- if (!madeSpace) {
- LOG.debug("Could not make space for topo {} will move to attempted", td);
- topologySubmitter = cleanup(schedulingState, td);
- topologySubmitter.moveTopoFromPendingToAttempted(td);
- this.schedulingState.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
- break;
- }
- continue;
- } else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
- topologySubmitter = cleanup(schedulingState, td);
- topologySubmitter.moveTopoFromPendingToInvalid(td, this.schedulingState.cluster);
- break;
- } else {
- topologySubmitter = cleanup(schedulingState, td);
- topologySubmitter.moveTopoFromPendingToAttempted(td, this.schedulingState.cluster);
- break;
+ if (!madeSpace) {
+ LOG.debug("Could not make space for topo {} will move to attempted", td);
+ topologySubmitter.markTopoUnsuccess(td);
+ cluster.setStatus(td.getId(), "Not enough resources to schedule - "
+ + result.getErrorMessage());
+ return;
}
+ continue;
+ } else {
+ topologySubmitter.markTopoUnsuccess(td, cluster);
+ return;
}
- } else {
- LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", td.getName());
- topologySubmitter = cleanup(schedulingState, td);
- topologySubmitter.moveTopoFromPendingToInvalid(td, this.schedulingState.cluster);
- break;
- }
- }
- } else {
- LOG.warn("Topology {} is already fully scheduled!", td.getName());
- topologySubmitter.moveTopoFromPendingToRunning(td);
- if (this.schedulingState.cluster.getStatusMap().get(td.getId()) == null || this.schedulingState.cluster.getStatusMap().get(td.getId()).equals("")) {
- this.schedulingState.cluster.setStatus(td.getId(), "Fully Scheduled");
- }
- }
- }
-
- private User cleanup(SchedulingState schedulingState, TopologyDetails td) {
- restoreCheckpointSchedulingState(schedulingState);
- //since state is restored need the update User topologySubmitter to the new User object in userMap
- return this.schedulingState.userMap.get(td.getTopologySubmitter());
- }
-
- private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
- if (schedulerAssignmentMap != null) {
- double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
- double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
- double requestedCpu = td.getTotalRequestedCpu();
- double assignedMemOnHeap = 0.0;
- double assignedMemOffHeap = 0.0;
- double assignedCpu = 0.0;
-
- Map<WorkerSlot, Double[]> workerResources = new HashMap<WorkerSlot, Double[]>();
-
- Set<String> nodesUsed = new HashSet<String>();
- for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
- WorkerSlot targetSlot = workerToTasksEntry.getKey();
- Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
- RAS_Node targetNode = this.schedulingState.nodes.getNodeById(targetSlot.getNodeId());
-
- targetSlot = allocateResourceToSlot(td, execsNeedScheduling, targetSlot);
-
- targetNode.assign(targetSlot, td, execsNeedScheduling);
-
- LOG.debug("ASSIGNMENT TOPOLOGY: {} TASKS: {} To Node: {} on Slot: {}",
- td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
-
- for (ExecutorDetails exec : execsNeedScheduling) {
- targetNode.consumeResourcesforTask(exec, td);
- }
- if (!nodesUsed.contains(targetNode.getId())) {
- nodesUsed.add(targetNode.getId());
}
- assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap();
- assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
- assignedCpu += targetSlot.getAllocatedCpu();
-
- Double[] worker_resources = {
- requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
- targetSlot.getAllocatedMemOnHeap(), targetSlot.getAllocatedMemOffHeap(), targetSlot.getAllocatedCpu()};
- workerResources.put (targetSlot, worker_resources);
- }
-
- Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
- assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
- LOG.debug("setTopologyResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
- "assigned on-heap mem, off-heap mem, cpu: {} {} {}",
- td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
- assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
- //updating resources used for a topology
- this.schedulingState.cluster.setTopologyResources(td.getId(), resources);
- this.schedulingState.cluster.setWorkerResources(td.getId(), workerResources);
- return true;
- } else {
- LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName());
- return false;
- }
- }
-
- private WorkerSlot allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
- double onHeapMem = 0.0;
- double offHeapMem = 0.0;
- double cpu = 0.0;
- for (ExecutorDetails exec : executors) {
- Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
- if (onHeapMemForExec != null) {
- onHeapMem += onHeapMemForExec;
- }
- Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
- if (offHeapMemForExec != null) {
- offHeapMem += offHeapMemForExec;
- }
- Double cpuForExec = td.getTotalCpuReqTask(exec);
- if (cpuForExec != null) {
- cpu += cpuForExec;
+ } else {
+ //the strategy itself returned null
+ LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.",
+ td.getName());
+ topologySubmitter.markTopoUnsuccess(td, cluster);
+ return;
}
}
- return new WorkerSlot(slot.getNodeId(), slot.getPort(), onHeapMem, offHeapMem, cpu);
- }
-
- private void updateSupervisorsResources(Cluster cluster, Topologies topologies) {
- Map<String, Double[]> supervisors_resources = new HashMap<String, Double[]>();
- Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
- for (Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
- RAS_Node node = entry.getValue();
- Double totalMem = node.getTotalMemoryResources();
- Double totalCpu = node.getTotalCpuResources();
- Double usedMem = totalMem - node.getAvailableMemoryResources();
- Double usedCpu = totalCpu - node.getAvailableCpuResources();
- Double[] resources = {totalMem, totalCpu, usedMem, usedCpu};
- supervisors_resources.put(entry.getKey(), resources);
- }
- cluster.setSupervisorsResourcesMap(supervisors_resources);
- }
-
- public User getUser(String user) {
- return this.schedulingState.userMap.get(user);
- }
-
- public Map<String, User> getUserMap() {
- return this.schedulingState.userMap;
}
/**
- * Intialize scheduling and running queues
+ * Get User wrappers around cluster.
*
- * @param topologies
- * @param cluster
+ * @param cluster the cluster to get the users out of.
*/
- private Map<String, User> getUsers(Topologies topologies, Cluster cluster) {
- Map<String, User> userMap = new HashMap<String, User>();
+ private Map<String, User> getUsers(Cluster cluster) {
+ Map<String, User> userMap = new HashMap<>();
Map<String, Map<String, Double>> userResourcePools = getUserResourcePools();
LOG.debug("userResourcePools: {}", userResourcePools);
- for (TopologyDetails td : topologies.getTopologies()) {
-
+ for (TopologyDetails td : cluster.getTopologies()) {
String topologySubmitter = td.getTopologySubmitter();
//additional safety check to make sure that topologySubmitter is going to be a valid value
if (topologySubmitter == null || topologySubmitter.equals("")) {
@@ -364,37 +190,23 @@ public class ResourceAwareScheduler implements IScheduler {
if (!userMap.containsKey(topologySubmitter)) {
userMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
}
- if (cluster.getUnassignedExecutors(td).size() > 0) {
- LOG.debug("adding td: {} to pending queue", td.getName());
- userMap.get(topologySubmitter).addTopologyToPendingQueue(td);
- } else {
- LOG.debug("adding td: {} to running queue with existing status: {}", td.getName(), cluster.getStatusMap().get(td.getId()));
- userMap.get(topologySubmitter).addTopologyToRunningQueue(td);
- if (cluster.getStatusMap().get(td.getId()) == null || cluster.getStatusMap().get(td.getId()).equals("")) {
- cluster.setStatus(td.getId(), "Fully Scheduled");
- }
- }
}
return userMap;
}
- private void initialize(Topologies topologies, Cluster cluster) {
- Map<String, User> userMap = getUsers(topologies, cluster);
- this.schedulingState = new SchedulingState(userMap, cluster, topologies, this.conf);
- }
-
/**
- * Get resource guarantee configs
+ * Get resource guarantee configs.
*
* @return a map that contains resource guarantees of every user of the following format
- * {userid->{resourceType->amountGuaranteed}}
+ * {userid->{resourceType->amountGuaranteed}}
*/
private Map<String, Map<String, Double>> getUserResourcePools() {
- Object raw = this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
- Map<String, Map<String, Double>> ret = new HashMap<String, Map<String, Double>>();
+ Map<String, Map<String, Number>> raw =
+ (Map<String, Map<String, Number>>) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+ Map<String, Map<String, Double>> ret = new HashMap<>();
if (raw != null) {
- for (Map.Entry<String, Map<String, Number>> userPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
+ for (Map.Entry<String, Map<String, Number>> userPoolEntry : raw.entrySet()) {
String user = userPoolEntry.getKey();
ret.put(user, new HashMap<String, Double>());
for (Map.Entry<String, Number> resourceEntry : userPoolEntry.getValue().entrySet()) {
@@ -404,7 +216,8 @@ public class ResourceAwareScheduler implements IScheduler {
}
Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
- Map<String, Map<String, Number>> tmp = (Map<String, Map<String, Number>>) fromFile.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+ Map<String, Map<String, Number>> tmp =
+ (Map<String, Map<String, Number>>) fromFile.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
if (tmp != null) {
for (Map.Entry<String, Map<String, Number>> userPoolEntry : tmp.entrySet()) {
String user = userPoolEntry.getKey();
@@ -416,27 +229,4 @@ public class ResourceAwareScheduler implements IScheduler {
}
return ret;
}
-
- private SchedulingState checkpointSchedulingState() {
- LOG.debug("/*********Checkpoint scheduling state************/");
- for (User user : this.schedulingState.userMap.values()) {
- LOG.debug(user.getDetailedInfo());
- }
- LOG.debug(ResourceUtils.printScheduling(this.schedulingState.cluster, this.schedulingState.topologies));
- LOG.debug("nodes:\n{}", this.schedulingState.nodes);
- LOG.debug("/*********End************/");
- return new SchedulingState(this.schedulingState);
- }
-
- private void restoreCheckpointSchedulingState(SchedulingState schedulingState) {
- LOG.debug("/*********restoring scheduling state************/");
- //reseting cluster
- this.schedulingState = schedulingState;
- for (User user : this.schedulingState.userMap.values()) {
- LOG.debug(user.getDetailedInfo());
- }
- LOG.debug(ResourceUtils.printScheduling(this.schedulingState.cluster, this.schedulingState.topologies));
- LOG.debug("nodes:\n{}", this.schedulingState.nodes);
- LOG.debug("/*********End************/");
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
new file mode 100644
index 0000000..a50dced
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ObjectReader;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class);
+
+ public static Map<String, Map<String, Double>> getBoltsResources(StormTopology topology,
+ Map<String, Object> topologyConf) {
+ Map<String, Map<String, Double>> boltResources = new HashMap<>();
+ if (topology.get_bolts() != null) {
+ for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
+ Map<String, Double> topologyResources = parseResources(bolt.getValue().get_common().get_json_conf());
+ checkIntialization(topologyResources, bolt.getValue().toString(), topologyConf);
+ LOG.warn("Turned {} into {}", bolt.getValue().get_common().get_json_conf(), topologyResources);
+ boltResources.put(bolt.getKey(), topologyResources);
+ }
+ }
+ return boltResources;
+ }
+
+ public static Map<String, Map<String, Double>> getSpoutsResources(StormTopology topology,
+ Map<String, Object> topologyConf) {
+ Map<String, Map<String, Double>> spoutResources = new HashMap<>();
+ if (topology.get_spouts() != null) {
+ for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
+ Map<String, Double> topologyResources = parseResources(spout.getValue().get_common().get_json_conf());
+ checkIntialization(topologyResources, spout.getValue().toString(), topologyConf);
+ spoutResources.put(spout.getKey(), topologyResources);
+ }
+ }
+ return spoutResources;
+ }
+
+ public static void checkIntialization(Map<String, Double> topologyResources, String com,
+ Map<String, Object> topologyConf) {
+ checkInitMem(topologyResources, com, topologyConf);
+ checkInitCpu(topologyResources, com, topologyConf);
+ }
+
+ private static void checkInitMem(Map<String, Double> topologyResources, String com,
+ Map<String, Object> topologyConf) {
+ if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+ Double onHeap = ObjectReader.getDouble(
+ topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+ if (onHeap != null) {
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+ debugMessage("ONHEAP", com, topologyConf);
+ }
+ }
+ if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+ Double offHeap = ObjectReader.getDouble(
+ topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+ if (offHeap != null) {
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
+ debugMessage("OFFHEAP", com, topologyConf);
+ }
+ }
+ }
+
+ private static void checkInitCpu(Map<String, Double> topologyResources, String com,
+ Map<String, Object> topologyConf) {
+ if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+ Double cpu = ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
+ if (cpu != null) {
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu);
+ debugMessage("CPU", com, topologyConf);
+ }
+ }
+ }
+
+ public static Map<String, Double> parseResources(String input) {
+ Map<String, Double> topologyResources = new HashMap<>();
+ JSONParser parser = new JSONParser();
+ LOG.debug("Input to parseResources {}", input);
+ try {
+ if (input != null) {
+ Object obj = parser.parse(input);
+ JSONObject jsonObject = (JSONObject) obj;
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+ Double topoMemOnHeap = ObjectReader
+ .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
+ }
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+ Double topoMemOffHeap = ObjectReader
+ .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
+ }
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+ Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
+ null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
+ }
+ LOG.debug("Topology Resources {}", topologyResources);
+ }
+ } catch (ParseException e) {
+ LOG.error("Failed to parse component resources is:" + e.toString(), e);
+ return null;
+ }
+ return topologyResources;
+ }
+
+ private static void debugMessage(String memoryType, String com, Map<String, Object> topologyConf) {
+ if (memoryType.equals("ONHEAP")) {
+ LOG.debug(
+ "Unable to extract resource requirement for Component {}\n"
+ + " Resource : Memory Type : On Heap set to default {}",
+ com, topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB));
+ } else if (memoryType.equals("OFFHEAP")) {
+ LOG.debug(
+ "Unable to extract resource requirement for Component {}\n"
+ + " Resource : Memory Type : Off Heap set to default {}",
+ com, topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB));
+ } else {
+ LOG.debug(
+ "Unable to extract resource requirement for Component {}\n"
+ + " Resource : CPU Pcore Percent set to default {}",
+ com, topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+ }
+ }
+
+ /**
+ * Calculate the sum of a collection of doubles.
+ * @param list collection of doubles
+ * @return the sum of of collection of doubles
+ */
+ public static double sum(Collection<Double> list) {
+ double sum = 0.0;
+ for (Double elem : list) {
+ sum += elem;
+ }
+ return sum;
+ }
+
+ /**
+ * Calculate the average of a collection of doubles.
+ * @param list a collection of doubles
+ * @return the average of collection of doubles
+ */
+ public static double avg(Collection<Double> list) {
+ return sum(list) / list.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java
new file mode 100644
index 0000000..72f083d
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class serves as a mechanism to return results and messages from a scheduling strategy to the Resource Aware
+ * Scheduler.
+ */
+public class SchedulingResult {
+ private static final Logger LOG = LoggerFactory.getLogger(SchedulingResult.class);
+
+ //status of scheduling the topology e.g. success or fail?
+ private final SchedulingStatus status;
+
+ //arbitrary message to be returned when scheduling is done
+ private final String message;
+
+ //error message returned is something went wrong
+ private final String errorMessage;
+
+ private SchedulingResult(SchedulingStatus status, String message, String errorMessage) {
+ this.status = status;
+ this.message = message;
+ this.errorMessage = errorMessage;
+ }
+
+ public static SchedulingResult failure(SchedulingStatus status, String errorMessage) {
+ return new SchedulingResult(status, null, errorMessage);
+ }
+
+ public static SchedulingResult success() {
+ return SchedulingResult.success(null);
+ }
+
+ public static SchedulingResult success(String message) {
+ return new SchedulingResult(SchedulingStatus.SUCCESS, message, null);
+ }
+
+ public SchedulingStatus getStatus() {
+ return this.status;
+ }
+
+ public String getMessage() {
+ return this.message;
+ }
+
+ public String getErrorMessage() {
+ return this.errorMessage;
+ }
+
+ public boolean isSuccess() {
+ return SchedulingStatus.isStatusSuccess(this.status);
+ }
+
+ public boolean isFailure() {
+ return SchedulingStatus.isStatusFailure(this.status);
+ }
+
+ @Override
+ public String toString() {
+ String ret = null;
+ if (isSuccess()) {
+ ret = "Status: " + this.getStatus() + " message: " + this.getMessage();
+ } else {
+ ret = "Status: " + this.getStatus() + " error message: " + this.getErrorMessage();
+ }
+ return ret;
+ }
+}