You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/01/08 21:20:54 UTC
[1/6] storm git commit: STORM-2859: Fix a number of issues with
NormalizedResources when resource totals are zero
Repository: storm
Updated Branches:
refs/heads/master 7ecb3d73e -> e8dd1f7e2
STORM-2859: Fix a number of issues with NormalizedResources when resource totals are zero
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8aae491b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8aae491b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8aae491b
Branch: refs/heads/master
Commit: 8aae491b6b1e2be2a085f61d45d5d915203eb0ee
Parents: 873028b
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Dec 13 16:47:31 2017 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Thu Jan 4 16:36:21 2018 +0100
----------------------------------------------------------------------
.../apache/storm/blobstore/BlobStoreUtils.java | 4 +-
.../resource/NormalizedResourceOffer.java | 10 +-
.../resource/NormalizedResourceRequest.java | 7 +-
.../scheduler/resource/NormalizedResources.java | 191 ++++++++++++++-----
.../DefaultResourceAwareStrategy.java | 13 +-
.../TestDefaultResourceAwareStrategy.java | 39 ++--
.../TestGenericResourceAwareStrategy.java | 4 +-
7 files changed, 180 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index 9aca91c..d660ab0 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -196,10 +196,10 @@ public class BlobStoreUtils {
// Catching and logging KeyNotFoundException because, if
// there is a subsequent update and delete, the non-leader
// nimbodes might throw an exception.
- LOG.info("KeyNotFoundException {}", knf);
+ LOG.info("KeyNotFoundException", knf);
} catch (Exception exp) {
// Logging an exception while client is connecting
- LOG.error("Exception {}", exp);
+ LOG.error("Exception", exp);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
index 2623655..58f12d0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
@@ -34,17 +34,18 @@ public class NormalizedResourceOffer extends NormalizedResources {
private double totalMemory;
/**
- * Create a new normalized set of resources. Note that memory is not covered here becasue it is not consistent in requests vs offers
+ * Create a new normalized set of resources. Note that memory is not covered here because it is not consistent in requests vs offers
* because of how on heap vs off heap is used.
*
* @param resources the resources to be normalized.
*/
public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
super(resources, null);
+ totalMemory = getNormalizedResources().getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
}
public NormalizedResourceOffer() {
- super(null, null);
+ this((Map<String, ? extends Number>)null);
}
public NormalizedResourceOffer(NormalizedResourceOffer other) {
@@ -53,11 +54,6 @@ public class NormalizedResourceOffer extends NormalizedResources {
}
@Override
- protected void initializeMemory(Map<String, Double> normalizedResources) {
- totalMemory = normalizedResources.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
- }
-
- @Override
public double getTotalMemoryMb() {
return totalMemory;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
index 926184c..5963750 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
@@ -110,7 +110,7 @@ public class NormalizedResourceRequest extends NormalizedResources {
private double offHeap;
/**
- * Create a new normalized set of resources. Note that memory is not covered here becasue it is not consistent in requests vs offers
+ * Create a new normalized set of resources. Note that memory is not covered here because it is not consistent in requests vs offers
* because of how on heap vs off heap is used.
*
* @param resources the resources to be normalized.
@@ -119,6 +119,7 @@ public class NormalizedResourceRequest extends NormalizedResources {
private NormalizedResourceRequest(Map<String, ? extends Number> resources,
Map<String, Object> topologyConf) {
super(resources, getDefaultResources(topologyConf));
+ initializeMemory(getNormalizedResources());
}
public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
@@ -131,6 +132,7 @@ public class NormalizedResourceRequest extends NormalizedResources {
public NormalizedResourceRequest() {
super(null, null);
+ initializeMemory(getNormalizedResources());
}
@Override
@@ -141,8 +143,7 @@ public class NormalizedResourceRequest extends NormalizedResources {
return ret;
}
- @Override
- protected void initializeMemory(Map<String, Double> normalizedResources) {
+ private void initializeMemory(Map<String, Double> normalizedResources) {
onHeap = normalizedResources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
offHeap = normalizedResources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
index eea38cf..846ab69 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
* Resources that have been normalized.
*/
public abstract class NormalizedResources {
+
private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
public static final Map<String, String> RESOURCE_NAME_MAPPING;
@@ -62,7 +63,7 @@ public abstract class NormalizedResources {
}
}
//By default all of the values are 0
- double [] ret = new double[counter.get()];
+ double[] ret = new double[counter.get()];
for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
Integer index = resourceNames.get(entry.getKey());
if (index != null) {
@@ -77,11 +78,11 @@ public abstract class NormalizedResources {
private static final AtomicInteger counter = new AtomicInteger(0);
private double cpu;
private double[] otherResources;
+ private final Map<String, Double> normalizedResources;
/**
- * This is for testing only. It allows a test to reset the mapping of resource names in the array.
- * We reset the mapping because some algorithms sadly have different behavior if a resource exists
- * or not.
+ * This is for testing only. It allows a test to reset the mapping of resource names in the array. We reset the mapping because some
+ * algorithms sadly have different behavior if a resource exists or not.
*/
@VisibleForTesting
public static void resetResourceNames() {
@@ -89,34 +90,36 @@ public abstract class NormalizedResources {
counter.set(0);
}
+ /**
+ * Copy constructor.
+ */
public NormalizedResources(NormalizedResources other) {
cpu = other.cpu;
otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
+ normalizedResources = other.normalizedResources;
}
/**
- * Create a new normalized set of resources. Note that memory is not
- * covered here because it is not consistent in requests vs offers because
- * of how on heap vs off heap is used.
+ * Create a new normalized set of resources. Note that memory is not covered here because it is not consistent in requests vs offers
+ * because of how on heap vs off heap is used.
+ *
* @param resources the resources to be normalized.
* @param defaults the default resources that will also be normalized and combined with the real resources.
*/
public NormalizedResources(Map<String, ? extends Number> resources, Map<String, ? extends Number> defaults) {
- Map<String, Double> normalizedResources = normalizedResourceMap(defaults);
+ normalizedResources = normalizedResourceMap(defaults);
normalizedResources.putAll(normalizedResourceMap(resources));
cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
otherResources = makeArray(normalizedResources);
- initializeMemory(normalizedResources);
}
- /**
- * Initialize any memory usage from the normalized map.
- * @param normalizedResources the normalized resource map.
- */
- protected abstract void initializeMemory(Map<String, Double> normalizedResources);
+ protected final Map<String, Double> getNormalizedResources() {
+ return this.normalizedResources;
+ }
/**
* Normalizes a supervisor resource map or topology details map's keys to universal resource names.
+ *
* @param resourceMap resource map of either Supervisor or Topology
* @return the resource map with common resource names
*/
@@ -134,12 +137,14 @@ public abstract class NormalizedResources {
/**
* Get the total amount of memory.
+ *
* @return the total amount of memory requested or provided.
*/
public abstract double getTotalMemoryMb();
/**
* Get the total amount of cpu.
+ *
* @return the amount of cpu.
*/
public double getTotalCpu() {
@@ -150,7 +155,7 @@ public abstract class NormalizedResources {
int otherLength = resourceArray.length;
int length = otherResources.length;
if (otherLength > length) {
- double [] newResources = new double[otherLength];
+ double[] newResources = new double[otherLength];
System.arraycopy(newResources, 0, otherResources, 0, length);
otherResources = newResources;
}
@@ -166,16 +171,18 @@ public abstract class NormalizedResources {
/**
* Add the resources from a worker to this.
+ *
* @param value the worker resources that should be added to this.
*/
public void add(WorkerResources value) {
- Map<String, Double> normalizedResources = value.get_resources();
- cpu += normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
- add(makeArray(normalizedResources));
+ Map<String, Double> workerNormalizedResources = value.get_resources();
+ cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+ add(makeArray(workerNormalizedResources));
}
/**
- * Remove the resources from other. This is the same as subtracting the resources in other from this.
+ * Remove the other resources from this. This is the same as subtracting the resources in other from this.
+ *
* @param other the resources we want removed.
*/
public void remove(NormalizedResources other) {
@@ -184,11 +191,11 @@ public abstract class NormalizedResources {
int otherLength = other.otherResources.length;
int length = otherResources.length;
if (otherLength > length) {
- double [] newResources = new double[otherLength];
+ double[] newResources = new double[otherLength];
System.arraycopy(newResources, 0, otherResources, 0, length);
otherResources = newResources;
}
- for (int i = 0; i < Math.min(length, otherLength); i++) {
+ for (int i = 0; i < otherLength; i++) {
otherResources[i] -= other.otherResources[i];
assert otherResources[i] >= 0.0;
}
@@ -196,18 +203,13 @@ public abstract class NormalizedResources {
@Override
public String toString() {
- return "CPU: " + cpu;
+ return "CPU: " + cpu + " Other resources: " + toNormalizedOtherResources();
}
- /**
- * Return a Map of the normalized resource name to a double. This should only
- * be used when returning thrift resource requests to the end user.
- */
- public Map<String,Double> toNormalizedMap() {
- HashMap<String, Double> ret = new HashMap<>();
- ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+ private Map<String, Double> toNormalizedOtherResources() {
+ Map<String, Double> ret = new HashMap<>();
int length = otherResources.length;
- for (Map.Entry<String, Integer> entry: resourceNames.entrySet()) {
+ for (Map.Entry<String, Integer> entry : resourceNames.entrySet()) {
int index = entry.getValue();
if (index < length) {
ret.put(entry.getKey(), otherResources[index]);
@@ -216,6 +218,16 @@ public abstract class NormalizedResources {
return ret;
}
+ /**
+ * Return a Map of the normalized resource name to a double. This should only be used when returning thrift resource requests to the end
+ * user.
+ */
+ public Map<String, Double> toNormalizedMap() {
+ Map<String, Double> ret = toNormalizedOtherResources();
+ ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+ return ret;
+ }
+
private double getResourceAt(int index) {
if (index >= otherResources.length) {
return 0.0;
@@ -224,8 +236,9 @@ public abstract class NormalizedResources {
}
/**
- * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory.
- * It does not check memory because with shared memory it is beyond the scope of this.
+ * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory. It
+ * does not check memory because with shared memory it is beyond the scope of this.
+ *
* @param other the resources that we want to check if they would fit in this.
* @return true if it might fit, else false if it could not possibly fit.
*/
@@ -246,42 +259,105 @@ public abstract class NormalizedResources {
return true;
}
+ private void throwBecauseResourceIsMissingFromTotal(int resourceIndex) {
+ String resourceName = null;
+ for (Map.Entry<String, Integer> entry : resourceNames.entrySet()) {
+ int index = entry.getValue();
+ if (index == resourceIndex) {
+ resourceName = entry.getKey();
+ break;
+ }
+ }
+ if (resourceName == null) {
+ throw new IllegalStateException("Array index " + resourceIndex + " is not mapped in the resource names map."
+ + " This should not be possible, and is likely a bug in the Storm code.");
+ }
+ throw new IllegalArgumentException("Total resources does not contain resource '"
+ + resourceName
+ + "'. All resources should be represented in the total. This is likely a bug in the Storm code");
+ }
+
/**
- * Calculate the average resource usage percentage with this being the total resources and
- * used being the amounts used.
+ * Calculate the average resource usage percentage with this being the total resources and used being the amounts used.
+ *
* @param used the amount of resources used.
- * @return the average percentage used 0.0 to 100.0.
+ * @return the average percentage used 0.0 to 100.0. Clamps to 100.0 in case there are no available resources in the total
*/
public double calculateAveragePercentageUsedBy(NormalizedResources used) {
+ int skippedResourceTypes = 0;
double total = 0.0;
double totalMemory = getTotalMemoryMb();
if (totalMemory != 0.0) {
total += used.getTotalMemoryMb() / totalMemory;
+ } else {
+ skippedResourceTypes++;
}
double totalCpu = getTotalCpu();
if (totalCpu != 0.0) {
total += used.getTotalCpu() / getTotalCpu();
+ } else {
+ skippedResourceTypes++;
}
- //If total is 0 we add in a 0% used, so we can just skip over anything that is not in both.
- int length = Math.min(used.otherResources.length, otherResources.length);
- for (int i = 0; i < length; i++) {
- if (otherResources[i] != 0.0) {
- total += used.otherResources[i] / otherResources[i];
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Calculating avg percentage used by. Used CPU: {} Total CPU: {} Used Mem: {} Total Mem: {}"
+ + " Other Used: {} Other Total: {}", totalCpu, used.getTotalCpu(), totalMemory, used.getTotalMemoryMb(),
+ this.toNormalizedOtherResources(), used.toNormalizedOtherResources());
+ }
+
+ if (used.otherResources.length > otherResources.length) {
+ throwBecauseResourceIsMissingFromTotal(used.otherResources.length);
+ }
+
+ for (int i = 0; i < otherResources.length; i++) {
+ double totalValue = otherResources[i];
+ if (totalValue == 0.0) {
+ //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+ //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+ skippedResourceTypes++;
+ continue;
}
+ double usedValue;
+ if (i >= used.otherResources.length) {
+ //Resources missing from used are using none of that resource
+ usedValue = 0.0;
+ } else {
+ usedValue = used.otherResources[i];
+ }
+ total += usedValue / totalValue;
+ }
+ //Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
+ int divisor = 2 + otherResources.length - skippedResourceTypes;
+ if (divisor == 0) {
+ /*This is an arbitrary choice to make the result consistent with calculateMin.
+ Any value would be valid here, because there are no (non-zero) resources in the total set of resources,
+ so we're trying to average 0 values.
+ */
+ return 100.0;
+ } else {
+ return (total * 100.0) / divisor;
}
- //To get the count we divide by we need to take the maximum length because we are doing an average.
- return (total * 100.0) / (2 + Math.max(otherResources.length, used.otherResources.length));
}
/**
- * Calculate the minimum resource usage percentage with this being the total resources and
- * used being the amounts used.
+ * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used.
+ *
* @param used the amount of resources used.
- * @return the minimum percentage used 0.0 to 100.0.
+ * @return the minimum percentage used 0.0 to 100.0. Clamps to 100.0 in case there are no available resources in the total.
*/
public double calculateMinPercentageUsedBy(NormalizedResources used) {
double totalMemory = getTotalMemoryMb();
double totalCpu = getTotalCpu();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Calculating min percentage used by. Used CPU: {} Total CPU: {} Used Mem: {} Total Mem: {}"
+ + " Other Used: {} Other Total: {}", totalCpu, used.getTotalCpu(), totalMemory, used.getTotalMemoryMb(),
+ toNormalizedOtherResources(), used.toNormalizedOtherResources());
+ }
+
+ if (used.otherResources.length > otherResources.length) {
+ throwBecauseResourceIsMissingFromTotal(used.otherResources.length);
+ }
+
if (used.otherResources.length != otherResources.length
|| totalMemory == 0.0
|| totalCpu == 0.0) {
@@ -289,16 +365,27 @@ public abstract class NormalizedResources {
// and so the min would be 0.0 (assuming that we can never go negative on a resource being used.
return 0.0;
}
- double min = used.getTotalMemoryMb() / totalMemory;
- min = Math.min(min, used.getTotalCpu() / getTotalCpu());
+
+ double min = 100.0;
+ if (totalMemory != 0.0) {
+ min = Math.min(min, used.getTotalMemoryMb() / totalMemory);
+ }
+ if (totalCpu != 0.0) {
+ min = Math.min(min, used.getTotalCpu() / totalCpu);
+ }
for (int i = 0; i < otherResources.length; i++) {
- if (otherResources[i] != 0.0) {
- min = Math.min(min, used.otherResources[i] / otherResources[i]);
- } else {
- return 0.0; //0 will be the minimum, because we count values not in here as 0
+ if (otherResources[i] == 0.0) {
+ //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+ //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+ continue;
+ }
+ if (i >= used.otherResources.length) {
+ //Resources missing from used are using none of that resource
+ return 0;
}
+ min = Math.min(min, used.otherResources[i] / otherResources[i]);
}
return min * 100.0;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index fc746b0..e0dc881 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -20,11 +20,8 @@ package org.apache.storm.scheduler.resource.strategies.scheduling;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.TreeSet;
import org.apache.storm.Config;
@@ -33,7 +30,6 @@ import org.apache.storm.scheduler.Component;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.slf4j.Logger;
@@ -41,7 +37,7 @@ import org.slf4j.LoggerFactory;
public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy {
- private static final Logger LOG = LoggerFactory.getLogger(BaseResourceAwareStrategy.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
@Override
public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
@@ -125,10 +121,15 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl
protected TreeSet<ObjectResources> sortObjectResources(
final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
final ExistingScheduleFunc existingScheduleFunc) {
-
+
for (ObjectResources objectResources : allResources.objectResources) {
objectResources.effectiveResources =
allResources.availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Effective resources for {} is {}, and numExistingSchedule is {}",
+ objectResources.id, objectResources.effectiveResources,
+ existingScheduleFunc.getNumExistingSchedule(objectResources.id));
+ }
}
TreeSet<ObjectResources> sortedObjectResources =
http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 00f8f14..6a3fcbb 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -28,7 +28,6 @@ import org.apache.storm.scheduler.SupervisorResources;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.INimbus;
import org.apache.storm.scheduler.SchedulerAssignment;
-import org.apache.storm.scheduler.SchedulerAssignmentImpl;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
@@ -53,6 +52,7 @@ import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareSched
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -65,7 +65,7 @@ import java.util.TreeSet;
public class TestDefaultResourceAwareStrategy {
private static final Logger LOG = LoggerFactory.getLogger(TestDefaultResourceAwareStrategy.class);
- private static int currentTime = 1450418597;
+ private static final int CURRENT_TIME = 1450418597;
private static class TestDNSToSwitchMapping implements DNSToSwitchMapping {
private final Map<String, String> result;
@@ -124,10 +124,10 @@ public class TestDefaultResourceAwareStrategy {
conf.put(Config.TOPOLOGY_NAME, "testTopology");
conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
- genExecsAndComps(stormToplogy), currentTime, "user");
+ genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
Topologies topologies = new Topologies(topo);
- Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, conf);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, conf);
ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -191,10 +191,10 @@ public class TestDefaultResourceAwareStrategy {
conf.put(Config.TOPOLOGY_SUBMITTER_USER, "user");
TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
- genExecsAndComps(stormToplogy), currentTime, "user");
+ genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
Topologies topologies = new Topologies(topo);
- Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, conf);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, conf);
ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -239,11 +239,16 @@ public class TestDefaultResourceAwareStrategy {
//generate some that has alot of cpu but little of memory
final Map<String, SupervisorDetails> supMapRack4 = genSupervisors(10, 4, 40, 400 + 200 + 10, 1000);
+ //Generate some that have neither resource, to verify that the strategy will prioritize this last
+ //Also put a generic resource with 0 value in the resources list, to verify that it doesn't affect the sorting
+ final Map<String, SupervisorDetails> supMapRack5 = genSupervisors(10, 4, 50, 0.0, 0.0, Collections.singletonMap("gpu.count", 0.0));
+
supMap.putAll(supMapRack0);
supMap.putAll(supMapRack1);
supMap.putAll(supMapRack2);
supMap.putAll(supMapRack3);
supMap.putAll(supMapRack4);
+ supMap.putAll(supMapRack5);
Config config = createClusterConfig(100, 500, 500, null);
config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
@@ -251,14 +256,14 @@ public class TestDefaultResourceAwareStrategy {
//create test DNSToSwitchMapping plugin
DNSToSwitchMapping TestNetworkTopographyPlugin =
- new TestDNSToSwitchMapping(supMapRack0, supMapRack1, supMapRack2, supMapRack3, supMapRack4);
+ new TestDNSToSwitchMapping(supMapRack0, supMapRack1, supMapRack2, supMapRack3, supMapRack4, supMapRack5);
//generate topologies
- TopologyDetails topo1 = genTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10, "user");
- TopologyDetails topo2 = genTopology("topo-2", config, 8, 0, 2, 0, currentTime - 2, 10, "user");
+ TopologyDetails topo1 = genTopology("topo-1", config, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+ TopologyDetails topo2 = genTopology("topo-2", config, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
Topologies topologies = new Topologies(topo1, topo2);
- Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config);
List<String> supHostnames = new LinkedList<>();
for (SupervisorDetails sup : supMap.values()) {
@@ -284,7 +289,7 @@ public class TestDefaultResourceAwareStrategy {
TreeSet<ObjectResources> sortedRacks = rs.sortRacks(null, topo1);
LOG.info("Sorted Racks {}", sortedRacks);
- Assert.assertEquals("# of racks sorted", 5, sortedRacks.size());
+ Assert.assertEquals("# of racks sorted", 6, sortedRacks.size());
Iterator<ObjectResources> it = sortedRacks.iterator();
// Ranked first since rack-0 has the most balanced set of resources
Assert.assertEquals("rack-0 should be ordered first", "rack-0", it.next().id);
@@ -294,8 +299,10 @@ public class TestDefaultResourceAwareStrategy {
Assert.assertEquals("rack-4 should be ordered third", "rack-4", it.next().id);
// Ranked fourth since rack-3 has alot of memory but not cpu
Assert.assertEquals("rack-3 should be ordered fourth", "rack-3", it.next().id);
- //Ranked last since rack-2 has not cpu resources
+ //Ranked fifth since rack-2 has no cpu resources
Assert.assertEquals("rack-2 should be ordered fifth", "rack-2", it.next().id);
+ //Ranked last since rack-5 has neither CPU nor memory available
+ assertEquals("Rack-5 should be ordered sixth", "rack-5", it.next().id);
SchedulingResult schedulingResult = rs.schedule(cluster, topo1);
assert(schedulingResult.isSuccess());
@@ -370,17 +377,17 @@ public class TestDefaultResourceAwareStrategy {
final List<String> t1UnfavoredHostIds = Arrays.asList("host-1", "host-2", "host-3");
t1Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, t1UnfavoredHostIds);
//generate topologies
- TopologyDetails topo1 = genTopology("topo-1", t1Conf, 8, 0, 2, 0, currentTime - 2, 10, "user");
+ TopologyDetails topo1 = genTopology("topo-1", t1Conf, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
Config t2Conf = new Config();
t2Conf.putAll(config);
t2Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, Arrays.asList("host-31", "host-32", "host-33"));
t2Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, Arrays.asList("host-11", "host-12", "host-13"));
- TopologyDetails topo2 = genTopology("topo-2", t2Conf, 8, 0, 2, 0, currentTime - 2, 10, "user");
+ TopologyDetails topo2 = genTopology("topo-2", t2Conf, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
Topologies topologies = new Topologies(topo1, topo2);
- Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config);
List<String> supHostnames = new LinkedList<>();
for (SupervisorDetails sup : supMap.values()) {
@@ -393,7 +400,7 @@ public class TestDefaultResourceAwareStrategy {
String rack = entry.getValue();
List<String> nodesForRack = rackToNodes.get(rack);
if (nodesForRack == null) {
- nodesForRack = new ArrayList<String>();
+ nodesForRack = new ArrayList<>();
rackToNodes.put(rack, nodesForRack);
}
nodesForRack.add(hostName);
http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
index fc44568..f7a2a84 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
@@ -100,7 +100,7 @@ public class TestGenericResourceAwareStrategy {
Topologies topologies = new Topologies(topo);
- Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, conf);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, conf);
ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -188,7 +188,7 @@ public class TestGenericResourceAwareStrategy {
genExecsAndComps(stormToplogy), currentTime, "user");
Topologies topologies = new Topologies(topo);
- Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, conf);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, conf);
ResourceAwareScheduler rs = new ResourceAwareScheduler();
[3/6] storm git commit: Split up NormalizedResources into a few
classes with more narrow responsibilities. Make NormalizedResources a regular
class instead of an abstract class. Add some tests. Make calculateAvg/Min
throw exceptions if the resource total is not a superset of the used resources.
Posted by bo...@apache.org.
Split up NormalizedResources into a few classes with more narrow responsibilities. Make NormalizedResources a regular class instead of an abstract class. Add some tests. Make calculateAvg/Min throw exceptions if the resource total is not a superset of the used resources.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/41db23fe
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/41db23fe
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/41db23fe
Branch: refs/heads/master
Commit: 41db23fe86299f71b6ecc7deaa70ab731899c83f
Parents: 8aae491
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Jan 5 21:56:54 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Jan 7 18:27:04 2018 +0100
----------------------------------------------------------------------
storm-server/pom.xml | 2 +-
.../apache/storm/blobstore/BlobStoreUtils.java | 2 -
.../org/apache/storm/daemon/nimbus/Nimbus.java | 5 +-
.../supervisor/timer/SupervisorHeartbeat.java | 16 +-
.../org/apache/storm/scheduler/Cluster.java | 5 +-
.../resource/NormalizedResourceOffer.java | 89 ++--
.../resource/NormalizedResourceRequest.java | 65 ++-
.../scheduler/resource/NormalizedResources.java | 294 ++++++--------
.../resource/NormalizedResourcesWithMemory.java | 28 ++
.../storm/scheduler/resource/RAS_Node.java | 3 -
.../storm/scheduler/resource/RAS_Nodes.java | 1 -
.../resource/ResourceAwareScheduler.java | 1 -
.../resource/ResourceMapArrayBridge.java | 89 ++++
.../resource/ResourceNameNormalizer.java | 65 +++
.../storm/scheduler/resource/ResourceUtils.java | 14 +-
.../scheduling/BaseResourceAwareStrategy.java | 6 +-
.../DefaultResourceAwareStrategy.java | 2 -
.../GenericResourceAwareStrategy.java | 3 -
.../org/apache/storm/utils/ServerUtils.java | 16 +-
.../resource/TestResourceAwareScheduler.java | 7 +-
.../TestUtilsForResourceAwareScheduler.java | 4 +-
.../normalization/NormalizedResourcesRule.java | 50 +++
.../normalization/NormalizedResourcesTest.java | 404 +++++++++++++++++++
.../ResourceMapArrayBridgeTest.java | 65 +++
.../scheduling/NormalizedResourcesRule.java | 50 ---
.../TestDefaultResourceAwareStrategy.java | 1 +
26 files changed, 941 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index d6554de..ff917ca 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -130,7 +130,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>2630</maxAllowedViolations>
+ <maxAllowedViolations>2620</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index d660ab0..fd1cf40 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -24,9 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import javax.security.auth.Subject;
-
import org.apache.commons.collections.CollectionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.storm.Config;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index b495cfa..2c785a5 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -52,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.apache.storm.Config;
import org.apache.storm.Constants;
@@ -136,7 +135,6 @@ import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
import org.apache.storm.nimbus.ITopologyValidator;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.SupervisorResources;
import org.apache.storm.scheduler.DefaultScheduler;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.INimbus;
@@ -144,14 +142,15 @@ import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.SchedulerAssignmentImpl;
import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.SupervisorResources;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.security.INimbusCredentialPlugin;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.IAuthorizer;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 87e310e..3faf1e4 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -17,22 +17,20 @@
*/
package org.apache.storm.daemon.supervisor.timer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.scheduler.resource.NormalizedResources;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
public class SupervisorHeartbeat implements Runnable {
private final IStormClusterState stormClusterState;
@@ -92,7 +90,7 @@ public class SupervisorHeartbeat implements Runnable {
ret.put(stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
}
- return normalizedResourceMap(ret);
+ return NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(ret);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 9ab0c93..1ed88c7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -37,6 +37,7 @@ import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.NormalizedResources;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
@@ -44,8 +45,6 @@ import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
public class Cluster implements ISchedulingState {
private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
private SchedulerAssignmentImpl assignment;
@@ -440,7 +439,7 @@ public class Cluster implements ISchedulingState {
Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, shared.get_on_heap()
);
}
- sharedTotalResources = normalizedResourceMap(sharedTotalResources);
+ sharedTotalResources = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(sharedTotalResources);
WorkerResources ret = new WorkerResources();
ret.set_resources(totalResources.toNormalizedMap());
ret.set_shared_resources(sharedTotalResources);
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
index 58f12d0..787bbce 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
@@ -18,68 +18,97 @@
package org.apache.storm.scheduler.resource;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.storm.Constants;
-import org.apache.storm.generated.WorkerResources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * An offer of resources that has been normalized.
+ * An offer of resources with normalized resource names.
*/
-public class NormalizedResourceOffer extends NormalizedResources {
+public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
+
private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
- private double totalMemory;
+ private final NormalizedResources normalizedResources;
+ private double totalMemoryMb;
/**
- * Create a new normalized set of resources. Note that memory is not covered here because it is not consistent in requests vs offers
- * because of how on heap vs off heap is used.
+ * Create a new normalized resource offer.
*
* @param resources the resources to be normalized.
*/
public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
- super(resources, null);
- totalMemory = getNormalizedResources().getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+ Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
+ totalMemoryMb = normalizedResourceMap.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+ this.normalizedResources = new NormalizedResources(normalizedResourceMap);
}
public NormalizedResourceOffer() {
- this((Map<String, ? extends Number>)null);
+ this((Map<String, ? extends Number>) null);
}
public NormalizedResourceOffer(NormalizedResourceOffer other) {
- super(other);
- this.totalMemory = other.totalMemory;
+ this.totalMemoryMb = other.totalMemoryMb;
+ this.normalizedResources = new NormalizedResources(other.normalizedResources);
}
@Override
public double getTotalMemoryMb() {
- return totalMemory;
+ return totalMemoryMb;
}
- @Override
- public String toString() {
- return super.toString() + " MEM: " + totalMemory;
+ public Map<String, Double> toNormalizedMap() {
+ Map<String, Double> ret = normalizedResources.toNormalizedMap();
+ ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
+ return ret;
}
- @Override
- public Map<String,Double> toNormalizedMap() {
- Map<String, Double> ret = super.toNormalizedMap();
- ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemory);
- return ret;
+ public void add(NormalizedResourcesWithMemory other) {
+ normalizedResources.add(other.getNormalizedResources());
+ totalMemoryMb += other.getTotalMemoryMb();
}
- @Override
- public void add(NormalizedResources other) {
- super.add(other);
- totalMemory += other.getTotalMemoryMb();
+ public void remove(NormalizedResourcesWithMemory other) {
+ normalizedResources.remove(other.getNormalizedResources());
+ totalMemoryMb -= other.getTotalMemoryMb();
+ if (totalMemoryMb < 0.0) {
+ normalizedResources.throwBecauseResourceBecameNegative(
+ Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
+ };
+ }
+
+ /**
+ * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
+ * double, double).
+ */
+ public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
+ return normalizedResources.calculateAveragePercentageUsedBy(
+ used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+ }
+
+ /**
+ * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+ * double)
+ */
+ public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
+ return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+ }
+
+ /**
+ * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+ * double).
+ */
+ public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
+ return normalizedResources.couldHoldIgnoringSharedMemory(
+ other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
+ }
+
+ public double getTotalCpu() {
+ return normalizedResources.getTotalCpu();
}
@Override
- public void remove(NormalizedResources other) {
- super.remove(other);
- totalMemory -= other.getTotalMemoryMb();
- assert totalMemory >= 0.0;
+ public NormalizedResources getNormalizedResources() {
+ return normalizedResources;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
index 5963750..2af90ab 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
@@ -18,12 +18,8 @@
package org.apache.storm.scheduler.resource;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.generated.ComponentCommon;
@@ -36,14 +32,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A request that has been normalized.
+ * A resource request with normalized resource names.
*/
-public class NormalizedResourceRequest extends NormalizedResources {
+public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
+
private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
if (!dest.containsKey(destKey)) {
- Number value = (Number)src.get(srcKey);
+ Number value = (Number) src.get(srcKey);
if (value != null) {
dest.put(destKey, value.doubleValue());
}
@@ -51,7 +48,7 @@ public class NormalizedResourceRequest extends NormalizedResources {
}
private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) {
- Map<String, Double> ret = normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
+ Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
@@ -96,7 +93,6 @@ public class NormalizedResourceRequest extends NormalizedResources {
stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
}
-
}
}
} catch (ParseException e) {
@@ -106,48 +102,38 @@ public class NormalizedResourceRequest extends NormalizedResources {
return topologyResources;
}
+ private final NormalizedResources normalizedResources;
private double onHeap;
private double offHeap;
- /**
- * Create a new normalized set of resources. Note that memory is not covered here because it is not consistent in requests vs offers
- * because of how on heap vs off heap is used.
- *
- * @param resources the resources to be normalized.
- * @param topologyConf the config for the topology
- */
private NormalizedResourceRequest(Map<String, ? extends Number> resources,
- Map<String, Object> topologyConf) {
- super(resources, getDefaultResources(topologyConf));
- initializeMemory(getNormalizedResources());
+ Map<String, Double> defaultResources) {
+ Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
+ normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
+ onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ normalizedResources = new NormalizedResources(normalizedResourceMap);
}
public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
- this(parseResources(component.get_json_conf()), topoConf);
+ this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
}
public NormalizedResourceRequest(Map<String, Object> topoConf) {
- this((Map<String, ? extends Number>) null, topoConf);
+ this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
}
public NormalizedResourceRequest() {
- super(null, null);
- initializeMemory(getNormalizedResources());
+ this((Map<String, ? extends Number>) null, null);
}
- @Override
- public Map<String,Double> toNormalizedMap() {
- Map<String, Double> ret = super.toNormalizedMap();
+ public Map<String, Double> toNormalizedMap() {
+ Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
return ret;
}
- private void initializeMemory(Map<String, Double> normalizedResources) {
- onHeap = normalizedResources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
- offHeap = normalizedResources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
- }
-
public double getOnHeapMemoryMb() {
return onHeap;
}
@@ -166,17 +152,17 @@ public class NormalizedResourceRequest extends NormalizedResources {
/**
* Add the resources in other to this.
+ *
* @param other the other Request to add to this.
*/
public void add(NormalizedResourceRequest other) {
- super.add(other);
+ this.normalizedResources.add(other.normalizedResources);
onHeap += other.onHeap;
offHeap += other.offHeap;
}
- @Override
public void add(WorkerResources value) {
- super.add(value);
+ this.normalizedResources.add(value);
//The resources are already normalized
Map<String, Double> resources = value.get_resources();
onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
@@ -185,11 +171,20 @@ public class NormalizedResourceRequest extends NormalizedResources {
@Override
public double getTotalMemoryMb() {
- return getOnHeapMemoryMb() + getOffHeapMemoryMb();
+ return onHeap + offHeap;
}
@Override
public String toString() {
return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
}
+
+ public double getTotalCpu() {
+ return this.normalizedResources.getTotalCpu();
+ }
+
+ @Override
+ public NormalizedResources getNormalizedResources() {
+ return this.normalizedResources;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
index 846ab69..f8e911a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
@@ -20,74 +20,39 @@ package org.apache.storm.scheduler.resource;
import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import org.apache.storm.Config;
+import org.apache.commons.lang.Validate;
import org.apache.storm.Constants;
import org.apache.storm.generated.WorkerResources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Resources that have been normalized.
+ * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
+ * does not keep track of memory as a resource.
*/
-public abstract class NormalizedResources {
+public class NormalizedResources {
private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
- public static final Map<String, String> RESOURCE_NAME_MAPPING;
- static {
- Map<String, String> tmp = new HashMap<>();
- tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
- tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
- tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
- tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
- tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
- RESOURCE_NAME_MAPPING = Collections.unmodifiableMap(tmp);
- }
+ public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
+ private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
- private static double[] makeArray(Map<String, Double> normalizedResources) {
- //To avoid locking we will go through the map twice. It should be small so it is probably not a big deal
- for (String key : normalizedResources.keySet()) {
- //We are going to skip over CPU and Memory, because they are captured elsewhere
- if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
- && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
- && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
- && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
- resourceNames.computeIfAbsent(key, (k) -> counter.getAndIncrement());
- }
- }
- //By default all of the values are 0
- double[] ret = new double[counter.get()];
- for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
- Integer index = resourceNames.get(entry.getKey());
- if (index != null) {
- //index == null if it is memory or CPU
- ret[index] = entry.getValue();
- }
- }
- return ret;
+ static {
+ resetResourceNames();
}
- private static final ConcurrentMap<String, Integer> resourceNames = new ConcurrentHashMap<>();
- private static final AtomicInteger counter = new AtomicInteger(0);
private double cpu;
private double[] otherResources;
- private final Map<String, Double> normalizedResources;
/**
- * This is for testing only. It allows a test to reset the mapping of resource names in the array. We reset the mapping because some
+ * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
* algorithms sadly have different behavior if a resource exists or not.
*/
@VisibleForTesting
public static void resetResourceNames() {
- resourceNames.clear();
- counter.set(0);
+ RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
+ RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
}
/**
@@ -96,53 +61,21 @@ public abstract class NormalizedResources {
public NormalizedResources(NormalizedResources other) {
cpu = other.cpu;
otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
- normalizedResources = other.normalizedResources;
}
/**
- * Create a new normalized set of resources. Note that memory is not covered here because it is not consistent in requests vs offers
- * because of how on heap vs off heap is used.
+ * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
+ * offers because of how on heap vs off heap is used.
*
- * @param resources the resources to be normalized.
- * @param defaults the default resources that will also be normalized and combined with the real resources.
+ * @param normalizedResources the normalized resource map
+ * @param getTotalMemoryMb Supplier of total memory in MB.
*/
- public NormalizedResources(Map<String, ? extends Number> resources, Map<String, ? extends Number> defaults) {
- normalizedResources = normalizedResourceMap(defaults);
- normalizedResources.putAll(normalizedResourceMap(resources));
+ public NormalizedResources(Map<String, Double> normalizedResources) {
cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
- otherResources = makeArray(normalizedResources);
- }
-
- protected final Map<String, Double> getNormalizedResources() {
- return this.normalizedResources;
+ otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
}
/**
- * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
- *
- * @param resourceMap resource map of either Supervisor or Topology
- * @return the resource map with common resource names
- */
- public static Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
- if (resourceMap == null) {
- return new HashMap<>();
- }
- return new HashMap<>(resourceMap.entrySet().stream()
- .collect(Collectors.toMap(
- //Map the key if needed
- (e) -> RESOURCE_NAME_MAPPING.getOrDefault(e.getKey(), e.getKey()),
- //Map the value
- (e) -> e.getValue().doubleValue())));
- }
-
- /**
- * Get the total amount of memory.
- *
- * @return the total amount of memory requested or provided.
- */
- public abstract double getTotalMemoryMb();
-
- /**
* Get the total amount of cpu.
*
* @return the amount of cpu.
@@ -150,15 +83,18 @@ public abstract class NormalizedResources {
public double getTotalCpu() {
return cpu;
}
+
+ private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
+ if (requiredLength > otherResources.length) {
+ double[] newResources = new double[requiredLength];
+ System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
+ otherResources = newResources;
+ }
+ }
private void add(double[] resourceArray) {
int otherLength = resourceArray.length;
- int length = otherResources.length;
- if (otherLength > length) {
- double[] newResources = new double[otherLength];
- System.arraycopy(newResources, 0, otherResources, 0, length);
- otherResources = newResources;
- }
+ zeroPadOtherResourcesIfNecessary(otherLength);
for (int i = 0; i < otherLength; i++) {
otherResources[i] += resourceArray[i];
}
@@ -177,45 +113,39 @@ public abstract class NormalizedResources {
public void add(WorkerResources value) {
Map<String, Double> workerNormalizedResources = value.get_resources();
cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
- add(makeArray(workerNormalizedResources));
+ add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
}
+ public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) {
+ throw new IllegalArgumentException(String.format("Resource amounts should never be negative."
+ + " Resource '%s' with current value '%f' became negative because '%f' was removed.",
+ resourceName, currentValue, subtractedValue));
+ }
+
/**
* Remove the other resources from this. This is the same as subtracting the resources in other from this.
*
* @param other the resources we want removed.
+ * @throws IllegalArgumentException if subtracting other from this would result in any resource amount becoming negative.
*/
public void remove(NormalizedResources other) {
this.cpu -= other.cpu;
- assert cpu >= 0.0;
- int otherLength = other.otherResources.length;
- int length = otherResources.length;
- if (otherLength > length) {
- double[] newResources = new double[otherLength];
- System.arraycopy(newResources, 0, otherResources, 0, length);
- otherResources = newResources;
+ if (cpu < 0.0) {
+ throwBecauseResourceBecameNegative(Constants.COMMON_CPU_RESOURCE_NAME, cpu, other.cpu);
}
+ int otherLength = other.otherResources.length;
+ zeroPadOtherResourcesIfNecessary(otherLength);
for (int i = 0; i < otherLength; i++) {
otherResources[i] -= other.otherResources[i];
- assert otherResources[i] >= 0.0;
+ if (otherResources[i] < 0.0) {
+ throwBecauseResourceBecameNegative(getResourceNameForResourceIndex(i), otherResources[i], other.otherResources[i]);
+ }
}
}
@Override
public String toString() {
- return "CPU: " + cpu + " Other resources: " + toNormalizedOtherResources();
- }
-
- private Map<String, Double> toNormalizedOtherResources() {
- Map<String, Double> ret = new HashMap<>();
- int length = otherResources.length;
- for (Map.Entry<String, Integer> entry : resourceNames.entrySet()) {
- int index = entry.getValue();
- if (index < length) {
- ret.put(entry.getKey(), otherResources[index]);
- }
- }
- return ret;
+ return "Normalized resources: " + toNormalizedMap();
}
/**
@@ -223,7 +153,7 @@ public abstract class NormalizedResources {
* user.
*/
public Map<String, Double> toNormalizedMap() {
- Map<String, Double> ret = toNormalizedOtherResources();
+ Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
return ret;
}
@@ -240,9 +170,11 @@ public abstract class NormalizedResources {
* does not check memory because with shared memory it is beyond the scope of this.
*
* @param other the resources that we want to check if they would fit in this.
+ * @param thisTotalMemoryMb The total memory in MB of this
+ * @param otherTotalMemoryMb The total memory in MB of other
* @return true if it might fit, else false if it could not possibly fit.
*/
- public boolean couldHoldIgnoringSharedMemory(NormalizedResources other) {
+ public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
if (this.cpu < other.getTotalCpu()) {
return false;
}
@@ -253,69 +185,70 @@ public abstract class NormalizedResources {
}
}
- if (this.getTotalMemoryMb() < other.getTotalMemoryMb()) {
- return false;
- }
- return true;
+ return thisTotalMemoryMb >= otherTotalMemoryMb;
}
- private void throwBecauseResourceIsMissingFromTotal(int resourceIndex) {
- String resourceName = null;
- for (Map.Entry<String, Integer> entry : resourceNames.entrySet()) {
+ private String getResourceNameForResourceIndex(int resourceIndex) {
+ for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
int index = entry.getValue();
if (index == resourceIndex) {
- resourceName = entry.getKey();
- break;
+ return entry.getKey();
}
}
- if (resourceName == null) {
- throw new IllegalStateException("Array index " + resourceIndex + " is not mapped in the resource names map."
- + " This should not be possible, and is likely a bug in the Storm code.");
- }
- throw new IllegalArgumentException("Total resources does not contain resource '"
- + resourceName
- + "'. All resources should be represented in the total. This is likely a bug in the Storm code");
+ return null;
+ }
+
+ private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+ throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
+ + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
+ used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
}
/**
- * Calculate the average resource usage percentage with this being the total resources and used being the amounts used.
+ * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+ * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+ * division by 0. If all resources are skipped the result is defined to be 100.0.
*
* @param used the amount of resources used.
- * @return the average percentage used 0.0 to 100.0. Clamps to 100.0 in case there are no available resources in the total
+ * @param totalMemoryMb The total memory in MB
+ * @param usedMemoryMb The used memory in MB
+ * @return the average percentage used 0.0 to 100.0.
+ * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+ * resources that are not present in the total.
*/
- public double calculateAveragePercentageUsedBy(NormalizedResources used) {
+ public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
+ + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
+ toNormalizedMap(), used.toNormalizedMap());
+ }
+
int skippedResourceTypes = 0;
double total = 0.0;
- double totalMemory = getTotalMemoryMb();
- if (totalMemory != 0.0) {
- total += used.getTotalMemoryMb() / totalMemory;
+ if (usedMemoryMb > totalMemoryMb) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalMemoryMb != 0.0) {
+ total += usedMemoryMb / totalMemoryMb;
} else {
skippedResourceTypes++;
}
double totalCpu = getTotalCpu();
+ if (used.getTotalCpu() > getTotalCpu()) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
if (totalCpu != 0.0) {
total += used.getTotalCpu() / getTotalCpu();
} else {
skippedResourceTypes++;
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Calculating avg percentage used by. Used CPU: {} Total CPU: {} Used Mem: {} Total Mem: {}"
- + " Other Used: {} Other Total: {}", totalCpu, used.getTotalCpu(), totalMemory, used.getTotalMemoryMb(),
- this.toNormalizedOtherResources(), used.toNormalizedOtherResources());
- }
if (used.otherResources.length > otherResources.length) {
- throwBecauseResourceIsMissingFromTotal(used.otherResources.length);
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
}
for (int i = 0; i < otherResources.length; i++) {
double totalValue = otherResources[i];
- if (totalValue == 0.0) {
- //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
- //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
- skippedResourceTypes++;
- continue;
- }
double usedValue;
if (i >= used.otherResources.length) {
//Resources missing from used are using none of that resource
@@ -323,14 +256,24 @@ public abstract class NormalizedResources {
} else {
usedValue = used.otherResources[i];
}
+ if (usedValue > totalValue) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalValue == 0.0) {
+ //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+ //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+ skippedResourceTypes++;
+ continue;
+ }
+
total += usedValue / totalValue;
}
//Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
int divisor = 2 + otherResources.length - skippedResourceTypes;
if (divisor == 0) {
- /*This is an arbitrary choice to make the result consistent with calculateMin.
- Any value would be valid here, becase there are no (non-zero) resources in the total set of resources,
- so we're trying to average 0 values.
+ /*
+ * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, because there are
+ * no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
*/
return 100.0;
} else {
@@ -339,41 +282,43 @@ public abstract class NormalizedResources {
}
/**
- * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used.
+ * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+ * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+ * division by 0. If all resources are skipped the result is defined to be 100.0.
*
* @param used the amount of resources used.
- * @return the minimum percentage used 0.0 to 100.0. Clamps to 100.0 in case there are no available resources in the total.
+ * @param totalMemoryMb The total memory in MB
+ * @param usedMemoryMb The used memory in MB
+ * @return the minimum percentage used 0.0 to 100.0.
+ * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+ * resources that are not present in the total.
*/
- public double calculateMinPercentageUsedBy(NormalizedResources used) {
- double totalMemory = getTotalMemoryMb();
- double totalCpu = getTotalCpu();
-
+ public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Calculating min percentage used by. Used CPU: {} Total CPU: {} Used Mem: {} Total Mem: {}"
- + " Other Used: {} Other Total: {}", totalCpu, used.getTotalCpu(), totalMemory, used.getTotalMemoryMb(),
- toNormalizedOtherResources(), used.toNormalizedOtherResources());
+ LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
+ + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
+ toNormalizedMap(), used.toNormalizedMap());
}
- if (used.otherResources.length > otherResources.length) {
- throwBecauseResourceIsMissingFromTotal(used.otherResources.length);
+ double min = 1.0;
+ if (usedMemoryMb > totalMemoryMb) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
}
-
- if (used.otherResources.length != otherResources.length
- || totalMemory == 0.0
- || totalCpu == 0.0) {
- //If the lengths don't match one of the resources will be 0, which means we would calculate the percentage to be 0.0
- // and so the min would be 0.0 (assuming that we can never go negative on a resource being used.
- return 0.0;
+ if (totalMemoryMb != 0.0) {
+ min = Math.min(min, usedMemoryMb / totalMemoryMb);
}
-
- double min = 100.0;
- if (totalMemory != 0.0) {
- min = Math.min(min, used.getTotalMemoryMb() / totalMemory);
+ double totalCpu = getTotalCpu();
+ if (used.getTotalCpu() > totalCpu) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
}
if (totalCpu != 0.0) {
min = Math.min(min, used.getTotalCpu() / totalCpu);
}
+ if (used.otherResources.length > otherResources.length) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+
for (int i = 0; i < otherResources.length; i++) {
if (otherResources[i] == 0.0) {
//Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
@@ -384,6 +329,9 @@ public abstract class NormalizedResources {
//Resources missing from used are using none of that resource
return 0;
}
+ if (used.otherResources[i] > otherResources[i]) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
min = Math.min(min, used.otherResources[i] / otherResources[i]);
}
return min * 100.0;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
new file mode 100644
index 0000000..640233a
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+/**
+ * Intended for {@link NormalizedResources} wrappers that handle memory.
+ */
+public interface NormalizedResourcesWithMemory {
+
+ NormalizedResources getNormalizedResources();
+
+ double getTotalMemoryMb();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 5350005..db9ca73 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -25,14 +25,11 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-
-import org.apache.storm.Constants;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
-import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
index 6c70ff1..2bd2b86 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
-
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SchedulerAssignment;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index dc557ff..e730705 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -30,7 +30,6 @@ import org.apache.storm.DaemonConfig;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
-import org.apache.storm.scheduler.SchedulerAssignmentImpl;
import org.apache.storm.scheduler.SingleTopologyCluster;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
new file mode 100644
index 0000000..cf4f80b
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.storm.Constants;
+
+/**
+ * Provides translation between normalized resource maps and resource value arrays. Some operations use resource value arrays instead of the
+ * full normalized resource map as an optimization. See {@link NormalizedResources}.
+ */
+public class ResourceMapArrayBridge {
+
+ private final ConcurrentMap<String, Integer> resourceNamesToArrayIndex = new ConcurrentHashMap<>();
+ private final AtomicInteger counter = new AtomicInteger(0);
+
+ /**
+ * Translates a normalized resource map to an array of resource values. Each resource name will be assigned an index in the array, which
+ * is guaranteed to be consistent with subsequent invocations of this method. Note that CPU and memory resources are not translated by
+ * this method, as they are expected to be captured elsewhere.
+ *
+ * @param normalizedResources The resources to translate to an array
+ * @return The array of resource values
+ */
+ public double[] translateToResourceArray(Map<String, Double> normalizedResources) {
+ //To avoid locking we will go through the map twice. It should be small so it is probably not a big deal
+ for (String key : normalizedResources.keySet()) {
+ //We are going to skip over CPU and Memory, because they are captured elsewhere
+ if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
+ && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
+ && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
+ && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
+ resourceNamesToArrayIndex.computeIfAbsent(key, (k) -> counter.getAndIncrement());
+ }
+ }
+ //By default all of the values are 0
+ double[] ret = new double[counter.get()];
+ for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
+ Integer index = resourceNamesToArrayIndex.get(entry.getKey());
+ if (index != null) {
+ //index == null if it is memory or CPU
+ ret[index] = entry.getValue();
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Translates an array of resource values to a normalized resource map.
+ *
+ * @param resources The resource array to translate
+ * @return The normalized resource map
+ */
+ public Map<String, Double> translateFromResourceArray(double[] resources) {
+ Map<String, Double> ret = new HashMap<>();
+ int length = resources.length;
+ for (Map.Entry<String, Integer> entry : resourceNamesToArrayIndex.entrySet()) {
+ int index = entry.getValue();
+ if (index < length) {
+ ret.put(entry.getKey(), resources[index]);
+ }
+ }
+ return ret;
+ }
+
+ public Map<String, Integer> getResourceNamesToArrayIndex() {
+ return Collections.unmodifiableMap(resourceNamesToArrayIndex);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
new file mode 100644
index 0000000..d59ba92
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+
+/**
+ * Provides resource name normalization for resource maps.
+ */
+public class ResourceNameNormalizer {
+
+ private final Map<String, String> resourceNameMapping;
+
+ public ResourceNameNormalizer() {
+ Map<String, String> tmp = new HashMap<>();
+ tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
+ tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
+ tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
+ tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
+ tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
+ resourceNameMapping = Collections.unmodifiableMap(tmp);
+ }
+
+ /**
+ * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
+ *
+ * @param resourceMap resource map of either Supervisor or Topology
+ * @return the resource map with common resource names
+ */
+ public Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
+ if (resourceMap == null) {
+ return new HashMap<>();
+ }
+ return new HashMap<>(resourceMap.entrySet().stream()
+ .collect(Collectors.toMap(
+ //Map the key if needed
+ (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()),
+ //Map the value
+ (e) -> e.getValue().doubleValue())));
+ }
+
+ public Map<String, String> getResourceNameMapping() {
+ return resourceNameMapping;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index db9dbfa..ca4ea63 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -20,9 +20,7 @@ package org.apache.storm.scheduler.resource;
import java.util.HashMap;
import java.util.Map;
-
import org.apache.storm.Config;
-import org.apache.storm.Constants;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.SpoutSpec;
@@ -33,8 +31,6 @@ import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
public class ResourceUtils {
private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class);
@@ -95,7 +91,8 @@ public class ResourceUtils {
if (resourceUpdatesMap.containsKey(spoutName)) {
ComponentCommon spoutCommon = spoutSpec.get_common();
- Map<String, Double> resourcesUpdate = normalizedResourceMap(resourceUpdatesMap.get(spoutName));
+ Map<String, Double> resourcesUpdate = NormalizedResources
+ .RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceUpdatesMap.get(spoutName));
String newJsonConf = getJsonWithUpdatedResources(spoutCommon.get_json_conf(), resourcesUpdate);
spoutCommon.set_json_conf(newJsonConf);
componentsUpdated.put(spoutName, resourcesUpdate);
@@ -110,7 +107,8 @@ public class ResourceUtils {
if (resourceUpdatesMap.containsKey(boltName)) {
ComponentCommon boltCommon = boltObj.get_common();
- Map<String, Double> resourcesUpdate = normalizedResourceMap(resourceUpdatesMap.get(boltName));
+ Map<String, Double> resourcesUpdate = NormalizedResources
+ .RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceUpdatesMap.get(boltName));
String newJsonConf = getJsonWithUpdatedResources(boltCommon.get_json_conf(), resourceUpdatesMap.get(boltName));
boltCommon.set_json_conf(newJsonConf);
componentsUpdated.put(boltName, resourcesUpdate);
@@ -128,7 +126,7 @@ public class ResourceUtils {
}
public static String getCorrespondingLegacyResourceName(String normalizedResourceName) {
- for(Map.Entry<String, String> entry : NormalizedResources.RESOURCE_NAME_MAPPING.entrySet()) {
+ for(Map.Entry<String, String> entry : NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().entrySet()) {
if (entry.getValue().equals(normalizedResourceName)) {
return entry.getKey();
}
@@ -148,7 +146,7 @@ public class ResourceUtils {
);
for (Map.Entry<String, Double> resourceUpdateEntry : resourceUpdates.entrySet()) {
- if (NormalizedResources.RESOURCE_NAME_MAPPING.containsValue(resourceUpdateEntry.getKey())) {
+ if (NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().containsValue(resourceUpdateEntry.getKey())) {
// if there will be legacy values they will be in the outer conf
jsonObject.remove(getCorrespondingLegacyResourceName(resourceUpdateEntry.getKey()));
componentResourceMap.remove(getCorrespondingLegacyResourceName(resourceUpdateEntry.getKey()));
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 67bc867..73e1aa0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -19,7 +19,6 @@
package org.apache.storm.scheduler.resource.strategies.scheduling;
import com.google.common.annotations.VisibleForTesting;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -31,18 +30,15 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
-
-import org.apache.storm.executor.Executor;
import org.apache.storm.generated.ComponentType;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.Component;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.RAS_Node;
import org.apache.storm.scheduler.resource.RAS_Nodes;
-import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index e0dc881..efa3ecf 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -23,13 +23,11 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.TreeSet;
-
import org.apache.storm.Config;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.Component;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.TopologyDetails;
-
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index 108dc49..2b0ca40 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -21,16 +21,13 @@ package org.apache.storm.scheduler.resource.strategies.scheduling;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.TreeSet;
import org.apache.storm.Config;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.Component;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index a64c9b4..6f33251 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -25,28 +25,27 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.RandomAccessFile;
-import java.nio.file.Files;
import java.nio.file.FileSystems;
+import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
import java.util.List;
import java.util.Map;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import javax.security.auth.Subject;
-
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.exec.CommandLine;
@@ -54,16 +53,15 @@ import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.BlobStoreAclHandler;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.blobstore.LocalFsBlobStore;
import org.apache.storm.blobstore.LocalModeClientBlobStore;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
import org.apache.storm.daemon.StormCommon;
-import org.apache.storm.DaemonConfig;
import org.apache.storm.generated.AccessControl;
import org.apache.storm.generated.AccessControlType;
import org.apache.storm.generated.AuthorizationException;
@@ -73,8 +71,8 @@ import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.security.auth.SingleUserPrincipal;
import org.apache.thrift.TException;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index eb5f70f..360db3a 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -501,9 +501,6 @@ public class TestResourceAwareScheduler {
}
public void testHeterogeneousCluster(Config topologyConf, String strategyName) {
- Map<String, Double> test = new HashMap<>();
- test.put("gpu.count", 0.0);
- new NormalizedResourceOffer(test);
LOG.info("\n\n\t\ttestHeterogeneousCluster");
INimbus iNimbus = new INimbusTest();
Map<String, Double> resourceMap1 = new HashMap<>(); // strong supervisor node
@@ -513,8 +510,8 @@ public class TestResourceAwareScheduler {
resourceMap2.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
resourceMap2.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0);
- resourceMap1 = NormalizedResources.normalizedResourceMap(resourceMap1);
- resourceMap2 = NormalizedResources.normalizedResourceMap(resourceMap2);
+ resourceMap1 = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap1);
+ resourceMap2 = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap2);
Map<String, SupervisorDetails> supMap = new HashMap<>();
for (int i = 0; i < 2; i++) {
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 4a2e644..3d37ad9 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -65,8 +65,6 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
public class TestUtilsForResourceAwareScheduler {
private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
@@ -155,7 +153,7 @@ public class TestUtilsForResourceAwareScheduler {
for (int j = 0; j < numPorts; j++) {
ports.add(j);
}
- SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, normalizedResourceMap(resourceMap));
+ SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
retList.put(sup.getId(), sup);
}
return retList;
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
new file mode 100644
index 0000000..1f78f53
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import org.apache.storm.scheduler.resource.NormalizedResources;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+public class NormalizedResourcesRule implements TestRule {
+
+ public static class NRStatement extends Statement {
+ private final Statement base;
+
+ public NRStatement(Statement base) {
+ this.base = base;
+ }
+
+ @Override
+ public void evaluate() throws Throwable {
+ NormalizedResources.resetResourceNames();
+ try {
+ base.evaluate();
+ } finally {
+ NormalizedResources.resetResourceNames();
+ }
+ }
+ }
+
+ @Override
+ public Statement apply(Statement statement, Description description) {
+ return new NRStatement(statement);
+ }
+}
[5/6] storm git commit: Move some resource normalization classes to a
new package since the resource package was getting crowded
Posted by bo...@apache.org.
Move some resource normalization classes to a new package since the resource package was getting crowded
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88533346
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88533346
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88533346
Branch: refs/heads/master
Commit: 88533346dbfb318d8bf623703f6a16a1eab19534
Parents: 41db23f
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sun Jan 7 18:12:55 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Jan 7 18:43:11 2018 +0100
----------------------------------------------------------------------
storm-server/pom.xml | 2 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 2 +-
.../supervisor/timer/SupervisorHeartbeat.java | 2 +-
.../org/apache/storm/scheduler/Cluster.java | 6 +-
.../storm/scheduler/ISchedulingState.java | 5 +-
.../storm/scheduler/SupervisorDetails.java | 2 +-
.../apache/storm/scheduler/TopologyDetails.java | 2 +-
.../resource/NormalizedResourceOffer.java | 114 ------
.../resource/NormalizedResourceRequest.java | 190 ----------
.../scheduler/resource/NormalizedResources.java | 339 ------------------
.../resource/NormalizedResourcesWithMemory.java | 28 --
.../storm/scheduler/resource/RAS_Node.java | 1 +
.../resource/ResourceMapArrayBridge.java | 89 -----
.../resource/ResourceNameNormalizer.java | 65 ----
.../storm/scheduler/resource/ResourceUtils.java | 2 +
.../normalization/NormalizedResourceOffer.java | 114 ++++++
.../NormalizedResourceRequest.java | 190 ++++++++++
.../normalization/NormalizedResources.java | 343 +++++++++++++++++++
.../NormalizedResourcesWithMemory.java | 28 ++
.../normalization/ResourceMapArrayBridge.java | 89 +++++
.../normalization/ResourceNameNormalizer.java | 68 ++++
.../scheduling/BaseResourceAwareStrategy.java | 2 +-
.../org/apache/storm/utils/ServerUtils.java | 2 +-
.../resource/TestResourceAwareScheduler.java | 1 +
.../TestUtilsForResourceAwareScheduler.java | 1 +
.../normalization/NormalizedResourcesRule.java | 1 -
.../normalization/NormalizedResourcesTest.java | 1 -
.../ResourceMapArrayBridgeTest.java | 2 -
28 files changed, 849 insertions(+), 842 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index ff917ca..7c301a4 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -130,7 +130,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>2620</maxAllowedViolations>
+ <maxAllowedViolations>2617</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 2c785a5..e66dbe5 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -150,7 +150,7 @@ import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
import org.apache.storm.scheduler.resource.ResourceUtils;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.security.INimbusCredentialPlugin;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.IAuthorizer;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 3faf1e4..2be241a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -27,7 +27,7 @@ import org.apache.storm.DaemonConfig;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.generated.SupervisorInfo;
-import org.apache.storm.scheduler.resource.NormalizedResources;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 1ed88c7..625fe6f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -35,9 +35,9 @@ import org.apache.storm.generated.SharedMemory;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
-import org.apache.storm.scheduler.resource.NormalizedResources;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index 7175b7e..187a03c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -22,11 +22,10 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
/** An interface that provides access to the current scheduling state. */
public interface ISchedulingState {
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
index 3d08715..242b54c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
@@ -23,7 +23,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.storm.Constants;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index 8e4c1d5..41e7edf 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -35,7 +35,7 @@ import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.SharedMemory;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
deleted file mode 100644
index 787bbce..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.Map;
-import org.apache.storm.Constants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An offer of resources with normalized resource names.
- */
-public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
-
- private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
- private final NormalizedResources normalizedResources;
- private double totalMemoryMb;
-
- /**
- * Create a new normalized resource offer.
- *
- * @param resources the resources to be normalized.
- */
- public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
- Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
- totalMemoryMb = normalizedResourceMap.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
- this.normalizedResources = new NormalizedResources(normalizedResourceMap);
- }
-
- public NormalizedResourceOffer() {
- this((Map<String, ? extends Number>) null);
- }
-
- public NormalizedResourceOffer(NormalizedResourceOffer other) {
- this.totalMemoryMb = other.totalMemoryMb;
- this.normalizedResources = new NormalizedResources(other.normalizedResources);
- }
-
- @Override
- public double getTotalMemoryMb() {
- return totalMemoryMb;
- }
-
- public Map<String, Double> toNormalizedMap() {
- Map<String, Double> ret = normalizedResources.toNormalizedMap();
- ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
- return ret;
- }
-
- public void add(NormalizedResourcesWithMemory other) {
- normalizedResources.add(other.getNormalizedResources());
- totalMemoryMb += other.getTotalMemoryMb();
- }
-
- public void remove(NormalizedResourcesWithMemory other) {
- normalizedResources.remove(other.getNormalizedResources());
- totalMemoryMb -= other.getTotalMemoryMb();
- if (totalMemoryMb < 0.0) {
- normalizedResources.throwBecauseResourceBecameNegative(
- Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
- };
- }
-
- /**
- * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
- * double, double).
- */
- public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
- return normalizedResources.calculateAveragePercentageUsedBy(
- used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
- }
-
- /**
- * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
- * double)
- */
- public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
- return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
- }
-
- /**
- * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
- * double).
- */
- public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
- return normalizedResources.couldHoldIgnoringSharedMemory(
- other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
- }
-
- public double getTotalCpu() {
- return normalizedResources.getTotalCpu();
- }
-
- @Override
- public NormalizedResources getNormalizedResources() {
- return normalizedResources;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
deleted file mode 100644
index 2af90ab..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
-import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.utils.ObjectReader;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A resource request with normalized resource names.
- */
-public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
-
- private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
-
- private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
- if (!dest.containsKey(destKey)) {
- Number value = (Number) src.get(srcKey);
- if (value != null) {
- dest.put(destKey, value.doubleValue());
- }
- }
- }
-
- private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) {
- Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
- Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
- putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
- putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
- putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
- return ret;
- }
-
- private static Map<String, Double> parseResources(String input) {
- Map<String, Double> topologyResources = new HashMap<>();
- JSONParser parser = new JSONParser();
- LOG.debug("Input to parseResources {}", input);
- try {
- if (input != null) {
- Object obj = parser.parse(input);
- JSONObject jsonObject = (JSONObject) obj;
-
- // Legacy resource parsing
- if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
- Double topoMemOnHeap = ObjectReader
- .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
- topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
- }
- if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
- Double topoMemOffHeap = ObjectReader
- .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
- topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
- }
- if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
- Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
- null);
- topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
- }
-
- // If resource is also present in resources map will overwrite the above
- if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
- Map<String, Number> rawResourcesMap =
- (Map<String, Number>) jsonObject.computeIfAbsent(
- Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
-
- for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) {
- topologyResources.put(
- stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
- }
-
- }
- }
- } catch (ParseException e) {
- LOG.error("Failed to parse component resources is:" + e.toString(), e);
- return null;
- }
- return topologyResources;
- }
-
- private final NormalizedResources normalizedResources;
- private double onHeap;
- private double offHeap;
-
- private NormalizedResourceRequest(Map<String, ? extends Number> resources,
- Map<String, Double> defaultResources) {
- Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
- normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
- onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
- offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
- normalizedResources = new NormalizedResources(normalizedResourceMap);
- }
-
- public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
- this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
- }
-
- public NormalizedResourceRequest(Map<String, Object> topoConf) {
- this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
- }
-
- public NormalizedResourceRequest() {
- this((Map<String, ? extends Number>) null, null);
- }
-
- public Map<String, Double> toNormalizedMap() {
- Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
- ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
- ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
- return ret;
- }
-
- public double getOnHeapMemoryMb() {
- return onHeap;
- }
-
- public void addOnHeap(final double onHeap) {
- this.onHeap += onHeap;
- }
-
- public double getOffHeapMemoryMb() {
- return offHeap;
- }
-
- public void addOffHeap(final double offHeap) {
- this.offHeap += offHeap;
- }
-
- /**
- * Add the resources in other to this.
- *
- * @param other the other Request to add to this.
- */
- public void add(NormalizedResourceRequest other) {
- this.normalizedResources.add(other.normalizedResources);
- onHeap += other.onHeap;
- offHeap += other.offHeap;
- }
-
- public void add(WorkerResources value) {
- this.normalizedResources.add(value);
- //The resources are already normalized
- Map<String, Double> resources = value.get_resources();
- onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
- offHeap += resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
- }
-
- @Override
- public double getTotalMemoryMb() {
- return onHeap + offHeap;
- }
-
- @Override
- public String toString() {
- return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
- }
-
- public double getTotalCpu() {
- return this.normalizedResources.getTotalCpu();
- }
-
- @Override
- public NormalizedResources getNormalizedResources() {
- return this.normalizedResources;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
deleted file mode 100644
index f8e911a..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
-import java.util.Map;
-import org.apache.commons.lang.Validate;
-import org.apache.storm.Constants;
-import org.apache.storm.generated.WorkerResources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
- * does not keep track of memory as a resource.
- */
-public class NormalizedResources {
-
- private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
-
- public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
- private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
-
- static {
- resetResourceNames();
- }
-
- private double cpu;
- private double[] otherResources;
-
- /**
- * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
- * algorithms sadly have different behavior if a resource exists or not.
- */
- @VisibleForTesting
- public static void resetResourceNames() {
- RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
- RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
- }
-
- /**
- * Copy constructor.
- */
- public NormalizedResources(NormalizedResources other) {
- cpu = other.cpu;
- otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
- }
-
- /**
- * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
- * offers because of how on heap vs off heap is used.
- *
- * @param normalizedResources the normalized resource map
- * @param getTotalMemoryMb Supplier of total memory in MB.
- */
- public NormalizedResources(Map<String, Double> normalizedResources) {
- cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
- otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
- }
-
- /**
- * Get the total amount of cpu.
- *
- * @return the amount of cpu.
- */
- public double getTotalCpu() {
- return cpu;
- }
-
- private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
- if (requiredLength > otherResources.length) {
- double[] newResources = new double[requiredLength];
- System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
- otherResources = newResources;
- }
- }
-
- private void add(double[] resourceArray) {
- int otherLength = resourceArray.length;
- zeroPadOtherResourcesIfNecessary(otherLength);
- for (int i = 0; i < otherLength; i++) {
- otherResources[i] += resourceArray[i];
- }
- }
-
- public void add(NormalizedResources other) {
- this.cpu += other.cpu;
- add(other.otherResources);
- }
-
- /**
- * Add the resources from a worker to this.
- *
- * @param value the worker resources that should be added to this.
- */
- public void add(WorkerResources value) {
- Map<String, Double> workerNormalizedResources = value.get_resources();
- cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
- add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
- }
-
- public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) {
- throw new IllegalArgumentException(String.format("Resource amounts should never be negative."
- + " Resource '%s' with current value '%f' became negative because '%f' was removed.",
- resourceName, currentValue, subtractedValue));
- }
-
- /**
- * Remove the other resources from this. This is the same as subtracting the resources in other from this.
- *
- * @param other the resources we want removed.
- * @throws IllegalArgumentException if subtracting other from this would result in any resource amount becoming negative.
- */
- public void remove(NormalizedResources other) {
- this.cpu -= other.cpu;
- if (cpu < 0.0) {
- throwBecauseResourceBecameNegative(Constants.COMMON_CPU_RESOURCE_NAME, cpu, other.cpu);
- }
- int otherLength = other.otherResources.length;
- zeroPadOtherResourcesIfNecessary(otherLength);
- for (int i = 0; i < otherLength; i++) {
- otherResources[i] -= other.otherResources[i];
- if (otherResources[i] < 0.0) {
- throwBecauseResourceBecameNegative(getResourceNameForResourceIndex(i), otherResources[i], other.otherResources[i]);
- }
- }
- }
-
- @Override
- public String toString() {
- return "Normalized resources: " + toNormalizedMap();
- }
-
- /**
- * Return a Map of the normalized resource name to a double. This should only be used when returning thrift resource requests to the end
- * user.
- */
- public Map<String, Double> toNormalizedMap() {
- Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
- ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
- return ret;
- }
-
- private double getResourceAt(int index) {
- if (index >= otherResources.length) {
- return 0.0;
- }
- return otherResources[index];
- }
-
- /**
- * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory. It
- * does not check memory because with shared memory it is beyond the scope of this.
- *
- * @param other the resources that we want to check if they would fit in this.
- * @param thisTotalMemoryMb The total memory in MB of this
- * @param otherTotalMemoryMb The total memory in MB of other
- * @return true if it might fit, else false if it could not possibly fit.
- */
- public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
- if (this.cpu < other.getTotalCpu()) {
- return false;
- }
- int length = Math.max(this.otherResources.length, other.otherResources.length);
- for (int i = 0; i < length; i++) {
- if (getResourceAt(i) < other.getResourceAt(i)) {
- return false;
- }
- }
-
- return thisTotalMemoryMb >= otherTotalMemoryMb;
- }
-
- private String getResourceNameForResourceIndex(int resourceIndex) {
- for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
- int index = entry.getValue();
- if (index == resourceIndex) {
- return entry.getKey();
- }
- }
- return null;
- }
-
- private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
- throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
- + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
- used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
- }
-
- /**
- * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
- * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
- * division by 0. If all resources are skipped the result is defined to be 100.0.
- *
- * @param used the amount of resources used.
- * @param totalMemoryMb The total memory in MB
- * @param usedMemoryMb The used memory in MB
- * @return the average percentage used 0.0 to 100.0.
- * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
- * resources that are not present in the total.
- */
- public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
- + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
- toNormalizedMap(), used.toNormalizedMap());
- }
-
- int skippedResourceTypes = 0;
- double total = 0.0;
- if (usedMemoryMb > totalMemoryMb) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
- if (totalMemoryMb != 0.0) {
- total += usedMemoryMb / totalMemoryMb;
- } else {
- skippedResourceTypes++;
- }
- double totalCpu = getTotalCpu();
- if (used.getTotalCpu() > getTotalCpu()) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
- if (totalCpu != 0.0) {
- total += used.getTotalCpu() / getTotalCpu();
- } else {
- skippedResourceTypes++;
- }
-
- if (used.otherResources.length > otherResources.length) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
-
- for (int i = 0; i < otherResources.length; i++) {
- double totalValue = otherResources[i];
- double usedValue;
- if (i >= used.otherResources.length) {
- //Resources missing from used are using none of that resource
- usedValue = 0.0;
- } else {
- usedValue = used.otherResources[i];
- }
- if (usedValue > totalValue) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
- if (totalValue == 0.0) {
- //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
- //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
- skippedResourceTypes++;
- continue;
- }
-
- total += usedValue / totalValue;
- }
- //Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
- int divisor = 2 + otherResources.length - skippedResourceTypes;
- if (divisor == 0) {
- /*
- * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, because there are
- * no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
- */
- return 100.0;
- } else {
- return (total * 100.0) / divisor;
- }
- }
-
- /**
- * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
- * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
- * division by 0. If all resources are skipped the result is defined to be 100.0.
- *
- * @param used the amount of resources used.
- * @param totalMemoryMb The total memory in MB
- * @param usedMemoryMb The used memory in MB
- * @return the minimum percentage used 0.0 to 100.0.
- * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
- * resources that are not present in the total.
- */
- public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
- + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
- toNormalizedMap(), used.toNormalizedMap());
- }
-
- double min = 1.0;
- if (usedMemoryMb > totalMemoryMb) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
- if (totalMemoryMb != 0.0) {
- min = Math.min(min, usedMemoryMb / totalMemoryMb);
- }
- double totalCpu = getTotalCpu();
- if (used.getTotalCpu() > totalCpu) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
- if (totalCpu != 0.0) {
- min = Math.min(min, used.getTotalCpu() / totalCpu);
- }
-
- if (used.otherResources.length > otherResources.length) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
-
- for (int i = 0; i < otherResources.length; i++) {
- if (otherResources[i] == 0.0) {
- //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
- //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
- continue;
- }
- if (i >= used.otherResources.length) {
- //Resources missing from used are using none of that resource
- return 0;
- }
- if (used.otherResources[i] > otherResources[i]) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
- min = Math.min(min, used.otherResources[i] / otherResources[i]);
- }
- return min * 100.0;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
deleted file mode 100644
index 640233a..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-/**
- * Intended for {@link NormalizedResources} wrappers that handle memory.
- */
-public interface NormalizedResourcesWithMemory {
-
- NormalizedResources getNormalizedResources();
-
- double getTotalMemoryMb();
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index db9ca73..84a95a7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -30,6 +30,7 @@ import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
deleted file mode 100644
index cf4f80b..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.storm.Constants;
-
-/**
- * Provides translation between normalized resource maps and resource value arrays. Some operations use resource value arrays instead of the
- * full normalized resource map as an optimization. See {@link NormalizedResources}.
- */
-public class ResourceMapArrayBridge {
-
- private final ConcurrentMap<String, Integer> resourceNamesToArrayIndex = new ConcurrentHashMap<>();
- private final AtomicInteger counter = new AtomicInteger(0);
-
- /**
- * Translates a normalized resource map to an array of resource values. Each resource name will be assigned an index in the array, which
- * is guaranteed to be consistent with subsequent invocations of this method. Note that CPU and memory resources are not translated by
- * this method, as they are expected to be captured elsewhere.
- *
- * @param normalizedResources The resources to translate to an array
- * @return The array of resource values
- */
- public double[] translateToResourceArray(Map<String, Double> normalizedResources) {
- //To avoid locking we will go through the map twice. It should be small so it is probably not a big deal
- for (String key : normalizedResources.keySet()) {
- //We are going to skip over CPU and Memory, because they are captured elsewhere
- if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
- && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
- && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
- && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
- resourceNamesToArrayIndex.computeIfAbsent(key, (k) -> counter.getAndIncrement());
- }
- }
- //By default all of the values are 0
- double[] ret = new double[counter.get()];
- for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
- Integer index = resourceNamesToArrayIndex.get(entry.getKey());
- if (index != null) {
- //index == null if it is memory or CPU
- ret[index] = entry.getValue();
- }
- }
- return ret;
- }
-
- /**
- * Translates an array of resource values to a normalized resource map.
- *
- * @param resources The resource array to translate
- * @return The normalized resource map
- */
- public Map<String, Double> translateFromResourceArray(double[] resources) {
- Map<String, Double> ret = new HashMap<>();
- int length = resources.length;
- for (Map.Entry<String, Integer> entry : resourceNamesToArrayIndex.entrySet()) {
- int index = entry.getValue();
- if (index < length) {
- ret.put(entry.getKey(), resources[index]);
- }
- }
- return ret;
- }
-
- public Map<String, Integer> getResourceNamesToArrayIndex() {
- return Collections.unmodifiableMap(resourceNamesToArrayIndex);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
deleted file mode 100644
index d59ba92..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
-
-/**
- * Provides resource name normalization for resource maps.
- */
-public class ResourceNameNormalizer {
-
- private final Map<String, String> resourceNameMapping;
-
- public ResourceNameNormalizer() {
- Map<String, String> tmp = new HashMap<>();
- tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
- tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
- tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
- tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
- tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
- resourceNameMapping = Collections.unmodifiableMap(tmp);
- }
-
- /**
- * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
- *
- * @param resourceMap resource map of either Supervisor or Topology
- * @return the resource map with common resource names
- */
- public Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
- if (resourceMap == null) {
- return new HashMap<>();
- }
- return new HashMap<>(resourceMap.entrySet().stream()
- .collect(Collectors.toMap(
- //Map the key if needed
- (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()),
- //Map the value
- (e) -> e.getValue().doubleValue())));
- }
-
- public Map<String, String> getResourceNameMapping() {
- return resourceNameMapping;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index ca4ea63..2f9ab4c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -25,6 +25,8 @@ import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
new file mode 100644
index 0000000..417dca9
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.Map;
+import org.apache.storm.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An offer of resources with normalized resource names.
+ */
+public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
+ private final NormalizedResources normalizedResources;
+ private double totalMemoryMb;
+
+ /**
+ * Create a new normalized resource offer.
+ *
+ * @param resources the resources to be normalized.
+ */
+ public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
+ Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
+ totalMemoryMb = normalizedResourceMap.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+ this.normalizedResources = new NormalizedResources(normalizedResourceMap);
+ }
+
+ public NormalizedResourceOffer() {
+ this((Map<String, ? extends Number>) null);
+ }
+
+ public NormalizedResourceOffer(NormalizedResourceOffer other) {
+ this.totalMemoryMb = other.totalMemoryMb;
+ this.normalizedResources = new NormalizedResources(other.normalizedResources);
+ }
+
+ @Override
+ public double getTotalMemoryMb() {
+ return totalMemoryMb;
+ }
+
+ public Map<String, Double> toNormalizedMap() {
+ Map<String, Double> ret = normalizedResources.toNormalizedMap();
+ ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
+ return ret;
+ }
+
+ public void add(NormalizedResourcesWithMemory other) {
+ normalizedResources.add(other.getNormalizedResources());
+ totalMemoryMb += other.getTotalMemoryMb();
+ }
+
+ public void remove(NormalizedResourcesWithMemory other) {
+ normalizedResources.remove(other.getNormalizedResources());
+ totalMemoryMb -= other.getTotalMemoryMb();
+ if (totalMemoryMb < 0.0) {
+ normalizedResources.throwBecauseResourceBecameNegative(
+ Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
+ };
+ }
+
+ /**
+ * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
+ * double, double).
+ */
+ public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
+ return normalizedResources.calculateAveragePercentageUsedBy(
+ used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+ }
+
+ /**
+ * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+ * double)
+ */
+ public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
+ return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+ }
+
+ /**
+ * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+ * double).
+ */
+ public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
+ return normalizedResources.couldHoldIgnoringSharedMemory(
+ other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
+ }
+
+ public double getTotalCpu() {
+ return normalizedResources.getTotalCpu();
+ }
+
+ @Override
+ public NormalizedResources getNormalizedResources() {
+ return normalizedResources;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
new file mode 100644
index 0000000..6627bb5
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ObjectReader;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A resource request with normalized resource names.
+ */
+public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
+
+ private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
+ if (!dest.containsKey(destKey)) {
+ Number value = (Number) src.get(srcKey);
+ if (value != null) {
+ dest.put(destKey, value.doubleValue());
+ }
+ }
+ }
+
+ private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) {
+ Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
+ Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
+ putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+ putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+ putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ return ret;
+ }
+
+ /**
+ * Parses the JSON configuration of a component into a map of resource name to requested amount.
+ *
+ * <p>The legacy on-heap memory, off-heap memory and CPU keys are read first. Entries of the
+ * {@link Config#TOPOLOGY_COMPONENT_RESOURCES_MAP} object are added afterwards and replace any legacy value stored under the same key.
+ * Keys are stored exactly as they appear in the JSON, no resource name normalization is done here.
+ *
+ * @param input the JSON text to parse, may be null
+ * @return the parsed resources (empty if input is null), or null if the input could not be parsed as JSON
+ */
+ private static Map<String, Double> parseResources(String input) {
+ Map<String, Double> topologyResources = new HashMap<>();
+ JSONParser parser = new JSONParser();
+ LOG.debug("Input to parseResources {}", input);
+ try {
+ if (input != null) {
+ Object obj = parser.parse(input);
+ JSONObject jsonObject = (JSONObject) obj;
+
+ // Legacy resource parsing
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+ Double topoMemOnHeap = ObjectReader
+ .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
+ }
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+ Double topoMemOffHeap = ObjectReader
+ .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
+ }
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+ Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
+ null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
+ }
+
+ // If resource is also present in resources map will overwrite the above
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+ // NOTE(review): the key is known to be present here, so computeIfAbsent presumably only matters when the key
+ // is mapped to JSON null, in which case an empty map is substituted - confirm against JSONObject/Map semantics
+ Map<String, Number> rawResourcesMap =
+ (Map<String, Number>) jsonObject.computeIfAbsent(
+ Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
+
+ for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) {
+ topologyResources.put(
+ stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
+ }
+
+ }
+ }
+ } catch (ParseException e) {
+ // Parse failures are logged and signalled to the caller with a null return instead of an exception
+ LOG.error("Failed to parse component resources is:" + e.toString(), e);
+ return null;
+ }
+ return topologyResources;
+ }
+
+ private final NormalizedResources normalizedResources;
+ private double onHeap;
+ private double offHeap;
+
+ private NormalizedResourceRequest(Map<String, ? extends Number> resources,
+ Map<String, Double> defaultResources) {
+ Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
+ normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
+ onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ normalizedResources = new NormalizedResources(normalizedResourceMap);
+ }
+
+ public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
+ this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
+ }
+
+ public NormalizedResourceRequest(Map<String, Object> topoConf) {
+ this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
+ }
+
+ public NormalizedResourceRequest() {
+ this((Map<String, ? extends Number>) null, null);
+ }
+
+ public Map<String, Double> toNormalizedMap() {
+ Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
+ ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
+ ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
+ return ret;
+ }
+
+ public double getOnHeapMemoryMb() {
+ return onHeap;
+ }
+
+ public void addOnHeap(final double onHeap) {
+ this.onHeap += onHeap;
+ }
+
+ public double getOffHeapMemoryMb() {
+ return offHeap;
+ }
+
+ public void addOffHeap(final double offHeap) {
+ this.offHeap += offHeap;
+ }
+
+ /**
+ * Add the resources in other to this.
+ *
+ * @param other the other Request to add to this.
+ */
+ public void add(NormalizedResourceRequest other) {
+ this.normalizedResources.add(other.normalizedResources);
+ onHeap += other.onHeap;
+ offHeap += other.offHeap;
+ }
+
+ public void add(WorkerResources value) {
+ this.normalizedResources.add(value);
+ //The resources are already normalized
+ Map<String, Double> resources = value.get_resources();
+ onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ offHeap += resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ }
+
+ @Override
+ public double getTotalMemoryMb() {
+ return onHeap + offHeap;
+ }
+
+ @Override
+ public String toString() {
+ // This class only implements an interface, so super.toString() would be Object's identity string and the
+ // cpu/generic resources would be missing from the output. Delegate to the wrapped resources instead.
+ return normalizedResources.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
+ }
+
+ public double getTotalCpu() {
+ return this.normalizedResources.getTotalCpu();
+ }
+
+ @Override
+ public NormalizedResources getNormalizedResources() {
+ return this.normalizedResources;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
new file mode 100644
index 0000000..76d5ce2
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.WorkerResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
+ * does not keep track of memory as a resource.
+ */
+public class NormalizedResources {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
+
+ public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
+ private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
+
+ static {
+ resetResourceNames();
+ }
+
+ private double cpu;
+ private double[] otherResources;
+
+ /**
+ * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
+ * algorithms sadly have different behavior if a resource exists or not.
+ */
+ @VisibleForTesting
+ public static void resetResourceNames() {
+ RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
+ RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
+ }
+
+ /**
+ * Copy constructor.
+ */
+ public NormalizedResources(NormalizedResources other) {
+ cpu = other.cpu;
+ otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
+ }
+
+ /**
+ * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
+ * offers because of how on heap vs off heap is used.
+ *
+ * @param normalizedResources the normalized resource map
+ */
+ public NormalizedResources(Map<String, Double> normalizedResources) {
+ cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+ otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
+ }
+
+ /**
+ * Get the total amount of cpu.
+ *
+ * @return the amount of cpu.
+ */
+ public double getTotalCpu() {
+ return cpu;
+ }
+
+ private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
+ if (requiredLength > otherResources.length) {
+ double[] newResources = new double[requiredLength];
+ System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
+ otherResources = newResources;
+ }
+ }
+
+ private void add(double[] resourceArray) {
+ int otherLength = resourceArray.length;
+ zeroPadOtherResourcesIfNecessary(otherLength);
+ for (int i = 0; i < otherLength; i++) {
+ otherResources[i] += resourceArray[i];
+ }
+ }
+
+ public void add(NormalizedResources other) {
+ this.cpu += other.cpu;
+ add(other.otherResources);
+ }
+
+ /**
+ * Add the resources from a worker to this.
+ *
+ * @param value the worker resources that should be added to this.
+ */
+ public void add(WorkerResources value) {
+ Map<String, Double> workerNormalizedResources = value.get_resources();
+ cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+ add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
+ }
+
+ /**
+ * Throw an IllegalArgumentException because a resource became negative during remove.
+ * @param resourceName The name of the resource that became negative
+ * @param currentValue The current value of the resource
+ * @param subtractedValue The value that was subtracted to make the resource negative
+ */
+ public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) {
+ throw new IllegalArgumentException(String.format("Resource amounts should never be negative."
+ + " Resource '%s' with current value '%f' became negative because '%f' was removed.",
+ resourceName, currentValue, subtractedValue));
+ }
+
+ /**
+ * Remove the other resources from this. This is the same as subtracting the resources in other from this.
+ *
+ * @param other the resources we want removed.
+ * @throws IllegalArgumentException if subtracting other from this would result in any resource amount becoming negative.
+ */
+ public void remove(NormalizedResources other) {
+ //Each subtraction is applied before its check, so if this throws, this object has already been modified and the
+ //"current value" reported in the exception is the (negative) result of the subtraction.
+ this.cpu -= other.cpu;
+ if (cpu < 0.0) {
+ throwBecauseResourceBecameNegative(Constants.COMMON_CPU_RESOURCE_NAME, cpu, other.cpu);
+ }
+ int otherLength = other.otherResources.length;
+ //Grow this array with zeros if other tracks more resource types, so every index of other can be subtracted
+ zeroPadOtherResourcesIfNecessary(otherLength);
+ for (int i = 0; i < otherLength; i++) {
+ otherResources[i] -= other.otherResources[i];
+ if (otherResources[i] < 0.0) {
+ throwBecauseResourceBecameNegative(getResourceNameForResourceIndex(i), otherResources[i], other.otherResources[i]);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Normalized resources: " + toNormalizedMap();
+ }
+
+ /**
+ * Return a Map of the normalized resource name to a double. This should only be used when returning thrift resource requests to the end
+ * user.
+ */
+ public Map<String, Double> toNormalizedMap() {
+ Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
+ ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+ return ret;
+ }
+
+ private double getResourceAt(int index) {
+ if (index >= otherResources.length) {
+ return 0.0;
+ }
+ return otherResources[index];
+ }
+
+ /**
+ * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory. It
+ * does not check memory because with shared memory it is beyond the scope of this.
+ *
+ * @param other the resources that we want to check if they would fit in this.
+ * @param thisTotalMemoryMb The total memory in MB of this
+ * @param otherTotalMemoryMb The total memory in MB of other
+ * @return true if it might fit, else false if it could not possibly fit.
+ */
+ public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
+ if (this.cpu < other.getTotalCpu()) {
+ return false;
+ }
+ int length = Math.max(this.otherResources.length, other.otherResources.length);
+ for (int i = 0; i < length; i++) {
+ if (getResourceAt(i) < other.getResourceAt(i)) {
+ return false;
+ }
+ }
+
+ return thisTotalMemoryMb >= otherTotalMemoryMb;
+ }
+
+ private String getResourceNameForResourceIndex(int resourceIndex) {
+ for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
+ int index = entry.getValue();
+ if (index == resourceIndex) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+
+ private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+ throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
+ + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
+ used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
+ }
+
+ /**
+ * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+ * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+ * division by 0. If all resources are skipped the result is defined to be 100.0.
+ *
+ * @param used the amount of resources used.
+ * @param totalMemoryMb The total memory in MB
+ * @param usedMemoryMb The used memory in MB
+ * @return the average percentage used 0.0 to 100.0.
+ * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+ * resources that are not present in the total.
+ */
+ public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+ if (LOG.isTraceEnabled()) {
+ //The arguments must follow the placeholder order: used values first, then totals (this object is the total)
+ LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
+ + " Used Normalized Resources: {} Total Normalized Resources: {}", usedMemoryMb, totalMemoryMb,
+ used.toNormalizedMap(), toNormalizedMap());
+ }
+
+ //Number of resource types (memory, cpu, generic) left out of the average because their total is 0
+ int skippedResourceTypes = 0;
+ //Sum of the used/total fractions of all resource types that were not skipped
+ double total = 0.0;
+ if (usedMemoryMb > totalMemoryMb) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalMemoryMb != 0.0) {
+ total += usedMemoryMb / totalMemoryMb;
+ } else {
+ skippedResourceTypes++;
+ }
+ double totalCpu = getTotalCpu();
+ if (used.getTotalCpu() > totalCpu) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalCpu != 0.0) {
+ total += used.getTotalCpu() / totalCpu;
+ } else {
+ skippedResourceTypes++;
+ }
+
+ //A longer array in used means it has generic resources that the total does not know about
+ if (used.otherResources.length > otherResources.length) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+
+ for (int i = 0; i < otherResources.length; i++) {
+ double totalValue = otherResources[i];
+ double usedValue;
+ if (i >= used.otherResources.length) {
+ //Resources missing from used are using none of that resource
+ usedValue = 0.0;
+ } else {
+ usedValue = used.otherResources[i];
+ }
+ if (usedValue > totalValue) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalValue == 0.0) {
+ //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+ //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+ skippedResourceTypes++;
+ continue;
+ }
+
+ total += usedValue / totalValue;
+ }
+ //Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
+ //The 2 accounts for memory and cpu, which are not part of the otherResources array
+ int divisor = 2 + otherResources.length - skippedResourceTypes;
+ if (divisor == 0) {
+ /*
+ * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, because there are
+ * no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
+ */
+ return 100.0;
+ } else {
+ return (total * 100.0) / divisor;
+ }
+ }
+
+ /**
+ * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+ * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+ * division by 0. If all resources are skipped the result is defined to be 100.0.
+ *
+ * @param used the amount of resources used.
+ * @param totalMemoryMb The total memory in MB
+ * @param usedMemoryMb The used memory in MB
+ * @return the minimum percentage used 0.0 to 100.0.
+ * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+ * resources that are not present in the total.
+ */
+ public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+ if (LOG.isTraceEnabled()) {
+ //The arguments must follow the placeholder order: used values first, then totals (this object is the total)
+ LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
+ + " Used Normalized Resources: {} Total Normalized Resources: {}", usedMemoryMb, totalMemoryMb,
+ used.toNormalizedMap(), toNormalizedMap());
+ }
+
+ //Start at 100% so the result is 100.0 when every resource type is skipped because its total is 0
+ double min = 1.0;
+ if (usedMemoryMb > totalMemoryMb) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalMemoryMb != 0.0) {
+ min = Math.min(min, usedMemoryMb / totalMemoryMb);
+ }
+ double totalCpu = getTotalCpu();
+ if (used.getTotalCpu() > totalCpu) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalCpu != 0.0) {
+ min = Math.min(min, used.getTotalCpu() / totalCpu);
+ }
+
+ //A longer array in used means it has generic resources that the total does not know about
+ if (used.otherResources.length > otherResources.length) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+
+ for (int i = 0; i < otherResources.length; i++) {
+ if (otherResources[i] == 0.0) {
+ //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+ //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+ continue;
+ }
+ if (i >= used.otherResources.length) {
+ //Resources missing from used are using none of that resource
+ return 0;
+ }
+ if (used.otherResources[i] > otherResources[i]) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ min = Math.min(min, used.otherResources[i] / otherResources[i]);
+ }
+ return min * 100.0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
new file mode 100644
index 0000000..5001645
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+/**
+ * Intended for {@link NormalizedResources} wrappers that handle memory.
+ */
+public interface NormalizedResourcesWithMemory {
+
+ NormalizedResources getNormalizedResources();
+
+ double getTotalMemoryMb();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
new file mode 100644
index 0000000..2b07c65
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.storm.Constants;
+
+/**
+ * Provides translation between normalized resource maps and resource value arrays. Some operations use resource value arrays instead of the
+ * full normalized resource map as an optimization. See {@link NormalizedResources}.
+ */
+public class ResourceMapArrayBridge {
+
+ private final ConcurrentMap<String, Integer> resourceNamesToArrayIndex = new ConcurrentHashMap<>();
+ private final AtomicInteger counter = new AtomicInteger(0);
+
+ /**
+ * Translates a normalized resource map to an array of resource values. Each resource name will be assigned an index in the array, which
+ * is guaranteed to be consistent with subsequent invocations of this method. Note that CPU and memory resources are not translated by
+ * this method, as they are expected to be captured elsewhere.
+ *
+ * @param normalizedResources The resources to translate to an array
+ * @return The array of resource values
+ */
+ public double[] translateToResourceArray(Map<String, Double> normalizedResources) {
+ //To avoid locking we will go through the map twice. It should be small so it is probably not a big deal
+ for (String key : normalizedResources.keySet()) {
+ //We are going to skip over CPU and Memory, because they are captured elsewhere
+ if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
+ && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
+ && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
+ && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
+ resourceNamesToArrayIndex.computeIfAbsent(key, (k) -> counter.getAndIncrement());
+ }
+ }
+ //By default all of the values are 0
+ double[] ret = new double[counter.get()];
+ for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
+ Integer index = resourceNamesToArrayIndex.get(entry.getKey());
+ if (index != null) {
+ //index == null if it is memory or CPU
+ ret[index] = entry.getValue();
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Translates an array of resource values to a normalized resource map.
+ *
+ * @param resources The resource array to translate
+ * @return The normalized resource map
+ */
+ public Map<String, Double> translateFromResourceArray(double[] resources) {
+ Map<String, Double> ret = new HashMap<>();
+ int length = resources.length;
+ for (Map.Entry<String, Integer> entry : resourceNamesToArrayIndex.entrySet()) {
+ int index = entry.getValue();
+ if (index < length) {
+ ret.put(entry.getKey(), resources[index]);
+ }
+ }
+ return ret;
+ }
+
+ public Map<String, Integer> getResourceNamesToArrayIndex() {
+ return Collections.unmodifiableMap(resourceNamesToArrayIndex);
+ }
+
+}
[6/6] storm git commit: Merge branch 'STORM-2859' of
https://github.com/srdo/storm into STORM-2859
Posted by bo...@apache.org.
Merge branch 'STORM-2859' of https://github.com/srdo/storm into STORM-2859
STORM-2859: Fix a number of issues with NormalizedResources when resource totals are zero
This closes #2485
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e8dd1f7e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e8dd1f7e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e8dd1f7e
Branch: refs/heads/master
Commit: e8dd1f7e2df5e034a20573b89cb89e9f3529e373
Parents: 7ecb3d7 8853334
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon Jan 8 15:02:55 2018 -0600
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 8 15:02:55 2018 -0600
----------------------------------------------------------------------
storm-server/pom.xml | 2 +-
.../apache/storm/blobstore/BlobStoreUtils.java | 6 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 5 +-
.../supervisor/timer/SupervisorHeartbeat.java | 16 +-
.../org/apache/storm/scheduler/Cluster.java | 9 +-
.../storm/scheduler/ISchedulingState.java | 5 +-
.../storm/scheduler/SupervisorDetails.java | 2 +-
.../apache/storm/scheduler/TopologyDetails.java | 2 +-
.../resource/NormalizedResourceOffer.java | 89 ----
.../resource/NormalizedResourceRequest.java | 194 ---------
.../scheduler/resource/NormalizedResources.java | 304 --------------
.../storm/scheduler/resource/RAS_Node.java | 4 +-
.../storm/scheduler/resource/RAS_Nodes.java | 1 -
.../resource/ResourceAwareScheduler.java | 1 -
.../storm/scheduler/resource/ResourceUtils.java | 16 +-
.../normalization/NormalizedResourceOffer.java | 114 ++++++
.../NormalizedResourceRequest.java | 190 +++++++++
.../normalization/NormalizedResources.java | 343 ++++++++++++++++
.../NormalizedResourcesWithMemory.java | 28 ++
.../normalization/ResourceMapArrayBridge.java | 89 ++++
.../normalization/ResourceNameNormalizer.java | 68 ++++
.../scheduling/BaseResourceAwareStrategy.java | 6 +-
.../DefaultResourceAwareStrategy.java | 15 +-
.../GenericResourceAwareStrategy.java | 3 -
.../org/apache/storm/utils/ServerUtils.java | 16 +-
.../resource/TestResourceAwareScheduler.java | 8 +-
.../TestUtilsForResourceAwareScheduler.java | 5 +-
.../normalization/NormalizedResourcesRule.java | 49 +++
.../normalization/NormalizedResourcesTest.java | 403 +++++++++++++++++++
.../ResourceMapArrayBridgeTest.java | 63 +++
.../scheduling/NormalizedResourcesRule.java | 50 ---
.../TestDefaultResourceAwareStrategy.java | 40 +-
.../TestGenericResourceAwareStrategy.java | 4 +-
33 files changed, 1422 insertions(+), 728 deletions(-)
----------------------------------------------------------------------
[2/6] storm git commit: Split up NormalizedResources into a few
classes with more narrow responsibilities. Make NormalizedResources a regular
class instead of an abstract class. Add some tests. Make calculateAvg/Min
throw exceptions if the resource total
Posted by bo...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
new file mode 100644
index 0000000..d25e76c
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
@@ -0,0 +1,404 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import org.apache.storm.scheduler.resource.NormalizedResources;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import org.apache.storm.Constants;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class NormalizedResourcesTest {
+
+ @Rule
+ public NormalizedResourcesRule normalizedResourcesRule = new NormalizedResourcesRule();
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private final String gpuResourceName = "gpu";
+
+ private Map<String, Double> normalize(Map<String, ? extends Number> resources) {
+ return NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
+ }
+
+ @Test
+ public void testAddCpu() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+ NormalizedResources addedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+
+ resources.add(addedResources);
+
+ Map<String, Double> normalizedMap = resources.toNormalizedMap();
+ assertThat(normalizedMap.get(Constants.COMMON_CPU_RESOURCE_NAME), is(2.0));
+ assertThat(resources.getTotalCpu(), is(2.0));
+ }
+
+ @Test
+ public void testAddToExistingResource() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+ NormalizedResources addedResources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+
+ resources.add(addedResources);
+
+ Map<String, Double> normalizedMap = resources.toNormalizedMap();
+ assertThat(normalizedMap.get(gpuResourceName), is(2.0));
+ }
+
+ @Test
+ public void testAddWhenOtherHasMoreResourcesThanThis() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.emptyMap()));
+ NormalizedResources addedResources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+
+ resources.add(addedResources);
+
+ Map<String, Double> normalizedMap = resources.toNormalizedMap();
+ assertThat(normalizedMap.get(gpuResourceName), is(1.0));
+ }
+
+ @Test
+ public void testAddWhenOtherHasDifferentResourceThanThis() {
+ String disks = "disks";
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(disks, 23)));
+ NormalizedResources addedResources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+
+ resources.add(addedResources);
+
+ Map<String, Double> normalizedMap = resources.toNormalizedMap();
+ assertThat(normalizedMap.get(disks), is(23.0));
+ assertThat(normalizedMap.get(gpuResourceName), is(1.0));
+ }
+
+ @Test
+ public void testRemoveThrowsIfResourcesBecomeNegative() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+ NormalizedResources removedResources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 2)));
+
+ expectedException.expect(IllegalArgumentException.class);
+ resources.remove(removedResources);
+ }
+
+ @Test
+ public void testRemoveThrowsIfCpuBecomesNegative() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+ NormalizedResources removedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+
+ expectedException.expect(IllegalArgumentException.class);
+ resources.remove(removedResources);
+ }
+
+ @Test
+ public void testRemoveFromCpu() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+ NormalizedResources removedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+
+ resources.remove(removedResources);
+
+ Map<String, Double> normalizedMap = resources.toNormalizedMap();
+ assertThat(normalizedMap.get(Constants.COMMON_CPU_RESOURCE_NAME), is(1.0));
+ assertThat(resources.getTotalCpu(), is(1.0));
+ }
+
+ @Test
+ public void testRemoveFromExistingResources() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 15)));
+ NormalizedResources removedResources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+
+ resources.remove(removedResources);
+
+ Map<String, Double> normalizedMap = resources.toNormalizedMap();
+ assertThat(normalizedMap.get(gpuResourceName), is(14.0));
+ }
+
+ @Test
+ public void testCouldHoldWithTooFewCpus() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+ NormalizedResources resourcesToCheck = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+
+ boolean couldHold = resources.couldHoldIgnoringSharedMemory(resourcesToCheck, 100, 1);
+
+ assertThat(couldHold, is(false));
+ }
+
+ @Test
+ public void testCouldHoldWithTooFewResource() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+ NormalizedResources resourcesToCheck = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 2)));
+
+ boolean couldHold = resources.couldHoldIgnoringSharedMemory(resourcesToCheck, 100, 1);
+
+ assertThat(couldHold, is(false));
+ }
+
+ @Test
+ public void testCouldHoldWithTooLittleMemory() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+ NormalizedResources resourcesToCheck = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+
+ boolean couldHold = resources.couldHoldIgnoringSharedMemory(resourcesToCheck, 100, 200);
+
+ assertThat(couldHold, is(false));
+ }
+
+ @Test
+ public void testCouldHoldWithMissingResource() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.emptyMap()));
+ NormalizedResources resourcesToCheck = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+
+ boolean couldHold = resources.couldHoldIgnoringSharedMemory(resourcesToCheck, 100, 1);
+
+ assertThat(couldHold, is(false));
+ }
+
+ @Test
+ public void testCouldHoldWithEnoughResources() {
+ Map<String, Double> allResources = new HashMap<>();
+ allResources.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+ allResources.put(gpuResourceName, 2.0);
+ NormalizedResources resources = new NormalizedResources(normalize(allResources));
+ NormalizedResources resourcesToCheck = new NormalizedResources(normalize(allResources));
+
+ boolean couldHold = resources.couldHoldIgnoringSharedMemory(resourcesToCheck, 100, 100);
+
+ assertThat(couldHold, is(true));
+ }
+
+ @Test
+ public void testCalculateAvgUsageWithNoResourcesInTotal() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.emptyMap()));
+ NormalizedResources usedResources = new NormalizedResources(normalize(Collections.emptyMap()));
+
+ double avg = resources.calculateAveragePercentageUsedBy(usedResources, 0, 0);
+
+ assertThat(avg, is(100.0));
+ }
+
+ @Test
+ public void testCalculateAvgWithOnlyCpu() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+ NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+
+ double avg = resources.calculateAveragePercentageUsedBy(usedResources, 0, 0);
+
+ assertThat(avg, is(50.0));
+ }
+
+ @Test
+ public void testCalculateAvgWithCpuAndMem() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+ NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+
+ double avg = resources.calculateAveragePercentageUsedBy(usedResources, 4, 1);
+
+ assertThat(avg, is((50.0 + 25.0)/2));
+ }
+
+ @Test
+ public void testCalculateAvgWithCpuMemAndGenericResource() {
+ Map<String, Double> allResourcesMap = new HashMap<>();
+ allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+ allResourcesMap.put(gpuResourceName, 10.0);
+ NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+ Map<String, Double> usedResourcesMap = new HashMap<>();
+ usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+ usedResourcesMap.put(gpuResourceName, 1.0);
+ NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+
+ double avg = resources.calculateAveragePercentageUsedBy(usedResources, 4, 1);
+
+ assertThat(avg, is((50.0 + 25.0 + 10.0)/3));
+ }
+
+ @Test
+ public void testCalculateAvgWithUnusedResource() {
+ Map<String, Double> allResourcesMap = new HashMap<>();
+ allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+ allResourcesMap.put(gpuResourceName, 10.0);
+ NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+ Map<String, Double> usedResourcesMap = new HashMap<>();
+ usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+ NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+
+ double avg = resources.calculateAveragePercentageUsedBy(usedResources, 4, 1);
+
+ //The resource that is not used should count as if it is being used 0%
+ assertThat(avg, is((50.0 + 25.0)/3));
+ }
+
+ @Test
+ public void testCalculateAvgThrowsIfTotalIsMissingCpu() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+ NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 5)));
+
+ expectedException.expect(IllegalArgumentException.class);
+ resources.calculateAveragePercentageUsedBy(usedResources, 0, 0);
+ }
+
+ @Test
+ public void testCalculateAvgThrowsIfTotalIsMissingMemory() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+ NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+
+ expectedException.expect(IllegalArgumentException.class);
+ resources.calculateAveragePercentageUsedBy(usedResources, 100, 500);
+ }
+
+ @Test
+ public void testCalculateAvgWithResourceMissingFromTotal() {
+ Map<String, Double> allResourcesMap = new HashMap<>();
+ allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+ NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+ Map<String, Double> usedResourcesMap = new HashMap<>();
+ usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+ usedResourcesMap.put(gpuResourceName, 1.0);
+ NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+
+ expectedException.expect(IllegalArgumentException.class);
+ resources.calculateAveragePercentageUsedBy(usedResources, 4, 1);
+ }
+
+ @Test
+ public void testCalculateAvgWithTooLittleResourceInTotal() {
+ Map<String, Double> allResourcesMap = new HashMap<>();
+ allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+ allResourcesMap.put(gpuResourceName, 1.0);
+ NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+ Map<String, Double> usedResourcesMap = new HashMap<>();
+ usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+ usedResourcesMap.put(gpuResourceName, 5.0);
+ NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+
+ expectedException.expect(IllegalArgumentException.class);
+ resources.calculateAveragePercentageUsedBy(usedResources, 4, 1);
+ }
+
+ @Test
+ public void testCalculateMinUsageWithNoResourcesInTotal() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.emptyMap()));
+ NormalizedResources usedResources = new NormalizedResources(normalize(Collections.emptyMap()));
+
+ double min = resources.calculateMinPercentageUsedBy(usedResources, 0, 0);
+
+ assertThat(min, is(100.0));
+ }
+
+ @Test
+ public void testCalculateMinWithOnlyCpu() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+ NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+
+ double min = resources.calculateMinPercentageUsedBy(usedResources, 0, 0);
+
+ assertThat(min, is(50.0));
+ }
+
+ @Test
+ public void testCalculateMinWithCpuAndMem() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+ NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+
+ double min = resources.calculateMinPercentageUsedBy(usedResources, 4, 1);
+
+ assertThat(min, is(25.0));
+ }
+
+ @Test
+ public void testCalculateMinWithCpuMemAndGenericResource() {
+ Map<String, Double> allResourcesMap = new HashMap<>();
+ allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+ allResourcesMap.put(gpuResourceName, 10.0);
+ NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+ Map<String, Double> usedResourcesMap = new HashMap<>();
+ usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+ usedResourcesMap.put(gpuResourceName, 1.0);
+ NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+
+ double min = resources.calculateMinPercentageUsedBy(usedResources, 4, 1);
+
+ assertThat(min, is(10.0));
+ }
+
+ @Test
+ public void testCalculateMinWithUnusedResource() {
+ Map<String, Double> allResourcesMap = new HashMap<>();
+ allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+ allResourcesMap.put(gpuResourceName, 10.0);
+ NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+ Map<String, Double> usedResourcesMap = new HashMap<>();
+ usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+ NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+
+ double min = resources.calculateMinPercentageUsedBy(usedResources, 4, 1);
+
+ //The resource that is not used should count as if it is being used 0%
+ assertThat(min, is(0.0));
+ }
+
+ @Test
+ public void testCalculateMinThrowsIfTotalIsMissingCpu() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+ NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 5)));
+
+ expectedException.expect(IllegalArgumentException.class);
+ resources.calculateMinPercentageUsedBy(usedResources, 0, 0);
+ }
+
+ @Test
+ public void testCalculateMinThrowsIfTotalIsMissingMemory() {
+ NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+ NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+
+ expectedException.expect(IllegalArgumentException.class);
+ resources.calculateMinPercentageUsedBy(usedResources, 100, 500);
+ }
+
+ @Test
+ public void testCalculateMinWithResourceMissingFromTotal() {
+ Map<String, Double> allResourcesMap = new HashMap<>();
+ allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+ NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+ Map<String, Double> usedResourcesMap = new HashMap<>();
+ usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+ usedResourcesMap.put(gpuResourceName, 1.0);
+ NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+
+ expectedException.expect(IllegalArgumentException.class);
+ resources.calculateMinPercentageUsedBy(usedResources, 4, 1);
+ }
+
+ @Test
+ public void testCalculateMinWithTooLittleResourceInTotal() {
+ Map<String, Double> allResourcesMap = new HashMap<>();
+ allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+ allResourcesMap.put(gpuResourceName, 1.0);
+ NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+ Map<String, Double> usedResourcesMap = new HashMap<>();
+ usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+ usedResourcesMap.put(gpuResourceName, 5.0);
+ NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+
+ expectedException.expect(IllegalArgumentException.class);
+ resources.calculateMinPercentageUsedBy(usedResources, 4, 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
new file mode 100644
index 0000000..f6dedb3
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import org.apache.storm.scheduler.resource.ResourceMapArrayBridge;
+import org.apache.storm.scheduler.resource.NormalizedResources;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+
+public class ResourceMapArrayBridgeTest {
+
+ private final String gpuResourceName = "gpu";
+ private final String disksResourceName = "disks";
+
+ private Map<String, Double> normalize(Map<String, ? extends Number> resources) {
+ return NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
+ }
+
+ @Test
+ public void testCanTranslateBackAndForthBetweenMapAndArrayConsistently() {
+ ResourceMapArrayBridge bridge = new ResourceMapArrayBridge();
+
+ Map<String, Double> allResources = new HashMap<>();
+ allResources.put(gpuResourceName, 2.0);
+ allResources.put(disksResourceName, 64.0);
+ Map<String, Double> normalizedResources = normalize(allResources);
+ double[] resources = bridge.translateToResourceArray(normalizedResources);
+
+ Map<String, Integer> resourceNamesToArrayIndex = bridge.getResourceNamesToArrayIndex();
+ assertThat(resourceNamesToArrayIndex.size(), is(2));
+ int gpuIndex = resourceNamesToArrayIndex.get(gpuResourceName);
+ int disksIndex = resourceNamesToArrayIndex.get(disksResourceName);
+
+ assertThat(resources.length, is(2));
+ assertThat(resources[gpuIndex], is(2.0));
+ assertThat(resources[disksIndex], is(64.0));
+
+ Map<String, Double> roundTrippedResources = bridge.translateFromResourceArray(resources);
+ assertThat(roundTrippedResources, is(normalizedResources));
+
+ double[] roundTrippedResourceArray = bridge.translateToResourceArray(roundTrippedResources);
+ assertThat(roundTrippedResourceArray, equalTo(resources));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/NormalizedResourcesRule.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/NormalizedResourcesRule.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/NormalizedResourcesRule.java
deleted file mode 100644
index 26bc14d..0000000
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/NormalizedResourcesRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource.strategies.scheduling;
-
-import org.apache.storm.scheduler.resource.NormalizedResources;
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-public class NormalizedResourcesRule implements TestRule {
-
- public static class NRStatement extends Statement {
- private final Statement base;
-
- public NRStatement(Statement base) {
- this.base = base;
- }
-
- @Override
- public void evaluate() throws Throwable {
- NormalizedResources.resetResourceNames();
- try {
- base.evaluate();
- } finally {
- NormalizedResources.resetResourceNames();
- }
- }
- }
-
- @Override
- public Statement apply(Statement statement, Description description) {
- return new NRStatement(statement);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 6a3fcbb..f82d0f7 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -18,6 +18,7 @@
package org.apache.storm.scheduler.resource.strategies.scheduling;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesRule;
import java.util.Collections;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
[4/6] storm git commit: Move some resource normalization classes to a
new package since the resource package was getting crowded
Posted by bo...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java
new file mode 100644
index 0000000..fc9182d
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+
+/**
+ * Provides resource name normalization for resource maps.
+ */
+public class ResourceNameNormalizer {
+
+ private final Map<String, String> resourceNameMapping;
+
+ /**
+ * Creates a new resource name normalizer.
+ */
+ public ResourceNameNormalizer() {
+ Map<String, String> tmp = new HashMap<>();
+ tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
+ tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
+ tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
+ tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
+ tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
+ resourceNameMapping = Collections.unmodifiableMap(tmp);
+ }
+
+ /**
+ * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
+ *
+ * @param resourceMap resource map of either Supervisor or Topology
+ * @return the resource map with common resource names
+ */
+ public Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
+ if (resourceMap == null) {
+ return new HashMap<>();
+ }
+ return new HashMap<>(resourceMap.entrySet().stream()
+ .collect(Collectors.toMap(
+ //Map the key if needed
+ (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()),
+ //Map the value
+ (e) -> e.getValue().doubleValue())));
+ }
+
+ public Map<String, String> getResourceNameMapping() {
+ return resourceNameMapping;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 73e1aa0..02d06a9 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -38,7 +38,7 @@ import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.RAS_Node;
import org.apache.storm.scheduler.resource.RAS_Nodes;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 6f33251..cd16259 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -72,7 +72,7 @@ import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.scheduler.resource.ResourceUtils;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.security.auth.SingleUserPrincipal;
import org.apache.thrift.TException;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 360db3a..85fb569 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -18,6 +18,7 @@
package org.apache.storm.scheduler.resource;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 3d37ad9..f534f2e 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -18,6 +18,7 @@
package org.apache.storm.scheduler.resource;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.generated.Bolt;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
index 1f78f53..b6952c6 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
@@ -18,7 +18,6 @@
package org.apache.storm.scheduler.resource.normalization;
-import org.apache.storm.scheduler.resource.NormalizedResources;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
index d25e76c..591207d 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
@@ -16,7 +16,6 @@
package org.apache.storm.scheduler.resource.normalization;
-import org.apache.storm.scheduler.resource.NormalizedResources;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
index f6dedb3..1476d89 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
@@ -16,8 +16,6 @@
package org.apache.storm.scheduler.resource.normalization;
-import org.apache.storm.scheduler.resource.ResourceMapArrayBridge;
-import org.apache.storm.scheduler.resource.NormalizedResources;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;