You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/01/08 21:20:56 UTC
[3/6] storm git commit: Split up NormalizedResources into a few
classes with more narrow responsibilities. Make NormalizedResources a regular
class instead of an abstract class. Add some tests. Make calculateAvg/Min
throw exceptions if the resource total is not a superset of the used resources.
Split up NormalizedResources into a few classes with more narrow responsibilities. Make NormalizedResources a regular class instead of an abstract class. Add some tests. Make calculateAvg/Min throw exceptions if the resource total is not a superset of the used resources.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/41db23fe
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/41db23fe
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/41db23fe
Branch: refs/heads/master
Commit: 41db23fe86299f71b6ecc7deaa70ab731899c83f
Parents: 8aae491
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Jan 5 21:56:54 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Jan 7 18:27:04 2018 +0100
----------------------------------------------------------------------
storm-server/pom.xml | 2 +-
.../apache/storm/blobstore/BlobStoreUtils.java | 2 -
.../org/apache/storm/daemon/nimbus/Nimbus.java | 5 +-
.../supervisor/timer/SupervisorHeartbeat.java | 16 +-
.../org/apache/storm/scheduler/Cluster.java | 5 +-
.../resource/NormalizedResourceOffer.java | 89 ++--
.../resource/NormalizedResourceRequest.java | 65 ++-
.../scheduler/resource/NormalizedResources.java | 294 ++++++--------
.../resource/NormalizedResourcesWithMemory.java | 28 ++
.../storm/scheduler/resource/RAS_Node.java | 3 -
.../storm/scheduler/resource/RAS_Nodes.java | 1 -
.../resource/ResourceAwareScheduler.java | 1 -
.../resource/ResourceMapArrayBridge.java | 89 ++++
.../resource/ResourceNameNormalizer.java | 65 +++
.../storm/scheduler/resource/ResourceUtils.java | 14 +-
.../scheduling/BaseResourceAwareStrategy.java | 6 +-
.../DefaultResourceAwareStrategy.java | 2 -
.../GenericResourceAwareStrategy.java | 3 -
.../org/apache/storm/utils/ServerUtils.java | 16 +-
.../resource/TestResourceAwareScheduler.java | 7 +-
.../TestUtilsForResourceAwareScheduler.java | 4 +-
.../normalization/NormalizedResourcesRule.java | 50 +++
.../normalization/NormalizedResourcesTest.java | 404 +++++++++++++++++++
.../ResourceMapArrayBridgeTest.java | 65 +++
.../scheduling/NormalizedResourcesRule.java | 50 ---
.../TestDefaultResourceAwareStrategy.java | 1 +
26 files changed, 941 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index d6554de..ff917ca 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -130,7 +130,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>2630</maxAllowedViolations>
+ <maxAllowedViolations>2620</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index d660ab0..fd1cf40 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -24,9 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import javax.security.auth.Subject;
-
import org.apache.commons.collections.CollectionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.storm.Config;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index b495cfa..2c785a5 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -52,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.apache.storm.Config;
import org.apache.storm.Constants;
@@ -136,7 +135,6 @@ import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
import org.apache.storm.nimbus.ITopologyValidator;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.SupervisorResources;
import org.apache.storm.scheduler.DefaultScheduler;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.INimbus;
@@ -144,14 +142,15 @@ import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.SchedulerAssignmentImpl;
import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.SupervisorResources;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.security.INimbusCredentialPlugin;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.IAuthorizer;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 87e310e..3faf1e4 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -17,22 +17,20 @@
*/
package org.apache.storm.daemon.supervisor.timer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.scheduler.resource.NormalizedResources;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
public class SupervisorHeartbeat implements Runnable {
private final IStormClusterState stormClusterState;
@@ -92,7 +90,7 @@ public class SupervisorHeartbeat implements Runnable {
ret.put(stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
}
- return normalizedResourceMap(ret);
+ return NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(ret);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 9ab0c93..1ed88c7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -37,6 +37,7 @@ import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.NormalizedResources;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
@@ -44,8 +45,6 @@ import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
public class Cluster implements ISchedulingState {
private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
private SchedulerAssignmentImpl assignment;
@@ -440,7 +439,7 @@ public class Cluster implements ISchedulingState {
Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, shared.get_on_heap()
);
}
- sharedTotalResources = normalizedResourceMap(sharedTotalResources);
+ sharedTotalResources = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(sharedTotalResources);
WorkerResources ret = new WorkerResources();
ret.set_resources(totalResources.toNormalizedMap());
ret.set_shared_resources(sharedTotalResources);
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
index 58f12d0..787bbce 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
@@ -18,68 +18,97 @@
package org.apache.storm.scheduler.resource;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.storm.Constants;
-import org.apache.storm.generated.WorkerResources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * An offer of resources that has been normalized.
+ * An offer of resources with normalized resource names.
*/
-public class NormalizedResourceOffer extends NormalizedResources {
+public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
+
private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
- private double totalMemory;
+ private final NormalizedResources normalizedResources;
+ private double totalMemoryMb;
/**
- * Create a new normalized set of resources. Note that memory is not covered here because it is not consistent in requests vs offers
- * because of how on heap vs off heap is used.
+ * Create a new normalized resource offer.
*
* @param resources the resources to be normalized.
*/
public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
- super(resources, null);
- totalMemory = getNormalizedResources().getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+ Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
+ totalMemoryMb = normalizedResourceMap.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+ this.normalizedResources = new NormalizedResources(normalizedResourceMap);
}
public NormalizedResourceOffer() {
- this((Map<String, ? extends Number>)null);
+ this((Map<String, ? extends Number>) null);
}
public NormalizedResourceOffer(NormalizedResourceOffer other) {
- super(other);
- this.totalMemory = other.totalMemory;
+ this.totalMemoryMb = other.totalMemoryMb;
+ this.normalizedResources = new NormalizedResources(other.normalizedResources);
}
@Override
public double getTotalMemoryMb() {
- return totalMemory;
+ return totalMemoryMb;
}
- @Override
- public String toString() {
- return super.toString() + " MEM: " + totalMemory;
+ public Map<String, Double> toNormalizedMap() {
+ Map<String, Double> ret = normalizedResources.toNormalizedMap();
+ ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
+ return ret;
}
- @Override
- public Map<String,Double> toNormalizedMap() {
- Map<String, Double> ret = super.toNormalizedMap();
- ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemory);
- return ret;
+ public void add(NormalizedResourcesWithMemory other) {
+ normalizedResources.add(other.getNormalizedResources());
+ totalMemoryMb += other.getTotalMemoryMb();
}
- @Override
- public void add(NormalizedResources other) {
- super.add(other);
- totalMemory += other.getTotalMemoryMb();
+ public void remove(NormalizedResourcesWithMemory other) {
+ normalizedResources.remove(other.getNormalizedResources());
+ totalMemoryMb -= other.getTotalMemoryMb();
+ if (totalMemoryMb < 0.0) {
+ normalizedResources.throwBecauseResourceBecameNegative(
+ Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
+ };
+ }
+
+ /**
+ * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
+ * double, double).
+ */
+ public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
+ return normalizedResources.calculateAveragePercentageUsedBy(
+ used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+ }
+
+ /**
+ * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+ * double)
+ */
+ public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
+ return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+ }
+
+ /**
+ * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+ * double).
+ */
+ public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
+ return normalizedResources.couldHoldIgnoringSharedMemory(
+ other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
+ }
+
+ public double getTotalCpu() {
+ return normalizedResources.getTotalCpu();
}
@Override
- public void remove(NormalizedResources other) {
- super.remove(other);
- totalMemory -= other.getTotalMemoryMb();
- assert totalMemory >= 0.0;
+ public NormalizedResources getNormalizedResources() {
+ return normalizedResources;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
index 5963750..2af90ab 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
@@ -18,12 +18,8 @@
package org.apache.storm.scheduler.resource;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.generated.ComponentCommon;
@@ -36,14 +32,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A request that has been normalized.
+ * A resource request with normalized resource names.
*/
-public class NormalizedResourceRequest extends NormalizedResources {
+public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
+
private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
if (!dest.containsKey(destKey)) {
- Number value = (Number)src.get(srcKey);
+ Number value = (Number) src.get(srcKey);
if (value != null) {
dest.put(destKey, value.doubleValue());
}
@@ -51,7 +48,7 @@ public class NormalizedResourceRequest extends NormalizedResources {
}
private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) {
- Map<String, Double> ret = normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
+ Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
@@ -96,7 +93,6 @@ public class NormalizedResourceRequest extends NormalizedResources {
stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
}
-
}
}
} catch (ParseException e) {
@@ -106,48 +102,38 @@ public class NormalizedResourceRequest extends NormalizedResources {
return topologyResources;
}
+ private final NormalizedResources normalizedResources;
private double onHeap;
private double offHeap;
- /**
- * Create a new normalized set of resources. Note that memory is not covered here because it is not consistent in requests vs offers
- * because of how on heap vs off heap is used.
- *
- * @param resources the resources to be normalized.
- * @param topologyConf the config for the topology
- */
private NormalizedResourceRequest(Map<String, ? extends Number> resources,
- Map<String, Object> topologyConf) {
- super(resources, getDefaultResources(topologyConf));
- initializeMemory(getNormalizedResources());
+ Map<String, Double> defaultResources) {
+ Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
+ normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
+ onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ normalizedResources = new NormalizedResources(normalizedResourceMap);
}
public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
- this(parseResources(component.get_json_conf()), topoConf);
+ this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
}
public NormalizedResourceRequest(Map<String, Object> topoConf) {
- this((Map<String, ? extends Number>) null, topoConf);
+ this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
}
public NormalizedResourceRequest() {
- super(null, null);
- initializeMemory(getNormalizedResources());
+ this((Map<String, ? extends Number>) null, null);
}
- @Override
- public Map<String,Double> toNormalizedMap() {
- Map<String, Double> ret = super.toNormalizedMap();
+ public Map<String, Double> toNormalizedMap() {
+ Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
return ret;
}
- private void initializeMemory(Map<String, Double> normalizedResources) {
- onHeap = normalizedResources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
- offHeap = normalizedResources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
- }
-
public double getOnHeapMemoryMb() {
return onHeap;
}
@@ -166,17 +152,17 @@ public class NormalizedResourceRequest extends NormalizedResources {
/**
* Add the resources in other to this.
+ *
* @param other the other Request to add to this.
*/
public void add(NormalizedResourceRequest other) {
- super.add(other);
+ this.normalizedResources.add(other.normalizedResources);
onHeap += other.onHeap;
offHeap += other.offHeap;
}
- @Override
public void add(WorkerResources value) {
- super.add(value);
+ this.normalizedResources.add(value);
//The resources are already normalized
Map<String, Double> resources = value.get_resources();
onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
@@ -185,11 +171,20 @@ public class NormalizedResourceRequest extends NormalizedResources {
@Override
public double getTotalMemoryMb() {
- return getOnHeapMemoryMb() + getOffHeapMemoryMb();
+ return onHeap + offHeap;
}
@Override
public String toString() {
return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
}
+
+ public double getTotalCpu() {
+ return this.normalizedResources.getTotalCpu();
+ }
+
+ @Override
+ public NormalizedResources getNormalizedResources() {
+ return this.normalizedResources;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
index 846ab69..f8e911a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
@@ -20,74 +20,39 @@ package org.apache.storm.scheduler.resource;
import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import org.apache.storm.Config;
+import org.apache.commons.lang.Validate;
import org.apache.storm.Constants;
import org.apache.storm.generated.WorkerResources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Resources that have been normalized.
+ * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
+ * does not keep track of memory as a resource.
*/
-public abstract class NormalizedResources {
+public class NormalizedResources {
private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
- public static final Map<String, String> RESOURCE_NAME_MAPPING;
- static {
- Map<String, String> tmp = new HashMap<>();
- tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
- tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
- tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
- tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
- tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
- RESOURCE_NAME_MAPPING = Collections.unmodifiableMap(tmp);
- }
+ public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
+ private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
- private static double[] makeArray(Map<String, Double> normalizedResources) {
- //To avoid locking we will go through the map twice. It should be small so it is probably not a big deal
- for (String key : normalizedResources.keySet()) {
- //We are going to skip over CPU and Memory, because they are captured elsewhere
- if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
- && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
- && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
- && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
- resourceNames.computeIfAbsent(key, (k) -> counter.getAndIncrement());
- }
- }
- //By default all of the values are 0
- double[] ret = new double[counter.get()];
- for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
- Integer index = resourceNames.get(entry.getKey());
- if (index != null) {
- //index == null if it is memory or CPU
- ret[index] = entry.getValue();
- }
- }
- return ret;
+ static {
+ resetResourceNames();
}
- private static final ConcurrentMap<String, Integer> resourceNames = new ConcurrentHashMap<>();
- private static final AtomicInteger counter = new AtomicInteger(0);
private double cpu;
private double[] otherResources;
- private final Map<String, Double> normalizedResources;
/**
- * This is for testing only. It allows a test to reset the mapping of resource names in the array. We reset the mapping because some
+ * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
* algorithms sadly have different behavior if a resource exists or not.
*/
@VisibleForTesting
public static void resetResourceNames() {
- resourceNames.clear();
- counter.set(0);
+ RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
+ RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
}
/**
@@ -96,53 +61,21 @@ public abstract class NormalizedResources {
public NormalizedResources(NormalizedResources other) {
cpu = other.cpu;
otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
- normalizedResources = other.normalizedResources;
}
/**
- * Create a new normalized set of resources. Note that memory is not covered here because it is not consistent in requests vs offers
- * because of how on heap vs off heap is used.
+ * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
+ * offers because of how on heap vs off heap is used.
*
- * @param resources the resources to be normalized.
- * @param defaults the default resources that will also be normalized and combined with the real resources.
+ * @param normalizedResources the normalized resource map
+ * @param getTotalMemoryMb Supplier of total memory in MB.
*/
- public NormalizedResources(Map<String, ? extends Number> resources, Map<String, ? extends Number> defaults) {
- normalizedResources = normalizedResourceMap(defaults);
- normalizedResources.putAll(normalizedResourceMap(resources));
+ public NormalizedResources(Map<String, Double> normalizedResources) {
cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
- otherResources = makeArray(normalizedResources);
- }
-
- protected final Map<String, Double> getNormalizedResources() {
- return this.normalizedResources;
+ otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
}
/**
- * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
- *
- * @param resourceMap resource map of either Supervisor or Topology
- * @return the resource map with common resource names
- */
- public static Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
- if (resourceMap == null) {
- return new HashMap<>();
- }
- return new HashMap<>(resourceMap.entrySet().stream()
- .collect(Collectors.toMap(
- //Map the key if needed
- (e) -> RESOURCE_NAME_MAPPING.getOrDefault(e.getKey(), e.getKey()),
- //Map the value
- (e) -> e.getValue().doubleValue())));
- }
-
- /**
- * Get the total amount of memory.
- *
- * @return the total amount of memory requested or provided.
- */
- public abstract double getTotalMemoryMb();
-
- /**
* Get the total amount of cpu.
*
* @return the amount of cpu.
@@ -150,15 +83,18 @@ public abstract class NormalizedResources {
public double getTotalCpu() {
return cpu;
}
+
+ private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
+ if (requiredLength > otherResources.length) {
+ double[] newResources = new double[requiredLength];
+ System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
+ otherResources = newResources;
+ }
+ }
private void add(double[] resourceArray) {
int otherLength = resourceArray.length;
- int length = otherResources.length;
- if (otherLength > length) {
- double[] newResources = new double[otherLength];
- System.arraycopy(newResources, 0, otherResources, 0, length);
- otherResources = newResources;
- }
+ zeroPadOtherResourcesIfNecessary(otherLength);
for (int i = 0; i < otherLength; i++) {
otherResources[i] += resourceArray[i];
}
@@ -177,45 +113,39 @@ public abstract class NormalizedResources {
public void add(WorkerResources value) {
Map<String, Double> workerNormalizedResources = value.get_resources();
cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
- add(makeArray(workerNormalizedResources));
+ add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
}
+ public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) {
+ throw new IllegalArgumentException(String.format("Resource amounts should never be negative."
+ + " Resource '%s' with current value '%f' became negative because '%f' was removed.",
+ resourceName, currentValue, subtractedValue));
+ }
+
/**
* Remove the other resources from this. This is the same as subtracting the resources in other from this.
*
* @param other the resources we want removed.
+ * @throws IllegalArgumentException if subtracting other from this would result in any resource amount becoming negative.
*/
public void remove(NormalizedResources other) {
this.cpu -= other.cpu;
- assert cpu >= 0.0;
- int otherLength = other.otherResources.length;
- int length = otherResources.length;
- if (otherLength > length) {
- double[] newResources = new double[otherLength];
- System.arraycopy(newResources, 0, otherResources, 0, length);
- otherResources = newResources;
+ if (cpu < 0.0) {
+ throwBecauseResourceBecameNegative(Constants.COMMON_CPU_RESOURCE_NAME, cpu, other.cpu);
}
+ int otherLength = other.otherResources.length;
+ zeroPadOtherResourcesIfNecessary(otherLength);
for (int i = 0; i < otherLength; i++) {
otherResources[i] -= other.otherResources[i];
- assert otherResources[i] >= 0.0;
+ if (otherResources[i] < 0.0) {
+ throwBecauseResourceBecameNegative(getResourceNameForResourceIndex(i), otherResources[i], other.otherResources[i]);
+ }
}
}
@Override
public String toString() {
- return "CPU: " + cpu + " Other resources: " + toNormalizedOtherResources();
- }
-
- private Map<String, Double> toNormalizedOtherResources() {
- Map<String, Double> ret = new HashMap<>();
- int length = otherResources.length;
- for (Map.Entry<String, Integer> entry : resourceNames.entrySet()) {
- int index = entry.getValue();
- if (index < length) {
- ret.put(entry.getKey(), otherResources[index]);
- }
- }
- return ret;
+ return "Normalized resources: " + toNormalizedMap();
}
/**
@@ -223,7 +153,7 @@ public abstract class NormalizedResources {
* user.
*/
public Map<String, Double> toNormalizedMap() {
- Map<String, Double> ret = toNormalizedOtherResources();
+ Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
return ret;
}
@@ -240,9 +170,11 @@ public abstract class NormalizedResources {
* does not check memory because with shared memory it is beyond the scope of this.
*
* @param other the resources that we want to check if they would fit in this.
+ * @param thisTotalMemoryMb The total memory in MB of this
+ * @param otherTotalMemoryMb The total memory in MB of other
* @return true if it might fit, else false if it could not possibly fit.
*/
- public boolean couldHoldIgnoringSharedMemory(NormalizedResources other) {
+ public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
if (this.cpu < other.getTotalCpu()) {
return false;
}
@@ -253,69 +185,70 @@ public abstract class NormalizedResources {
}
}
- if (this.getTotalMemoryMb() < other.getTotalMemoryMb()) {
- return false;
- }
- return true;
+ return thisTotalMemoryMb >= otherTotalMemoryMb;
}
- private void throwBecauseResourceIsMissingFromTotal(int resourceIndex) {
- String resourceName = null;
- for (Map.Entry<String, Integer> entry : resourceNames.entrySet()) {
+ private String getResourceNameForResourceIndex(int resourceIndex) {
+ for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
int index = entry.getValue();
if (index == resourceIndex) {
- resourceName = entry.getKey();
- break;
+ return entry.getKey();
}
}
- if (resourceName == null) {
- throw new IllegalStateException("Array index " + resourceIndex + " is not mapped in the resource names map."
- + " This should not be possible, and is likely a bug in the Storm code.");
- }
- throw new IllegalArgumentException("Total resources does not contain resource '"
- + resourceName
- + "'. All resources should be represented in the total. This is likely a bug in the Storm code");
+ return null;
+ }
+
+ private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+ throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
+ + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
+ used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
}
/**
- * Calculate the average resource usage percentage with this being the total resources and used being the amounts used.
+ * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+ * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+ * division by 0. If all resources are skipped the result is defined to be 100.0.
*
* @param used the amount of resources used.
- * @return the average percentage used 0.0 to 100.0. Clamps to 100.0 in case there are no available resources in the total
+ * @param totalMemoryMb The total memory in MB
+ * @param usedMemoryMb The used memory in MB
+ * @return the average percentage used 0.0 to 100.0.
+ * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+ * resources that are not present in the total.
*/
- public double calculateAveragePercentageUsedBy(NormalizedResources used) {
+ public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+ if (LOG.isTraceEnabled()) {
+ //Arguments follow the placeholder order: used values first, then totals (this instance holds the totals).
+ LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
+ + " Used Normalized Resources: {} Total Normalized Resources: {}", usedMemoryMb, totalMemoryMb,
+ used.toNormalizedMap(), toNormalizedMap());
+ }
+
int skippedResourceTypes = 0;
double total = 0.0;
- double totalMemory = getTotalMemoryMb();
- if (totalMemory != 0.0) {
- total += used.getTotalMemoryMb() / totalMemory;
+ if (usedMemoryMb > totalMemoryMb) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalMemoryMb != 0.0) {
+ total += usedMemoryMb / totalMemoryMb;
} else {
skippedResourceTypes++;
}
double totalCpu = getTotalCpu();
+ if (used.getTotalCpu() > getTotalCpu()) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
if (totalCpu != 0.0) {
total += used.getTotalCpu() / getTotalCpu();
} else {
skippedResourceTypes++;
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Calculating avg percentage used by. Used CPU: {} Total CPU: {} Used Mem: {} Total Mem: {}"
- + " Other Used: {} Other Total: {}", totalCpu, used.getTotalCpu(), totalMemory, used.getTotalMemoryMb(),
- this.toNormalizedOtherResources(), used.toNormalizedOtherResources());
- }
if (used.otherResources.length > otherResources.length) {
- throwBecauseResourceIsMissingFromTotal(used.otherResources.length);
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
}
for (int i = 0; i < otherResources.length; i++) {
double totalValue = otherResources[i];
- if (totalValue == 0.0) {
- //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
- //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
- skippedResourceTypes++;
- continue;
- }
double usedValue;
if (i >= used.otherResources.length) {
//Resources missing from used are using none of that resource
@@ -323,14 +256,24 @@ public abstract class NormalizedResources {
} else {
usedValue = used.otherResources[i];
}
+ if (usedValue > totalValue) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalValue == 0.0) {
+ //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+ //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+ skippedResourceTypes++;
+ continue;
+ }
+
total += usedValue / totalValue;
}
//Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
int divisor = 2 + otherResources.length - skippedResourceTypes;
if (divisor == 0) {
- /*This is an arbitrary choice to make the result consistent with calculateMin.
- Any value would be valid here, becase there are no (non-zero) resources in the total set of resources,
- so we're trying to average 0 values.
+ /*
+ * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, because there are
+ * no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
*/
return 100.0;
} else {
@@ -339,41 +282,43 @@ public abstract class NormalizedResources {
}
/**
- * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used.
+ * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+ * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+ * division by 0. If all resources are skipped the result is defined to be 100.0.
*
* @param used the amount of resources used.
- * @return the minimum percentage used 0.0 to 100.0. Clamps to 100.0 in case there are no available resources in the total.
+ * @param totalMemoryMb The total memory in MB
+ * @param usedMemoryMb The used memory in MB
+ * @return the minimum percentage used 0.0 to 100.0.
+ * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+ * resources that are not present in the total.
*/
- public double calculateMinPercentageUsedBy(NormalizedResources used) {
- double totalMemory = getTotalMemoryMb();
- double totalCpu = getTotalCpu();
-
+ public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Calculating min percentage used by. Used CPU: {} Total CPU: {} Used Mem: {} Total Mem: {}"
- + " Other Used: {} Other Total: {}", totalCpu, used.getTotalCpu(), totalMemory, used.getTotalMemoryMb(),
- toNormalizedOtherResources(), used.toNormalizedOtherResources());
+ //Arguments follow the placeholder order: used values first, then totals (this instance holds the totals).
+ LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
+ + " Used Normalized Resources: {} Total Normalized Resources: {}", usedMemoryMb, totalMemoryMb,
+ used.toNormalizedMap(), toNormalizedMap());
}
- if (used.otherResources.length > otherResources.length) {
- throwBecauseResourceIsMissingFromTotal(used.otherResources.length);
+ double min = 1.0;
+ if (usedMemoryMb > totalMemoryMb) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
}
-
- if (used.otherResources.length != otherResources.length
- || totalMemory == 0.0
- || totalCpu == 0.0) {
- //If the lengths don't match one of the resources will be 0, which means we would calculate the percentage to be 0.0
- // and so the min would be 0.0 (assuming that we can never go negative on a resource being used.
- return 0.0;
+ if (totalMemoryMb != 0.0) {
+ min = Math.min(min, usedMemoryMb / totalMemoryMb);
}
-
- double min = 100.0;
- if (totalMemory != 0.0) {
- min = Math.min(min, used.getTotalMemoryMb() / totalMemory);
+ double totalCpu = getTotalCpu();
+ if (used.getTotalCpu() > totalCpu) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
}
if (totalCpu != 0.0) {
min = Math.min(min, used.getTotalCpu() / totalCpu);
}
+ if (used.otherResources.length > otherResources.length) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+
for (int i = 0; i < otherResources.length; i++) {
if (otherResources[i] == 0.0) {
//Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
@@ -384,6 +329,9 @@ public abstract class NormalizedResources {
//Resources missing from used are using none of that resource
return 0;
}
+ if (used.otherResources[i] > otherResources[i]) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
min = Math.min(min, used.otherResources[i] / otherResources[i]);
}
return min * 100.0;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
new file mode 100644
index 0000000..640233a
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+/**
+ * Intended for {@link NormalizedResources} wrappers that handle memory.
+ */
+public interface NormalizedResourcesWithMemory {
+
+ /**
+ * Gets the {@link NormalizedResources} associated with this object.
+ * NOTE(review): presumably the instance being wrapped, which does not itself carry memory - confirm against the implementors.
+ *
+ * @return the normalized resources
+ */
+ NormalizedResources getNormalizedResources();
+
+ /**
+ * Gets the total memory, in MB, handled by this object.
+ *
+ * @return the total memory in MB
+ */
+ double getTotalMemoryMb();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 5350005..db9ca73 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -25,14 +25,11 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-
-import org.apache.storm.Constants;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
-import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
index 6c70ff1..2bd2b86 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
-
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SchedulerAssignment;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index dc557ff..e730705 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -30,7 +30,6 @@ import org.apache.storm.DaemonConfig;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
-import org.apache.storm.scheduler.SchedulerAssignmentImpl;
import org.apache.storm.scheduler.SingleTopologyCluster;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
new file mode 100644
index 0000000..cf4f80b
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.storm.Constants;
+
+/**
+ * Provides translation between normalized resource maps and resource value arrays. Some operations use resource value arrays instead of the
+ * full normalized resource map as an optimization. See {@link NormalizedResources}.
+ *
+ * <p>Each instance keeps its own name-to-index assignment. Within this class, indices are only ever added, never changed or removed.
+ */
+public class ResourceMapArrayBridge {
+
+ //Array index assigned to each resource name seen so far by translateToResourceArray
+ private final ConcurrentMap<String, Integer> resourceNamesToArrayIndex = new ConcurrentHashMap<>();
+ //Next unassigned array index. Also used as the length of newly created resource arrays
+ private final AtomicInteger counter = new AtomicInteger(0);
+
+ /**
+ * Translates a normalized resource map to an array of resource values. Each resource name will be assigned an index in the array, which
+ * is guaranteed to be consistent with subsequent invocations of this method. Note that CPU and memory resources are not translated by
+ * this method, as they are expected to be captured elsewhere.
+ *
+ * <p>The returned array is as long as the number of indices assigned so far, so it may contain slots for resources that are not present
+ * in the given map. Those slots are 0.
+ *
+ * @param normalizedResources The resources to translate to an array
+ * @return The array of resource values
+ */
+ public double[] translateToResourceArray(Map<String, Double> normalizedResources) {
+ //To avoid locking we will go through the map twice. It should be small so it is probably not a big deal
+ //First pass: make sure every resource name in the map has an index assigned
+ for (String key : normalizedResources.keySet()) {
+ //We are going to skip over CPU and Memory, because they are captured elsewhere
+ if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
+ && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
+ && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
+ && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
+ //Only a name that is not yet mapped takes the next index from the counter
+ resourceNamesToArrayIndex.computeIfAbsent(key, (k) -> counter.getAndIncrement());
+ }
+ }
+ //By default all of the values are 0
+ //The counter is read after the first pass, so every index assigned above fits in the array
+ double[] ret = new double[counter.get()];
+ //Second pass: copy each value into the slot assigned to its resource name
+ for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
+ Integer index = resourceNamesToArrayIndex.get(entry.getKey());
+ if (index != null) {
+ //index == null if it is memory or CPU
+ ret[index] = entry.getValue();
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Translates an array of resource values to a normalized resource map.
+ *
+ * <p>Resource names whose assigned index is beyond the end of the given array are left out of the returned map.
+ *
+ * @param resources The resource array to translate
+ * @return The normalized resource map
+ */
+ public Map<String, Double> translateFromResourceArray(double[] resources) {
+ Map<String, Double> ret = new HashMap<>();
+ int length = resources.length;
+ for (Map.Entry<String, Integer> entry : resourceNamesToArrayIndex.entrySet()) {
+ int index = entry.getValue();
+ //The array may be shorter than the number of known resource names, skip names it has no slot for
+ if (index < length) {
+ ret.put(entry.getKey(), resources[index]);
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Gets the mapping from resource name to the array index assigned to that name.
+ *
+ * @return an unmodifiable view of the name-to-index mapping
+ */
+ public Map<String, Integer> getResourceNamesToArrayIndex() {
+ return Collections.unmodifiableMap(resourceNamesToArrayIndex);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
new file mode 100644
index 0000000..d59ba92
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+
+/**
+ * Rewrites the keys of resource maps so that legacy supervisor and topology configuration names become the common resource names.
+ */
+public class ResourceNameNormalizer {
+
+ private final Map<String, String> resourceNameMapping;
+
+ /**
+ * Builds the fixed table of configuration keys and the common resource names they are translated to.
+ */
+ public ResourceNameNormalizer() {
+ Map<String, String> configKeyToCommonName = new HashMap<>();
+ //CPU: the topology component setting and the supervisor capacity translate to the same common name
+ configKeyToCommonName.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
+ configKeyToCommonName.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
+ //Memory: on-heap and off-heap for topology components, total capacity for supervisors
+ configKeyToCommonName.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
+ configKeyToCommonName.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
+ configKeyToCommonName.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
+ this.resourceNameMapping = Collections.unmodifiableMap(configKeyToCommonName);
+ }
+
+ /**
+ * Produces a copy of a supervisor or topology resource map in which every key that has a common resource name is replaced by that
+ * name, and every value is converted to a double. Keys without a known translation are kept as they are.
+ *
+ * @param resourceMap resource map of either Supervisor or Topology, may be null
+ * @return a new map keyed by common resource names, empty if resourceMap is null
+ */
+ public Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
+ if (resourceMap == null) {
+ return new HashMap<>();
+ }
+ Map<String, Double> renamed = resourceMap.entrySet().stream()
+ .collect(Collectors.toMap(
+ (entry) -> toCommonName(entry.getKey()),
+ (entry) -> entry.getValue().doubleValue()));
+ return new HashMap<>(renamed);
+ }
+
+ /**
+ * Gets the table used to translate configuration keys to common resource names.
+ *
+ * @return the unmodifiable mapping from configuration key to common resource name
+ */
+ public Map<String, String> getResourceNameMapping() {
+ return resourceNameMapping;
+ }
+
+ //Looks up the common name for a key, falling back to the key itself when there is no translation
+ private String toCommonName(String resourceName) {
+ return resourceNameMapping.getOrDefault(resourceName, resourceName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index db9dbfa..ca4ea63 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -20,9 +20,7 @@ package org.apache.storm.scheduler.resource;
import java.util.HashMap;
import java.util.Map;
-
import org.apache.storm.Config;
-import org.apache.storm.Constants;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.SpoutSpec;
@@ -33,8 +31,6 @@ import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
public class ResourceUtils {
private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class);
@@ -95,7 +91,8 @@ public class ResourceUtils {
if (resourceUpdatesMap.containsKey(spoutName)) {
ComponentCommon spoutCommon = spoutSpec.get_common();
- Map<String, Double> resourcesUpdate = normalizedResourceMap(resourceUpdatesMap.get(spoutName));
+ Map<String, Double> resourcesUpdate = NormalizedResources
+ .RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceUpdatesMap.get(spoutName));
String newJsonConf = getJsonWithUpdatedResources(spoutCommon.get_json_conf(), resourcesUpdate);
spoutCommon.set_json_conf(newJsonConf);
componentsUpdated.put(spoutName, resourcesUpdate);
@@ -110,7 +107,8 @@ public class ResourceUtils {
if (resourceUpdatesMap.containsKey(boltName)) {
ComponentCommon boltCommon = boltObj.get_common();
- Map<String, Double> resourcesUpdate = normalizedResourceMap(resourceUpdatesMap.get(boltName));
+ Map<String, Double> resourcesUpdate = NormalizedResources
+ .RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceUpdatesMap.get(boltName));
String newJsonConf = getJsonWithUpdatedResources(boltCommon.get_json_conf(), resourceUpdatesMap.get(boltName));
boltCommon.set_json_conf(newJsonConf);
componentsUpdated.put(boltName, resourcesUpdate);
@@ -128,7 +126,7 @@ public class ResourceUtils {
}
public static String getCorrespondingLegacyResourceName(String normalizedResourceName) {
- for(Map.Entry<String, String> entry : NormalizedResources.RESOURCE_NAME_MAPPING.entrySet()) {
+ for(Map.Entry<String, String> entry : NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().entrySet()) {
if (entry.getValue().equals(normalizedResourceName)) {
return entry.getKey();
}
@@ -148,7 +146,7 @@ public class ResourceUtils {
);
for (Map.Entry<String, Double> resourceUpdateEntry : resourceUpdates.entrySet()) {
- if (NormalizedResources.RESOURCE_NAME_MAPPING.containsValue(resourceUpdateEntry.getKey())) {
+ if (NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().containsValue(resourceUpdateEntry.getKey())) {
// if there will be legacy values they will be in the outer conf
jsonObject.remove(getCorrespondingLegacyResourceName(resourceUpdateEntry.getKey()));
componentResourceMap.remove(getCorrespondingLegacyResourceName(resourceUpdateEntry.getKey()));
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 67bc867..73e1aa0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -19,7 +19,6 @@
package org.apache.storm.scheduler.resource.strategies.scheduling;
import com.google.common.annotations.VisibleForTesting;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -31,18 +30,15 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
-
-import org.apache.storm.executor.Executor;
import org.apache.storm.generated.ComponentType;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.Component;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.RAS_Node;
import org.apache.storm.scheduler.resource.RAS_Nodes;
-import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index e0dc881..efa3ecf 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -23,13 +23,11 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.TreeSet;
-
import org.apache.storm.Config;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.Component;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.TopologyDetails;
-
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index 108dc49..2b0ca40 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -21,16 +21,13 @@ package org.apache.storm.scheduler.resource.strategies.scheduling;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.TreeSet;
import org.apache.storm.Config;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.Component;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index a64c9b4..6f33251 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -25,28 +25,27 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.RandomAccessFile;
-import java.nio.file.Files;
import java.nio.file.FileSystems;
+import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
import java.util.List;
import java.util.Map;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import javax.security.auth.Subject;
-
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.exec.CommandLine;
@@ -54,16 +53,15 @@ import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.BlobStoreAclHandler;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.blobstore.LocalFsBlobStore;
import org.apache.storm.blobstore.LocalModeClientBlobStore;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
import org.apache.storm.daemon.StormCommon;
-import org.apache.storm.DaemonConfig;
import org.apache.storm.generated.AccessControl;
import org.apache.storm.generated.AccessControlType;
import org.apache.storm.generated.AuthorizationException;
@@ -73,8 +71,8 @@ import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.security.auth.SingleUserPrincipal;
import org.apache.thrift.TException;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index eb5f70f..360db3a 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -501,9 +501,6 @@ public class TestResourceAwareScheduler {
}
public void testHeterogeneousCluster(Config topologyConf, String strategyName) {
- Map<String, Double> test = new HashMap<>();
- test.put("gpu.count", 0.0);
- new NormalizedResourceOffer(test);
LOG.info("\n\n\t\ttestHeterogeneousCluster");
INimbus iNimbus = new INimbusTest();
Map<String, Double> resourceMap1 = new HashMap<>(); // strong supervisor node
@@ -513,8 +510,8 @@ public class TestResourceAwareScheduler {
resourceMap2.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
resourceMap2.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0);
- resourceMap1 = NormalizedResources.normalizedResourceMap(resourceMap1);
- resourceMap2 = NormalizedResources.normalizedResourceMap(resourceMap2);
+ resourceMap1 = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap1);
+ resourceMap2 = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap2);
Map<String, SupervisorDetails> supMap = new HashMap<>();
for (int i = 0; i < 2; i++) {
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 4a2e644..3d37ad9 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -65,8 +65,6 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
public class TestUtilsForResourceAwareScheduler {
private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
@@ -155,7 +153,7 @@ public class TestUtilsForResourceAwareScheduler {
for (int j = 0; j < numPorts; j++) {
ports.add(j);
}
- SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, normalizedResourceMap(resourceMap));
+ SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
retList.put(sup.getId(), sup);
}
return retList;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
new file mode 100644
index 0000000..1f78f53
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import org.apache.storm.scheduler.resource.NormalizedResources;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+public class NormalizedResourcesRule implements TestRule { // JUnit TestRule that calls NormalizedResources.resetResourceNames() before and after the statement it wraps.
+
+ public static class NRStatement extends Statement { // Statement decorator that performs the reset calls around a delegate statement.
+ private final Statement base; // The wrapped statement; evaluated between the two reset calls.
+
+ public NRStatement(Statement base) {
+ this.base = base;
+ }
+
+ @Override
+ public void evaluate() throws Throwable {
+ NormalizedResources.resetResourceNames(); // Reset before running the wrapped statement. NOTE(review): presumably clears static resource-name state shared across tests - confirm.
+ try {
+ base.evaluate();
+ } finally {
+ NormalizedResources.resetResourceNames(); // Reset again in finally, so it also happens when the wrapped statement throws.
+ }
+ }
+ }
+
+ @Override
+ public Statement apply(Statement statement, Description description) { // Wraps the given statement in an NRStatement; description is not used.
+ return new NRStatement(statement);
+ }
+}