You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/09/10 16:31:35 UTC
[1/2] storm git commit: STORM-3216: Add in RasBlacklistStrategy
Repository: storm
Updated Branches:
refs/heads/master f2ced23fa -> e6ff07180
STORM-3216: Add in RasBlacklistStrategy
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e3998864
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e3998864
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e3998864
Branch: refs/heads/master
Commit: e39988646d9ed3e96d129c85b65a0ddeb4233910
Parents: 7da6d6a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 7 12:31:47 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Sep 7 16:06:08 2018 -0500
----------------------------------------------------------------------
.../java/org/apache/storm/DaemonConfig.java | 3 +
.../org/apache/storm/scheduler/Cluster.java | 37 +++++++
.../storm/scheduler/ISchedulingState.java | 21 +++-
.../apache/storm/scheduler/TopologyDetails.java | 12 +++
.../strategies/DefaultBlacklistStrategy.java | 86 +++++++++-------
.../strategies/RasBlacklistStrategy.java | 103 +++++++++++++++++++
.../normalization/NormalizedResourceOffer.java | 22 ++++
.../NormalizedResourceRequest.java | 5 +
.../normalization/NormalizedResources.java | 37 +++++++
.../NormalizedResourcesWithMemory.java | 4 +
.../scheduling/BaseResourceAwareStrategy.java | 2 +-
11 files changed, 295 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index d9869d4..9eaf6f6 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -122,6 +122,9 @@ public class DaemonConfig implements Validated {
/**
* The class that specifies the eviction strategy to use in blacklist scheduler.
+ * If you are using the RAS scheduler please set this to
+ * "org.apache.storm.scheduler.blacklist.strategies.RasBlacklistStrategy" or you may
+ * get odd behavior when the cluster is full and there are blacklisted nodes.
*/
@NotNull
@isImplementationOfClass(implementsClass = IBlacklistStrategy.class)
http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index d014236..e8d771d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -375,6 +375,18 @@ public class Cluster implements ISchedulingState {
}
@Override
+ public List<WorkerSlot> getNonBlacklistedAvailableSlots(List<String> blacklistedSupervisorIds) {
+ List<WorkerSlot> slots = new ArrayList<>();
+ for (SupervisorDetails supervisor : this.supervisors.values()) {
+ if (!isBlackListed(supervisor.getId()) && !blacklistedSupervisorIds.contains(supervisor.getId())) {
+ slots.addAll(getAvailableSlots(supervisor));
+ }
+ }
+
+ return slots;
+ }
+
+ @Override
public List<WorkerSlot> getAvailableSlots() {
List<WorkerSlot> slots = new ArrayList<>();
for (SupervisorDetails supervisor : this.supervisors.values()) {
@@ -448,6 +460,19 @@ public class Cluster implements ISchedulingState {
return slots.size();
}
+ @Override
+ public NormalizedResourceOffer getAvailableResources(SupervisorDetails sd) {
+ NormalizedResourceOffer ret = new NormalizedResourceOffer(sd.getTotalResources());
+ for (SchedulerAssignment assignment: assignments.values()) {
+ for (Entry<WorkerSlot, WorkerResources> entry: assignment.getScheduledResources().entrySet()) {
+ if (sd.getId().equals(entry.getKey().getNodeId())) {
+ ret.remove(entry.getValue());
+ }
+ }
+ }
+ return ret;
+ }
+
private void addResource(Map<String, Double> resourceMap, String resourceName, Double valueToBeAdded) {
if (!resourceMap.containsKey(resourceName)) {
resourceMap.put(resourceName, 0.0);
@@ -782,6 +807,18 @@ public class Cluster implements ISchedulingState {
}
@Override
+ public NormalizedResourceOffer getNonBlacklistedClusterAvailableResources(Collection<String> blacklistedSupervisorIds) {
+ NormalizedResourceOffer available = new NormalizedResourceOffer();
+ for (SupervisorDetails sup : supervisors.values()) {
+ if (!isBlackListed(sup.getId()) && !blacklistedSupervisorIds.contains(sup.getId())) {
+ available.add(sup.getTotalResources());
+ available.remove(getAllScheduledResourcesForNode(sup.getId()));
+ }
+ }
+ return available;
+ }
+
+ @Override
public double getClusterTotalCpuResource() {
double sum = 0.0;
for (SupervisorDetails sup : supervisors.values()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index d96109c..41a00b0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -154,6 +154,12 @@ public interface ISchedulingState {
List<WorkerSlot> getAvailableSlots();
/**
+ * Get all the available worker slots in the cluster, that are not blacklisted.
+ * @param blacklistedSupervisorIds list of supervisor ids that should also be considered blacklisted.
+ */
+ List<WorkerSlot> getNonBlacklistedAvailableSlots(List<String> blacklistedSupervisorIds);
+
+ /**
* Return all non-blacklisted slots on this supervisor.
*
* @param supervisor the supervisor
@@ -188,6 +194,13 @@ public interface ISchedulingState {
int getAssignedNumWorkers(TopologyDetails topology);
/**
+ * Get the resources on the supervisor that are available to be scheduled.
+ * @param sd the supervisor.
+ * @return the resources available to be scheduled.
+ */
+ NormalizedResourceOffer getAvailableResources(SupervisorDetails sd);
+
+ /**
* Would scheduling exec on ws fit? With a heap <= maxHeap total memory added <= memoryAvailable and cpu added <= cpuAvailable.
*
* @param ws the slot to put it in
@@ -239,10 +252,16 @@ public interface ISchedulingState {
/**
* Get all scheduled resources for node.
- **/
+ */
NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId);
/**
+ * Get the resources in the cluster that are available for scheduling.
+ * @param blacklistedSupervisorIds other ids that are tentatively blacklisted.
+ */
+ NormalizedResourceOffer getNonBlacklistedClusterAvailableResources(Collection<String> blacklistedSupervisorIds);
+
+ /**
* Get the total amount of CPU resources in cluster.
*/
double getClusterTotalCpuResource();
http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index 6fd7d38..5d781aa 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -340,6 +340,18 @@ public class TopologyDetails {
}
/**
+ * Get an approximate total resources needed for this topology.
+ * @return the approximate total resources needed for this topology.
+ */
+ public NormalizedResourceRequest getApproximateTotalResources() {
+ NormalizedResourceRequest ret = new NormalizedResourceRequest();
+ for (NormalizedResourceRequest resources : resourceList.values()) {
+ ret.add(resources);
+ }
+ return ret;
+ }
+
+ /**
* Get the total CPU requirement for executor.
*
* @param exec
http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index 3332b2d..a0c5b6c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -12,6 +12,7 @@
package org.apache.storm.scheduler.blacklist.strategies;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -30,11 +31,14 @@ import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * The default strategy used for blacklisting hosts.
+ */
public class DefaultBlacklistStrategy implements IBlacklistStrategy {
public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
- private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
private IReporter reporter;
private int toleranceCount;
@@ -59,7 +63,7 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
@Override
public Set<String> getBlacklist(List<Map<String, Set<Integer>>> supervisorsWithFailures, Cluster cluster, Topologies topologies) {
- Map<String, Integer> countMap = new HashMap<String, Integer>();
+ Map<String, Integer> countMap = new HashMap<>();
for (Map<String, Set<Integer>> item : supervisorsWithFailures) {
Set<String> supervisors = item.keySet();
@@ -68,25 +72,32 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
countMap.put(supervisor, supervisorCount + 1);
}
}
+
for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
String supervisor = entry.getKey();
int count = entry.getValue();
if (count >= toleranceCount) {
if (!blacklist.containsKey(supervisor)) { // if not in blacklist then add it and set the resume time according to config
- LOG.debug("add supervisor {} to blacklist", supervisor);
+ LOG.debug("Added supervisor {} to blacklist", supervisor);
LOG.debug("supervisorsWithFailures : {}", supervisorsWithFailures);
reporter.reportBlacklist(supervisor, supervisorsWithFailures);
blacklist.put(supervisor, resumeTime / nimbusMonitorFreqSecs);
}
}
}
- releaseBlacklistWhenNeeded(cluster, topologies);
+ Set<String> toRelease = releaseBlacklistWhenNeeded(cluster, new ArrayList<>(blacklist.keySet()));
+ if (toRelease != null) {
+ LOG.debug("Releasing {} nodes because of low resources", toRelease.size());
+ for (String key: toRelease) {
+ blacklist.remove(key);
+ }
+ }
return blacklist.keySet();
}
@Override
public void resumeFromBlacklist() {
- Set<String> readyToRemove = new HashSet<String>();
+ Set<String> readyToRemove = new HashSet<>();
for (Map.Entry<String, Integer> entry : blacklist.entrySet()) {
String key = entry.getKey();
int value = entry.getValue() - 1;
@@ -102,48 +113,53 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
}
}
- private void releaseBlacklistWhenNeeded(Cluster cluster, Topologies topologies) {
- if (blacklist.size() > 0) {
- int totalNeedNumWorkers = 0;
- List<TopologyDetails> needSchedulingTopologies = cluster.needsSchedulingTopologies();
- for (TopologyDetails topologyDetails : needSchedulingTopologies) {
- int numWorkers = topologyDetails.getNumWorkers();
- int assignedNumWorkers = cluster.getAssignedNumWorkers(topologyDetails);
- int unAssignedNumWorkers = numWorkers - assignedNumWorkers;
- totalNeedNumWorkers += unAssignedNumWorkers;
+ /**
+ * Decide when/if to release blacklisted hosts.
+ * @param cluster the current state of the cluster.
+ * @param blacklistedNodeIds the current set of blacklisted node ids sorted by earliest
+ * @return the set of nodes to be released.
+ */
+ protected Set<String> releaseBlacklistWhenNeeded(Cluster cluster, final List<String> blacklistedNodeIds) {
+ Set<String> readyToRemove = new HashSet<>();
+ if (blacklistedNodeIds.size() > 0) {
+ int availableSlots = cluster.getNonBlacklistedAvailableSlots(blacklistedNodeIds).size();
+ int neededSlots = 0;
+
+ for (TopologyDetails td : cluster.needsSchedulingTopologies()) {
+ int slots = td.getNumWorkers();
+ int assignedSlots = cluster.getAssignedNumWorkers(td);
+ int tdSlotsNeeded = slots - assignedSlots;
+ neededSlots += tdSlotsNeeded;
}
+
+ //Now we need to free up some resources...
Map<String, SupervisorDetails> availableSupervisors = cluster.getSupervisors();
- List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
- int availableSlotsNotInBlacklistCount = 0;
- for (WorkerSlot slot : availableSlots) {
- if (!blacklist.containsKey(slot.getNodeId())) {
- availableSlotsNotInBlacklistCount += 1;
- }
- }
- int shortage = totalNeedNumWorkers - availableSlotsNotInBlacklistCount;
+ int shortageSlots = neededSlots - availableSlots;
+ LOG.debug("Need {} slots.", neededSlots);
+ LOG.debug("Available {} slots.", availableSlots);
+ LOG.debug("Shortage {} slots.", shortageSlots);
- if (shortage > 0) {
- LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{}, num blacklist :{}, " +
- "will release some blacklist.", totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size());
+ if (shortageSlots > 0) {
+ LOG.info("Need {} slots more. Releasing some blacklisted nodes to cover it.", shortageSlots);
//release earliest blacklist
- Set<String> readyToRemove = new HashSet<>();
- for (String supervisor : blacklist.keySet()) { //blacklist is treeMap sorted by value, minimum value means earliest
- if (availableSupervisors.containsKey(supervisor)) {
- Set<Integer> ports = cluster.getAvailablePorts(availableSupervisors.get(supervisor));
+ for (String supervisor : blacklistedNodeIds) {
+ SupervisorDetails sd = availableSupervisors.get(supervisor);
+ if (sd != null) {
+ int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
readyToRemove.add(supervisor);
- shortage -= ports.size();
- if (shortage <= 0) { //released enough supervisor
+ shortageSlots -= sdAvailableSlots;
+ LOG.debug("Releasing {} with {} slots leaving {} slots to go", supervisor,
+ sdAvailableSlots, shortageSlots);
+ if (shortageSlots <= 0) {
+ // we have enough resources now...
break;
}
}
}
- for (String key : readyToRemove) {
- blacklist.remove(key);
- LOG.info("release supervisor {} for shortage of worker slots.", key);
- }
}
}
+ return readyToRemove;
}
private Object initializeInstance(String className, String representation) {
http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
new file mode 100644
index 0000000..f7abc04
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.blacklist.strategies;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesWithMemory;
+import org.apache.storm.utils.ServerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Blacklisting strategy just like the default one, but specifically setup for use with the resource aware scheduler.
+ */
+public class RasBlacklistStrategy extends DefaultBlacklistStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(RasBlacklistStrategy.class);
+
+ @Override
+ protected Set<String> releaseBlacklistWhenNeeded(Cluster cluster, final List<String> blacklistedNodeIds) {
+ LOG.info("RAS We have {} nodes blacklisted...", blacklistedNodeIds.size());
+ Set<String> readyToRemove = new HashSet<>();
+ if (blacklistedNodeIds.size() > 0) {
+ int availableSlots = cluster.getNonBlacklistedAvailableSlots(blacklistedNodeIds).size();
+ int neededSlots = 0;
+ NormalizedResourceOffer available = cluster.getNonBlacklistedClusterAvailableResources(blacklistedNodeIds);
+ NormalizedResourceOffer needed = new NormalizedResourceOffer();
+
+ for (TopologyDetails td : cluster.getTopologies()) {
+ if (cluster.needsSchedulingRas(td)) {
+ int slots = 0;
+ try {
+ slots = ServerUtils.getEstimatedWorkerCountForRASTopo(td.getConf(), td.getTopology());
+ } catch (InvalidTopologyException e) {
+ LOG.warn("Could not guess the number of slots needed for {}", td.getName(), e);
+ }
+ int assignedSlots = cluster.getAssignedNumWorkers(td);
+ int tdSlotsNeeded = slots - assignedSlots;
+ neededSlots += tdSlotsNeeded;
+
+ NormalizedResourceRequest resources = td.getApproximateTotalResources();
+ needed.add(resources);
+
+ LOG.warn("{} needs to be scheduled with {} and {} slots", td.getName(), resources, tdSlotsNeeded);
+ }
+ }
+
+ //Now we need to free up some resources...
+ Map<String, SupervisorDetails> availableSupervisors = cluster.getSupervisors();
+ NormalizedResourceOffer shortage = new NormalizedResourceOffer(needed);
+ shortage.remove(available);
+ int shortageSlots = neededSlots - availableSlots;
+ LOG.debug("Need {} and {} slots.", needed, neededSlots);
+ LOG.debug("Available {} and {} slots.", available, availableSlots);
+ LOG.debug("Shortage {} and {} slots.", shortage, shortageSlots);
+
+ if (shortage.areAnyOverZero() || shortageSlots > 0) {
+ LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots);
+ //release earliest blacklist
+ for (String supervisor : blacklistedNodeIds) {
+ SupervisorDetails sd = availableSupervisors.get(supervisor);
+ if (sd != null) {
+ NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
+ int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+ readyToRemove.add(supervisor);
+ shortage.remove(sdAvailable);
+ shortageSlots -= sdAvailableSlots;
+ LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
+ sdAvailable, sdAvailableSlots, shortage, shortageSlots);
+ if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
+ // we have enough resources now...
+ break;
+ }
+ }
+ }
+ }
+ }
+ return readyToRemove;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
index 2c1f5be..c680be7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -20,6 +20,7 @@ package org.apache.storm.scheduler.resource.normalization;
import java.util.Map;
import org.apache.storm.Constants;
+import org.apache.storm.generated.WorkerResources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,6 +98,22 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
}
/**
+ * Remove the resources in other from this.
+ * @param other the resources to be removed.
+ * @return true if one or more resources in other were larger than available resources in this, else false.
+ */
+ public boolean remove(WorkerResources other) {
+ boolean negativeResources = normalizedResources.remove(other);
+ totalMemoryMb -= (other.get_mem_off_heap() + other.get_mem_on_heap());
+ if (totalMemoryMb < 0.0) {
+ negativeResources = true;
+ NormalizedResources.numNegativeResourceEvents.mark();
+ totalMemoryMb = 0.0;
+ }
+ return negativeResources;
+ }
+
+ /**
* Calculate the average percentage used.
* @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
* double, double)
@@ -153,4 +170,9 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
this.totalMemoryMb = 0.0;
this.normalizedResources.clear();
}
+
+ @Override
+ public boolean areAnyOverZero() {
+ return totalMemoryMb > 0 || normalizedResources.areAnyOverZero();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
index 63a8a56..14c6846 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -242,4 +242,9 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
offHeap = 0.0;
onHeap = 0.0;
}
+
+ @Override
+ public boolean areAnyOverZero() {
+ return onHeap > 0 || offHeap > 0 || normalizedResources.areAnyOverZero();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index 4a06999..1e6af6f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -153,6 +153,30 @@ public class NormalizedResources {
return ret;
}
+ /**
+ * Remove the resources of a worker from this.
+ *
+ * @param value the worker resources that should be removed from this.
+ */
+ public boolean remove(WorkerResources value) {
+ Map<String, Double> workerNormalizedResources = value.get_resources();
+ cpu -= workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+ return remove(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources)) || cpu < 0;
+ }
+
+ private boolean remove(double[] resourceArray) {
+ boolean ret = false;
+ int otherLength = resourceArray.length;
+ zeroPadOtherResourcesIfNecessary(otherLength);
+ for (int i = 0; i < otherLength; i++) {
+ otherResources[i] -= resourceArray[i];
+ if (otherResources[i] < 0) {
+ ret = true;
+ }
+ }
+ return ret;
+ }
+
@Override
public String toString() {
return "Normalized resources: " + toNormalizedMap();
@@ -366,4 +390,17 @@ public class NormalizedResources {
otherResources[i] = 0.0;
}
}
+
+ /**
+ * Are any of the resources positive.
+ * @return true of any of the resources are positive. False if they are all <= 0.
+ */
+ public boolean areAnyOverZero() {
+ for (int i = 0; i < otherResources.length; i++) {
+ if (otherResources[i] > 0) {
+ return true;
+ }
+ }
+ return cpu > 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
index aeb0737..2c910ba 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
@@ -30,4 +30,8 @@ public interface NormalizedResourcesWithMemory {
*/
void clear();
+ /**
+ * Return true if any of the resources are > 0.
+ */
+ boolean areAnyOverZero();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 7d22336..1132432 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -603,7 +603,7 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
* @return the ids n that node.
*/
public List<RAS_Node> hostnameToNodes(String hostname) {
- return hostnameToNodes.get(hostname);
+ return hostnameToNodes.getOrDefault(hostname, Collections.emptyList());
}
/**
[2/2] storm git commit: Merge branch 'STORM-3216' of
https://github.com/revans2/incubator-storm into STORM-3216
Posted by bo...@apache.org.
Merge branch 'STORM-3216' of https://github.com/revans2/incubator-storm into STORM-3216
STORM-3216: Add in RasBlacklistStrategy
This closes #2821
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e6ff0718
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e6ff0718
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e6ff0718
Branch: refs/heads/master
Commit: e6ff07180a9fb209c56bf38dc2eaf68f09a572a6
Parents: f2ced23 e399886
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon Sep 10 11:07:48 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 10 11:07:48 2018 -0500
----------------------------------------------------------------------
.../java/org/apache/storm/DaemonConfig.java | 3 +
.../org/apache/storm/scheduler/Cluster.java | 37 +++++++
.../storm/scheduler/ISchedulingState.java | 21 +++-
.../apache/storm/scheduler/TopologyDetails.java | 12 +++
.../strategies/DefaultBlacklistStrategy.java | 86 +++++++++-------
.../strategies/RasBlacklistStrategy.java | 103 +++++++++++++++++++
.../normalization/NormalizedResourceOffer.java | 22 ++++
.../NormalizedResourceRequest.java | 5 +
.../normalization/NormalizedResources.java | 37 +++++++
.../NormalizedResourcesWithMemory.java | 4 +
.../scheduling/BaseResourceAwareStrategy.java | 2 +-
11 files changed, 295 insertions(+), 37 deletions(-)
----------------------------------------------------------------------