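You are viewing a plain-text version of this content. The canonical version is in the commits@storm.apache.org mailing-list archive; the original hyperlink to it was not preserved in this copy.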
Posted to commits@storm.apache.org by bo...@apache.org on 2018/09/10 16:31:35 UTC

[1/2] storm git commit: STORM-3216: Add in RasBlacklistStrategy

Repository: storm
Updated Branches:
  refs/heads/master f2ced23fa -> e6ff07180


STORM-3216: Add in RasBlacklistStrategy


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e3998864
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e3998864
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e3998864

Branch: refs/heads/master
Commit: e39988646d9ed3e96d129c85b65a0ddeb4233910
Parents: 7da6d6a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 7 12:31:47 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Sep 7 16:06:08 2018 -0500

----------------------------------------------------------------------
 .../java/org/apache/storm/DaemonConfig.java     |   3 +
 .../org/apache/storm/scheduler/Cluster.java     |  37 +++++++
 .../storm/scheduler/ISchedulingState.java       |  21 +++-
 .../apache/storm/scheduler/TopologyDetails.java |  12 +++
 .../strategies/DefaultBlacklistStrategy.java    |  86 +++++++++-------
 .../strategies/RasBlacklistStrategy.java        | 103 +++++++++++++++++++
 .../normalization/NormalizedResourceOffer.java  |  22 ++++
 .../NormalizedResourceRequest.java              |   5 +
 .../normalization/NormalizedResources.java      |  37 +++++++
 .../NormalizedResourcesWithMemory.java          |   4 +
 .../scheduling/BaseResourceAwareStrategy.java   |   2 +-
 11 files changed, 295 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index d9869d4..9eaf6f6 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -122,6 +122,9 @@ public class DaemonConfig implements Validated {
 
     /**
      * The class that specifies the eviction strategy to use in blacklist scheduler.
+     * If you are using the RAS scheduler please set this to
+     * "org.apache.storm.scheduler.blacklist.strategies.RasBlacklistStrategy" or you may
+     * get odd behavior when the cluster is full and there are blacklisted nodes.
      */
     @NotNull
     @isImplementationOfClass(implementsClass = IBlacklistStrategy.class)

http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index d014236..e8d771d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -375,6 +375,18 @@ public class Cluster implements ISchedulingState {
     }
 
     @Override
+    public List<WorkerSlot> getNonBlacklistedAvailableSlots(List<String> blacklistedSupervisorIds) {
+        List<WorkerSlot> slots = new ArrayList<>();
+        for (SupervisorDetails supervisor : this.supervisors.values()) {
+            if (!isBlackListed(supervisor.getId()) && !blacklistedSupervisorIds.contains(supervisor.getId())) {
+                slots.addAll(getAvailableSlots(supervisor));
+            }
+        }
+
+        return slots;
+    }
+
+    @Override
     public List<WorkerSlot> getAvailableSlots() {
         List<WorkerSlot> slots = new ArrayList<>();
         for (SupervisorDetails supervisor : this.supervisors.values()) {
@@ -448,6 +460,19 @@ public class Cluster implements ISchedulingState {
         return slots.size();
     }
 
+    @Override
+    public NormalizedResourceOffer getAvailableResources(SupervisorDetails sd) {
+        NormalizedResourceOffer ret = new NormalizedResourceOffer(sd.getTotalResources());
+        for (SchedulerAssignment assignment: assignments.values()) {
+            for (Entry<WorkerSlot, WorkerResources> entry: assignment.getScheduledResources().entrySet()) {
+                if (sd.getId().equals(entry.getKey().getNodeId())) {
+                   ret.remove(entry.getValue());
+                }
+            }
+        }
+        return ret;
+    }
+
     private void addResource(Map<String, Double> resourceMap, String resourceName, Double valueToBeAdded) {
         if (!resourceMap.containsKey(resourceName)) {
             resourceMap.put(resourceName, 0.0);
@@ -782,6 +807,18 @@ public class Cluster implements ISchedulingState {
     }
 
     @Override
+    public NormalizedResourceOffer getNonBlacklistedClusterAvailableResources(Collection<String> blacklistedSupervisorIds) {
+        NormalizedResourceOffer available = new NormalizedResourceOffer();
+        for (SupervisorDetails sup : supervisors.values()) {
+            if (!isBlackListed(sup.getId()) && !blacklistedSupervisorIds.contains(sup.getId())) {
+                available.add(sup.getTotalResources());
+                available.remove(getAllScheduledResourcesForNode(sup.getId()));
+            }
+        }
+        return available;
+    }
+
+    @Override
     public double getClusterTotalCpuResource() {
         double sum = 0.0;
         for (SupervisorDetails sup : supervisors.values()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index d96109c..41a00b0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -154,6 +154,12 @@ public interface ISchedulingState {
     List<WorkerSlot> getAvailableSlots();
 
     /**
+     * Get all the available worker slots in the cluster, that are not blacklisted.
+     * @param blacklistedSupervisorIds list of supervisor ids that should also be considered blacklisted.
+     */
+    List<WorkerSlot> getNonBlacklistedAvailableSlots(List<String> blacklistedSupervisorIds);
+
+    /**
      * Return all non-blacklisted slots on this supervisor.
      *
      * @param supervisor the supervisor
@@ -188,6 +194,13 @@ public interface ISchedulingState {
     int getAssignedNumWorkers(TopologyDetails topology);
 
     /**
+     * Get the resources on the supervisor that are available to be scheduled.
+     * @param sd the supervisor.
+     * @return the resources available to be scheduled.
+     */
+    NormalizedResourceOffer getAvailableResources(SupervisorDetails sd);
+
+    /**
      * Would scheduling exec on ws fit? With a heap <= maxHeap total memory added <= memoryAvailable and cpu added <= cpuAvailable.
      *
      * @param ws                 the slot to put it in
@@ -239,10 +252,16 @@ public interface ISchedulingState {
 
     /**
      * Get all scheduled resources for node.
-     **/
+     */
     NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId);
 
     /**
+     * Get the resources in the cluster that are available for scheduling.
+     * @param blacklistedSupervisorIds other ids that are tentatively blacklisted.
+     */
+    NormalizedResourceOffer getNonBlacklistedClusterAvailableResources(Collection<String> blacklistedSupervisorIds);
+
+    /**
      * Get the total amount of CPU resources in cluster.
      */
     double getClusterTotalCpuResource();

http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index 6fd7d38..5d781aa 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -340,6 +340,18 @@ public class TopologyDetails {
     }
 
     /**
+     * Get an approximate total resources needed for this topology.
+     * @return the approximate total resources needed for this topology.
+     */
+    public NormalizedResourceRequest getApproximateTotalResources() {
+        NormalizedResourceRequest ret = new NormalizedResourceRequest();
+        for (NormalizedResourceRequest resources : resourceList.values()) {
+            ret.add(resources);
+        }
+        return ret;
+    }
+
+    /**
      * Get the total CPU requirement for executor.
      *
      * @param exec

http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index 3332b2d..a0c5b6c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.scheduler.blacklist.strategies;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -30,11 +31,14 @@ import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * The default strategy used for blacklisting hosts.
+ */
 public class DefaultBlacklistStrategy implements IBlacklistStrategy {
 
     public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
     public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
-    private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
     private IReporter reporter;
 
     private int toleranceCount;
@@ -59,7 +63,7 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
 
     @Override
     public Set<String> getBlacklist(List<Map<String, Set<Integer>>> supervisorsWithFailures, Cluster cluster, Topologies topologies) {
-        Map<String, Integer> countMap = new HashMap<String, Integer>();
+        Map<String, Integer> countMap = new HashMap<>();
 
         for (Map<String, Set<Integer>> item : supervisorsWithFailures) {
             Set<String> supervisors = item.keySet();
@@ -68,25 +72,32 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
                 countMap.put(supervisor, supervisorCount + 1);
             }
         }
+
         for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
             String supervisor = entry.getKey();
             int count = entry.getValue();
             if (count >= toleranceCount) {
                 if (!blacklist.containsKey(supervisor)) { // if not in blacklist then add it and set the resume time according to config
-                    LOG.debug("add supervisor {} to blacklist", supervisor);
+                    LOG.debug("Added supervisor {} to blacklist", supervisor);
                     LOG.debug("supervisorsWithFailures : {}", supervisorsWithFailures);
                     reporter.reportBlacklist(supervisor, supervisorsWithFailures);
                     blacklist.put(supervisor, resumeTime / nimbusMonitorFreqSecs);
                 }
             }
         }
-        releaseBlacklistWhenNeeded(cluster, topologies);
+        Set<String> toRelease = releaseBlacklistWhenNeeded(cluster, new ArrayList<>(blacklist.keySet()));
+        if (toRelease != null) {
+            LOG.debug("Releasing {} nodes because of low resources", toRelease.size());
+            for (String key: toRelease) {
+                blacklist.remove(key);
+            }
+        }
         return blacklist.keySet();
     }
 
     @Override
     public void resumeFromBlacklist() {
-        Set<String> readyToRemove = new HashSet<String>();
+        Set<String> readyToRemove = new HashSet<>();
         for (Map.Entry<String, Integer> entry : blacklist.entrySet()) {
             String key = entry.getKey();
             int value = entry.getValue() - 1;
@@ -102,48 +113,53 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
         }
     }
 
-    private void releaseBlacklistWhenNeeded(Cluster cluster, Topologies topologies) {
-        if (blacklist.size() > 0) {
-            int totalNeedNumWorkers = 0;
-            List<TopologyDetails> needSchedulingTopologies = cluster.needsSchedulingTopologies();
-            for (TopologyDetails topologyDetails : needSchedulingTopologies) {
-                int numWorkers = topologyDetails.getNumWorkers();
-                int assignedNumWorkers = cluster.getAssignedNumWorkers(topologyDetails);
-                int unAssignedNumWorkers = numWorkers - assignedNumWorkers;
-                totalNeedNumWorkers += unAssignedNumWorkers;
+    /**
+     * Decide when/if to release blacklisted hosts.
+     * @param cluster the current state of the cluster.
+     * @param blacklistedNodeIds the current set of blacklisted node ids sorted by earliest
+     * @return the set of nodes to be released.
+     */
+    protected Set<String> releaseBlacklistWhenNeeded(Cluster cluster, final List<String> blacklistedNodeIds) {
+        Set<String> readyToRemove = new HashSet<>();
+        if (blacklistedNodeIds.size() > 0) {
+            int availableSlots = cluster.getNonBlacklistedAvailableSlots(blacklistedNodeIds).size();
+            int neededSlots = 0;
+
+            for (TopologyDetails td : cluster.needsSchedulingTopologies()) {
+                int slots = td.getNumWorkers();
+                int assignedSlots = cluster.getAssignedNumWorkers(td);
+                int tdSlotsNeeded = slots - assignedSlots;
+                neededSlots += tdSlotsNeeded;
             }
+
+            //Now we need to free up some resources...
             Map<String, SupervisorDetails> availableSupervisors = cluster.getSupervisors();
-            List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
-            int availableSlotsNotInBlacklistCount = 0;
-            for (WorkerSlot slot : availableSlots) {
-                if (!blacklist.containsKey(slot.getNodeId())) {
-                    availableSlotsNotInBlacklistCount += 1;
-                }
-            }
-            int shortage = totalNeedNumWorkers - availableSlotsNotInBlacklistCount;
+            int shortageSlots = neededSlots - availableSlots;
+            LOG.debug("Need {} slots.", neededSlots);
+            LOG.debug("Available {} slots.", availableSlots);
+            LOG.debug("Shortage {} slots.", shortageSlots);
 
-            if (shortage > 0) {
-                LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{}, num blacklist :{}, " +
-                         "will release some blacklist.", totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size());
+            if (shortageSlots > 0) {
+                LOG.info("Need {} slots more. Releasing some blacklisted nodes to cover it.", shortageSlots);
 
                 //release earliest blacklist
-                Set<String> readyToRemove = new HashSet<>();
-                for (String supervisor : blacklist.keySet()) { //blacklist is treeMap sorted by value, minimum value means earliest
-                    if (availableSupervisors.containsKey(supervisor)) {
-                        Set<Integer> ports = cluster.getAvailablePorts(availableSupervisors.get(supervisor));
+                for (String supervisor : blacklistedNodeIds) {
+                    SupervisorDetails sd = availableSupervisors.get(supervisor);
+                    if (sd != null) {
+                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
                         readyToRemove.add(supervisor);
-                        shortage -= ports.size();
-                        if (shortage <= 0) { //released enough supervisor
+                        shortageSlots -= sdAvailableSlots;
+                        LOG.debug("Releasing {} with {} slots leaving {} slots to go", supervisor,
+                            sdAvailableSlots, shortageSlots);
+                        if (shortageSlots <= 0) {
+                            // we have enough resources now...
                             break;
                         }
                     }
                 }
-                for (String key : readyToRemove) {
-                    blacklist.remove(key);
-                    LOG.info("release supervisor {} for shortage of worker slots.", key);
-                }
             }
         }
+        return readyToRemove;
     }
 
     private Object initializeInstance(String className, String representation) {

http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
new file mode 100644
index 0000000..f7abc04
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.blacklist.strategies;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesWithMemory;
+import org.apache.storm.utils.ServerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Blacklisting strategy just like the default one, but specifically setup for use with the resource aware scheduler.
+ */
+public class RasBlacklistStrategy extends DefaultBlacklistStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(RasBlacklistStrategy.class);
+
+    @Override
+    protected Set<String> releaseBlacklistWhenNeeded(Cluster cluster, final List<String> blacklistedNodeIds) {
+        LOG.info("RAS We have {} nodes blacklisted...", blacklistedNodeIds.size());
+        Set<String> readyToRemove = new HashSet<>();
+        if (blacklistedNodeIds.size() > 0) {
+            int availableSlots = cluster.getNonBlacklistedAvailableSlots(blacklistedNodeIds).size();
+            int neededSlots = 0;
+            NormalizedResourceOffer available = cluster.getNonBlacklistedClusterAvailableResources(blacklistedNodeIds);
+            NormalizedResourceOffer needed = new NormalizedResourceOffer();
+
+            for (TopologyDetails td : cluster.getTopologies()) {
+                if (cluster.needsSchedulingRas(td)) {
+                    int slots = 0;
+                    try {
+                        slots = ServerUtils.getEstimatedWorkerCountForRASTopo(td.getConf(), td.getTopology());
+                    } catch (InvalidTopologyException e) {
+                        LOG.warn("Could not guess the number of slots needed for {}", td.getName(), e);
+                    }
+                    int assignedSlots = cluster.getAssignedNumWorkers(td);
+                    int tdSlotsNeeded = slots - assignedSlots;
+                    neededSlots += tdSlotsNeeded;
+
+                    NormalizedResourceRequest resources = td.getApproximateTotalResources();
+                    needed.add(resources);
+
+                    LOG.warn("{} needs to be scheduled with {} and {} slots", td.getName(), resources, tdSlotsNeeded);
+                }
+            }
+
+            //Now we need to free up some resources...
+            Map<String, SupervisorDetails> availableSupervisors = cluster.getSupervisors();
+            NormalizedResourceOffer shortage = new NormalizedResourceOffer(needed);
+            shortage.remove(available);
+            int shortageSlots = neededSlots - availableSlots;
+            LOG.debug("Need {} and {} slots.", needed, neededSlots);
+            LOG.debug("Available {} and {} slots.", available, availableSlots);
+            LOG.debug("Shortage {} and {} slots.", shortage, shortageSlots);
+
+            if (shortage.areAnyOverZero() || shortageSlots > 0) {
+                LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots);
+                //release earliest blacklist
+                for (String supervisor : blacklistedNodeIds) {
+                    SupervisorDetails sd = availableSupervisors.get(supervisor);
+                    if (sd != null) {
+                        NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
+                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+                        readyToRemove.add(supervisor);
+                        shortage.remove(sdAvailable);
+                        shortageSlots -= sdAvailableSlots;
+                        LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
+                            sdAvailable, sdAvailableSlots, shortage, shortageSlots);
+                        if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
+                            // we have enough resources now...
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        return readyToRemove;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
index 2c1f5be..c680be7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -20,6 +20,7 @@ package org.apache.storm.scheduler.resource.normalization;
 
 import java.util.Map;
 import org.apache.storm.Constants;
+import org.apache.storm.generated.WorkerResources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,6 +98,22 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
     }
 
     /**
+     * Remove the resources in other from this.
+     * @param other the resources to be removed.
+     * @return true if one or more resources in other were larger than available resources in this, else false.
+     */
+    public boolean remove(WorkerResources other) {
+        boolean negativeResources = normalizedResources.remove(other);
+        totalMemoryMb -= (other.get_mem_off_heap() + other.get_mem_on_heap());
+        if (totalMemoryMb < 0.0) {
+            negativeResources = true;
+            NormalizedResources.numNegativeResourceEvents.mark();
+            totalMemoryMb = 0.0;
+        }
+        return negativeResources;
+    }
+
+    /**
      * Calculate the average percentage used.
      * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
      *     double, double)
@@ -153,4 +170,9 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
         this.totalMemoryMb = 0.0;
         this.normalizedResources.clear();
     }
+
+    @Override
+    public boolean areAnyOverZero() {
+        return totalMemoryMb > 0 || normalizedResources.areAnyOverZero();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
index 63a8a56..14c6846 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -242,4 +242,9 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
         offHeap = 0.0;
         onHeap = 0.0;
     }
+
+    @Override
+    public boolean areAnyOverZero() {
+        return onHeap > 0 || offHeap > 0 || normalizedResources.areAnyOverZero();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index 4a06999..1e6af6f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -153,6 +153,30 @@ public class NormalizedResources {
         return ret;
     }
 
+    /**
+     * Remove the resources of a worker from this.
+     *
+     * @param value the worker resources that should be removed from this.
+     */
+    public boolean remove(WorkerResources value) {
+        Map<String, Double> workerNormalizedResources = value.get_resources();
+        cpu -= workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+        return remove(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources)) || cpu < 0;
+    }
+
+    private boolean remove(double[] resourceArray) {
+        boolean ret = false;
+        int otherLength = resourceArray.length;
+        zeroPadOtherResourcesIfNecessary(otherLength);
+        for (int i = 0; i < otherLength; i++) {
+            otherResources[i] -= resourceArray[i];
+            if (otherResources[i] < 0) {
+                ret = true;
+            }
+        }
+        return ret;
+    }
+
     @Override
     public String toString() {
         return "Normalized resources: " + toNormalizedMap();
@@ -366,4 +390,17 @@ public class NormalizedResources {
             otherResources[i] = 0.0;
         }
     }
+
+    /**
+     * Are any of the resources positive.
+     * @return true of any of the resources are positive.  False if they are all <= 0.
+     */
+    public boolean areAnyOverZero() {
+        for (int i = 0; i < otherResources.length; i++) {
+            if (otherResources[i] > 0) {
+                return true;
+            }
+        }
+        return cpu > 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
index aeb0737..2c910ba 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
@@ -30,4 +30,8 @@ public interface NormalizedResourcesWithMemory {
      */
     void clear();
 
+    /**
+     * Return true if any of the resources are > 0.
+     */
+    boolean areAnyOverZero();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e3998864/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 7d22336..1132432 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -603,7 +603,7 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
      * @return the ids n that node.
      */
     public List<RAS_Node> hostnameToNodes(String hostname) {
-        return hostnameToNodes.get(hostname);
+        return hostnameToNodes.getOrDefault(hostname, Collections.emptyList());
     }
 
     /**


[2/2] storm git commit: Merge branch 'STORM-3216' of https://github.com/revans2/incubator-storm into STORM-3216

Posted by bo...@apache.org.
Merge branch 'STORM-3216' of https://github.com/revans2/incubator-storm into STORM-3216

STORM-3216: Add in RasBlacklistStrategy

This closes #2821


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e6ff0718
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e6ff0718
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e6ff0718

Branch: refs/heads/master
Commit: e6ff07180a9fb209c56bf38dc2eaf68f09a572a6
Parents: f2ced23 e399886
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon Sep 10 11:07:48 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 10 11:07:48 2018 -0500

----------------------------------------------------------------------
 .../java/org/apache/storm/DaemonConfig.java     |   3 +
 .../org/apache/storm/scheduler/Cluster.java     |  37 +++++++
 .../storm/scheduler/ISchedulingState.java       |  21 +++-
 .../apache/storm/scheduler/TopologyDetails.java |  12 +++
 .../strategies/DefaultBlacklistStrategy.java    |  86 +++++++++-------
 .../strategies/RasBlacklistStrategy.java        | 103 +++++++++++++++++++
 .../normalization/NormalizedResourceOffer.java  |  22 ++++
 .../NormalizedResourceRequest.java              |   5 +
 .../normalization/NormalizedResources.java      |  37 +++++++
 .../NormalizedResourcesWithMemory.java          |   4 +
 .../scheduling/BaseResourceAwareStrategy.java   |   2 +-
 11 files changed, 295 insertions(+), 37 deletions(-)
----------------------------------------------------------------------