You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/01/08 21:20:58 UTC
[5/6] storm git commit: Move some resource normalization classes to a
new package since the resource package was getting crowded
Move some resource normalization classes to a new package since the resource package was getting crowded
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88533346
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88533346
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88533346
Branch: refs/heads/master
Commit: 88533346dbfb318d8bf623703f6a16a1eab19534
Parents: 41db23f
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sun Jan 7 18:12:55 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Jan 7 18:43:11 2018 +0100
----------------------------------------------------------------------
storm-server/pom.xml | 2 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 2 +-
.../supervisor/timer/SupervisorHeartbeat.java | 2 +-
.../org/apache/storm/scheduler/Cluster.java | 6 +-
.../storm/scheduler/ISchedulingState.java | 5 +-
.../storm/scheduler/SupervisorDetails.java | 2 +-
.../apache/storm/scheduler/TopologyDetails.java | 2 +-
.../resource/NormalizedResourceOffer.java | 114 ------
.../resource/NormalizedResourceRequest.java | 190 ----------
.../scheduler/resource/NormalizedResources.java | 339 ------------------
.../resource/NormalizedResourcesWithMemory.java | 28 --
.../storm/scheduler/resource/RAS_Node.java | 1 +
.../resource/ResourceMapArrayBridge.java | 89 -----
.../resource/ResourceNameNormalizer.java | 65 ----
.../storm/scheduler/resource/ResourceUtils.java | 2 +
.../normalization/NormalizedResourceOffer.java | 114 ++++++
.../NormalizedResourceRequest.java | 190 ++++++++++
.../normalization/NormalizedResources.java | 343 +++++++++++++++++++
.../NormalizedResourcesWithMemory.java | 28 ++
.../normalization/ResourceMapArrayBridge.java | 89 +++++
.../normalization/ResourceNameNormalizer.java | 68 ++++
.../scheduling/BaseResourceAwareStrategy.java | 2 +-
.../org/apache/storm/utils/ServerUtils.java | 2 +-
.../resource/TestResourceAwareScheduler.java | 1 +
.../TestUtilsForResourceAwareScheduler.java | 1 +
.../normalization/NormalizedResourcesRule.java | 1 -
.../normalization/NormalizedResourcesTest.java | 1 -
.../ResourceMapArrayBridgeTest.java | 2 -
28 files changed, 849 insertions(+), 842 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index ff917ca..7c301a4 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -130,7 +130,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>2620</maxAllowedViolations>
+ <maxAllowedViolations>2617</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 2c785a5..e66dbe5 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -150,7 +150,7 @@ import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
import org.apache.storm.scheduler.resource.ResourceUtils;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.security.INimbusCredentialPlugin;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.IAuthorizer;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 3faf1e4..2be241a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -27,7 +27,7 @@ import org.apache.storm.DaemonConfig;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.generated.SupervisorInfo;
-import org.apache.storm.scheduler.resource.NormalizedResources;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 1ed88c7..625fe6f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -35,9 +35,9 @@ import org.apache.storm.generated.SharedMemory;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
-import org.apache.storm.scheduler.resource.NormalizedResources;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index 7175b7e..187a03c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -22,11 +22,10 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
/** An interface that provides access to the current scheduling state. */
public interface ISchedulingState {
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
index 3d08715..242b54c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
@@ -23,7 +23,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.storm.Constants;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index 8e4c1d5..41e7edf 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -35,7 +35,7 @@ import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.SharedMemory;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
deleted file mode 100644
index 787bbce..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.Map;
-import org.apache.storm.Constants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An offer of resources with normalized resource names.
- */
-public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
-
- private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
- private final NormalizedResources normalizedResources;
- private double totalMemoryMb;
-
- /**
- * Create a new normalized resource offer.
- *
- * @param resources the resources to be normalized.
- */
- public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
- Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
- totalMemoryMb = normalizedResourceMap.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
- this.normalizedResources = new NormalizedResources(normalizedResourceMap);
- }
-
- public NormalizedResourceOffer() {
- this((Map<String, ? extends Number>) null);
- }
-
- public NormalizedResourceOffer(NormalizedResourceOffer other) {
- this.totalMemoryMb = other.totalMemoryMb;
- this.normalizedResources = new NormalizedResources(other.normalizedResources);
- }
-
- @Override
- public double getTotalMemoryMb() {
- return totalMemoryMb;
- }
-
- public Map<String, Double> toNormalizedMap() {
- Map<String, Double> ret = normalizedResources.toNormalizedMap();
- ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
- return ret;
- }
-
- public void add(NormalizedResourcesWithMemory other) {
- normalizedResources.add(other.getNormalizedResources());
- totalMemoryMb += other.getTotalMemoryMb();
- }
-
- public void remove(NormalizedResourcesWithMemory other) {
- normalizedResources.remove(other.getNormalizedResources());
- totalMemoryMb -= other.getTotalMemoryMb();
- if (totalMemoryMb < 0.0) {
- normalizedResources.throwBecauseResourceBecameNegative(
- Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
- };
- }
-
- /**
- * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
- * double, double).
- */
- public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
- return normalizedResources.calculateAveragePercentageUsedBy(
- used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
- }
-
- /**
- * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
- * double)
- */
- public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
- return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
- }
-
- /**
- * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
- * double).
- */
- public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
- return normalizedResources.couldHoldIgnoringSharedMemory(
- other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
- }
-
- public double getTotalCpu() {
- return normalizedResources.getTotalCpu();
- }
-
- @Override
- public NormalizedResources getNormalizedResources() {
- return normalizedResources;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
deleted file mode 100644
index 2af90ab..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
-import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.utils.ObjectReader;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A resource request with normalized resource names.
- */
-public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
-
- private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
-
- private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
- if (!dest.containsKey(destKey)) {
- Number value = (Number) src.get(srcKey);
- if (value != null) {
- dest.put(destKey, value.doubleValue());
- }
- }
- }
-
- private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) {
- Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
- Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
- putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
- putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
- putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
- return ret;
- }
-
- private static Map<String, Double> parseResources(String input) {
- Map<String, Double> topologyResources = new HashMap<>();
- JSONParser parser = new JSONParser();
- LOG.debug("Input to parseResources {}", input);
- try {
- if (input != null) {
- Object obj = parser.parse(input);
- JSONObject jsonObject = (JSONObject) obj;
-
- // Legacy resource parsing
- if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
- Double topoMemOnHeap = ObjectReader
- .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
- topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
- }
- if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
- Double topoMemOffHeap = ObjectReader
- .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
- topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
- }
- if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
- Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
- null);
- topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
- }
-
- // If resource is also present in resources map will overwrite the above
- if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
- Map<String, Number> rawResourcesMap =
- (Map<String, Number>) jsonObject.computeIfAbsent(
- Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
-
- for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) {
- topologyResources.put(
- stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
- }
-
- }
- }
- } catch (ParseException e) {
- LOG.error("Failed to parse component resources is:" + e.toString(), e);
- return null;
- }
- return topologyResources;
- }
-
- private final NormalizedResources normalizedResources;
- private double onHeap;
- private double offHeap;
-
- private NormalizedResourceRequest(Map<String, ? extends Number> resources,
- Map<String, Double> defaultResources) {
- Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
- normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
- onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
- offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
- normalizedResources = new NormalizedResources(normalizedResourceMap);
- }
-
- public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
- this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
- }
-
- public NormalizedResourceRequest(Map<String, Object> topoConf) {
- this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
- }
-
- public NormalizedResourceRequest() {
- this((Map<String, ? extends Number>) null, null);
- }
-
- public Map<String, Double> toNormalizedMap() {
- Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
- ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
- ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
- return ret;
- }
-
- public double getOnHeapMemoryMb() {
- return onHeap;
- }
-
- public void addOnHeap(final double onHeap) {
- this.onHeap += onHeap;
- }
-
- public double getOffHeapMemoryMb() {
- return offHeap;
- }
-
- public void addOffHeap(final double offHeap) {
- this.offHeap += offHeap;
- }
-
- /**
- * Add the resources in other to this.
- *
- * @param other the other Request to add to this.
- */
- public void add(NormalizedResourceRequest other) {
- this.normalizedResources.add(other.normalizedResources);
- onHeap += other.onHeap;
- offHeap += other.offHeap;
- }
-
- public void add(WorkerResources value) {
- this.normalizedResources.add(value);
- //The resources are already normalized
- Map<String, Double> resources = value.get_resources();
- onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
- offHeap += resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
- }
-
- @Override
- public double getTotalMemoryMb() {
- return onHeap + offHeap;
- }
-
- @Override
- public String toString() {
- return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
- }
-
- public double getTotalCpu() {
- return this.normalizedResources.getTotalCpu();
- }
-
- @Override
- public NormalizedResources getNormalizedResources() {
- return this.normalizedResources;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
deleted file mode 100644
index f8e911a..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
-import java.util.Map;
-import org.apache.commons.lang.Validate;
-import org.apache.storm.Constants;
-import org.apache.storm.generated.WorkerResources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
- * does not keep track of memory as a resource.
- */
-public class NormalizedResources {
-
- private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
-
- public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
- private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
-
- static {
- resetResourceNames();
- }
-
- private double cpu;
- private double[] otherResources;
-
- /**
- * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
- * algorithms sadly have different behavior if a resource exists or not.
- */
- @VisibleForTesting
- public static void resetResourceNames() {
- RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
- RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
- }
-
- /**
- * Copy constructor.
- */
- public NormalizedResources(NormalizedResources other) {
- cpu = other.cpu;
- otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
- }
-
- /**
- * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
- * offers because of how on heap vs off heap is used.
- *
- * @param normalizedResources the normalized resource map
- * @param getTotalMemoryMb Supplier of total memory in MB.
- */
- public NormalizedResources(Map<String, Double> normalizedResources) {
- cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
- otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
- }
-
- /**
- * Get the total amount of cpu.
- *
- * @return the amount of cpu.
- */
- public double getTotalCpu() {
- return cpu;
- }
-
- private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
- if (requiredLength > otherResources.length) {
- double[] newResources = new double[requiredLength];
- System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
- otherResources = newResources;
- }
- }
-
- private void add(double[] resourceArray) {
- int otherLength = resourceArray.length;
- zeroPadOtherResourcesIfNecessary(otherLength);
- for (int i = 0; i < otherLength; i++) {
- otherResources[i] += resourceArray[i];
- }
- }
-
- public void add(NormalizedResources other) {
- this.cpu += other.cpu;
- add(other.otherResources);
- }
-
- /**
- * Add the resources from a worker to this.
- *
- * @param value the worker resources that should be added to this.
- */
- public void add(WorkerResources value) {
- Map<String, Double> workerNormalizedResources = value.get_resources();
- cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
- add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
- }
-
- public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) {
- throw new IllegalArgumentException(String.format("Resource amounts should never be negative."
- + " Resource '%s' with current value '%f' became negative because '%f' was removed.",
- resourceName, currentValue, subtractedValue));
- }
-
- /**
- * Remove the other resources from this. This is the same as subtracting the resources in other from this.
- *
- * @param other the resources we want removed.
- * @throws IllegalArgumentException if subtracting other from this would result in any resource amount becoming negative.
- */
- public void remove(NormalizedResources other) {
- this.cpu -= other.cpu;
- if (cpu < 0.0) {
- throwBecauseResourceBecameNegative(Constants.COMMON_CPU_RESOURCE_NAME, cpu, other.cpu);
- }
- int otherLength = other.otherResources.length;
- zeroPadOtherResourcesIfNecessary(otherLength);
- for (int i = 0; i < otherLength; i++) {
- otherResources[i] -= other.otherResources[i];
- if (otherResources[i] < 0.0) {
- throwBecauseResourceBecameNegative(getResourceNameForResourceIndex(i), otherResources[i], other.otherResources[i]);
- }
- }
- }
-
- @Override
- public String toString() {
- return "Normalized resources: " + toNormalizedMap();
- }
-
- /**
- * Return a Map of the normalized resource name to a double. This should only be used when returning thrift resource requests to the end
- * user.
- */
- public Map<String, Double> toNormalizedMap() {
- Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
- ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
- return ret;
- }
-
- private double getResourceAt(int index) {
- if (index >= otherResources.length) {
- return 0.0;
- }
- return otherResources[index];
- }
-
- /**
- * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory. It
- * does not check memory because with shared memory it is beyond the scope of this.
- *
- * @param other the resources that we want to check if they would fit in this.
- * @param thisTotalMemoryMb The total memory in MB of this
- * @param otherTotalMemoryMb The total memory in MB of other
- * @return true if it might fit, else false if it could not possibly fit.
- */
- public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
- if (this.cpu < other.getTotalCpu()) {
- return false;
- }
- int length = Math.max(this.otherResources.length, other.otherResources.length);
- for (int i = 0; i < length; i++) {
- if (getResourceAt(i) < other.getResourceAt(i)) {
- return false;
- }
- }
-
- return thisTotalMemoryMb >= otherTotalMemoryMb;
- }
-
- private String getResourceNameForResourceIndex(int resourceIndex) {
- for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
- int index = entry.getValue();
- if (index == resourceIndex) {
- return entry.getKey();
- }
- }
- return null;
- }
-
- private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
- throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
- + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
- used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
- }
-
- /**
- * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
- * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
- * division by 0. If all resources are skipped the result is defined to be 100.0.
- *
- * @param used the amount of resources used.
- * @param totalMemoryMb The total memory in MB
- * @param usedMemoryMb The used memory in MB
- * @return the average percentage used 0.0 to 100.0.
- * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
- * resources that are not present in the total.
- */
- public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
- + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
- toNormalizedMap(), used.toNormalizedMap());
- }
-
- int skippedResourceTypes = 0;
- double total = 0.0;
- if (usedMemoryMb > totalMemoryMb) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
- if (totalMemoryMb != 0.0) {
- total += usedMemoryMb / totalMemoryMb;
- } else {
- skippedResourceTypes++;
- }
- double totalCpu = getTotalCpu();
- if (used.getTotalCpu() > getTotalCpu()) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
- if (totalCpu != 0.0) {
- total += used.getTotalCpu() / getTotalCpu();
- } else {
- skippedResourceTypes++;
- }
-
- if (used.otherResources.length > otherResources.length) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
-
- for (int i = 0; i < otherResources.length; i++) {
- double totalValue = otherResources[i];
- double usedValue;
- if (i >= used.otherResources.length) {
- //Resources missing from used are using none of that resource
- usedValue = 0.0;
- } else {
- usedValue = used.otherResources[i];
- }
- if (usedValue > totalValue) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
- if (totalValue == 0.0) {
- //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
- //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
- skippedResourceTypes++;
- continue;
- }
-
- total += usedValue / totalValue;
- }
- //Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
- int divisor = 2 + otherResources.length - skippedResourceTypes;
- if (divisor == 0) {
- /*
- * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, becase there are
- * no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
- */
- return 100.0;
- } else {
- return (total * 100.0) / divisor;
- }
- }
-
- /**
- * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
- * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
- * division by 0. If all resources are skipped the result is defined to be 100.0.
- *
- * @param used the amount of resources used.
- * @param totalMemoryMb The total memory in MB
- * @param usedMemoryMb The used memory in MB
- * @return the minimum percentage used 0.0 to 100.0.
- * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
- * resources that are not present in the total.
- */
- public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
- + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
- toNormalizedMap(), used.toNormalizedMap());
- }
-
- double min = 1.0;
- if (usedMemoryMb > totalMemoryMb) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
- if (totalMemoryMb != 0.0) {
- min = Math.min(min, usedMemoryMb / totalMemoryMb);
- }
- double totalCpu = getTotalCpu();
- if (used.getTotalCpu() > totalCpu) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
- if (totalCpu != 0.0) {
- min = Math.min(min, used.getTotalCpu() / totalCpu);
- }
-
- if (used.otherResources.length > otherResources.length) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
-
- for (int i = 0; i < otherResources.length; i++) {
- if (otherResources[i] == 0.0) {
- //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
- //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
- continue;
- }
- if (i >= used.otherResources.length) {
- //Resources missing from used are using none of that resource
- return 0;
- }
- if (used.otherResources[i] > otherResources[i]) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
- }
- min = Math.min(min, used.otherResources[i] / otherResources[i]);
- }
- return min * 100.0;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
deleted file mode 100644
index 640233a..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-/**
- * Intended for {@link NormalizedResources} wrappers that handle memory.
- */
-public interface NormalizedResourcesWithMemory {
-
- NormalizedResources getNormalizedResources();
-
- double getTotalMemoryMb();
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index db9ca73..84a95a7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -30,6 +30,7 @@ import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
deleted file mode 100644
index cf4f80b..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.storm.Constants;
-
-/**
- * Provides translation between normalized resource maps and resource value arrays. Some operations use resource value arrays instead of the
- * full normalized resource map as an optimization. See {@link NormalizedResources}.
- */
-public class ResourceMapArrayBridge {
-
- private final ConcurrentMap<String, Integer> resourceNamesToArrayIndex = new ConcurrentHashMap<>();
- private final AtomicInteger counter = new AtomicInteger(0);
-
- /**
- * Translates a normalized resource map to an array of resource values. Each resource name will be assigned an index in the array, which
- * is guaranteed to be consistent with subsequent invocations of this method. Note that CPU and memory resources are not translated by
- * this method, as they are expected to be captured elsewhere.
- *
- * @param normalizedResources The resources to translate to an array
- * @return The array of resource values
- */
- public double[] translateToResourceArray(Map<String, Double> normalizedResources) {
- //To avoid locking we will go through the map twice. It should be small so it is probably not a big deal
- for (String key : normalizedResources.keySet()) {
- //We are going to skip over CPU and Memory, because they are captured elsewhere
- if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
- && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
- && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
- && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
- resourceNamesToArrayIndex.computeIfAbsent(key, (k) -> counter.getAndIncrement());
- }
- }
- //By default all of the values are 0
- double[] ret = new double[counter.get()];
- for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
- Integer index = resourceNamesToArrayIndex.get(entry.getKey());
- if (index != null) {
- //index == null if it is memory or CPU
- ret[index] = entry.getValue();
- }
- }
- return ret;
- }
-
- /**
- * Translates an array of resource values to a normalized resource map.
- *
- * @param resources The resource array to translate
- * @return The normalized resource map
- */
- public Map<String, Double> translateFromResourceArray(double[] resources) {
- Map<String, Double> ret = new HashMap<>();
- int length = resources.length;
- for (Map.Entry<String, Integer> entry : resourceNamesToArrayIndex.entrySet()) {
- int index = entry.getValue();
- if (index < length) {
- ret.put(entry.getKey(), resources[index]);
- }
- }
- return ret;
- }
-
- public Map<String, Integer> getResourceNamesToArrayIndex() {
- return Collections.unmodifiableMap(resourceNamesToArrayIndex);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
deleted file mode 100644
index d59ba92..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
-
-/**
- * Provides resource name normalization for resource maps.
- */
-public class ResourceNameNormalizer {
-
- private final Map<String, String> resourceNameMapping;
-
- public ResourceNameNormalizer() {
- Map<String, String> tmp = new HashMap<>();
- tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
- tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
- tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
- tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
- tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
- resourceNameMapping = Collections.unmodifiableMap(tmp);
- }
-
- /**
- * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
- *
- * @param resourceMap resource map of either Supervisor or Topology
- * @return the resource map with common resource names
- */
- public Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
- if (resourceMap == null) {
- return new HashMap<>();
- }
- return new HashMap<>(resourceMap.entrySet().stream()
- .collect(Collectors.toMap(
- //Map the key if needed
- (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()),
- //Map the value
- (e) -> e.getValue().doubleValue())));
- }
-
- public Map<String, String> getResourceNameMapping() {
- return resourceNameMapping;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index ca4ea63..2f9ab4c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -25,6 +25,8 @@ import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
new file mode 100644
index 0000000..417dca9
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.Map;
+import org.apache.storm.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An offer of resources with normalized resource names.
+ */
+public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
+ private final NormalizedResources normalizedResources;
+ private double totalMemoryMb;
+
+ /**
+ * Create a new normalized resource offer.
+ *
+ * @param resources the resources to be normalized.
+ */
+ public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
+ Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
+ totalMemoryMb = normalizedResourceMap.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+ this.normalizedResources = new NormalizedResources(normalizedResourceMap);
+ }
+
+ public NormalizedResourceOffer() {
+ this((Map<String, ? extends Number>) null);
+ }
+
+ public NormalizedResourceOffer(NormalizedResourceOffer other) {
+ this.totalMemoryMb = other.totalMemoryMb;
+ this.normalizedResources = new NormalizedResources(other.normalizedResources);
+ }
+
+ @Override
+ public double getTotalMemoryMb() {
+ return totalMemoryMb;
+ }
+
+ public Map<String, Double> toNormalizedMap() {
+ Map<String, Double> ret = normalizedResources.toNormalizedMap();
+ ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
+ return ret;
+ }
+
+ public void add(NormalizedResourcesWithMemory other) {
+ normalizedResources.add(other.getNormalizedResources());
+ totalMemoryMb += other.getTotalMemoryMb();
+ }
+
+ public void remove(NormalizedResourcesWithMemory other) {
+ normalizedResources.remove(other.getNormalizedResources());
+ totalMemoryMb -= other.getTotalMemoryMb();
+ if (totalMemoryMb < 0.0) {
+ normalizedResources.throwBecauseResourceBecameNegative(
+ Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
+ };
+ }
+
+ /**
+ * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
+ * double, double).
+ */
+ public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
+ return normalizedResources.calculateAveragePercentageUsedBy(
+ used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+ }
+
+ /**
+ * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+ * double)
+ */
+ public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
+ return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+ }
+
+ /**
+ * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+ * double).
+ */
+ public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
+ return normalizedResources.couldHoldIgnoringSharedMemory(
+ other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
+ }
+
+ public double getTotalCpu() {
+ return normalizedResources.getTotalCpu();
+ }
+
+ @Override
+ public NormalizedResources getNormalizedResources() {
+ return normalizedResources;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
new file mode 100644
index 0000000..6627bb5
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ObjectReader;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A resource request with normalized resource names.
+ */
+public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
+
+ private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
+ if (!dest.containsKey(destKey)) {
+ Number value = (Number) src.get(srcKey);
+ if (value != null) {
+ dest.put(destKey, value.doubleValue());
+ }
+ }
+ }
+
+ private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) {
+ Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
+ Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
+ putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+ putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+ putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ return ret;
+ }
+
+ private static Map<String, Double> parseResources(String input) {
+ Map<String, Double> topologyResources = new HashMap<>();
+ JSONParser parser = new JSONParser();
+ LOG.debug("Input to parseResources {}", input);
+ try {
+ if (input != null) {
+ Object obj = parser.parse(input);
+ JSONObject jsonObject = (JSONObject) obj;
+
+ // Legacy resource parsing
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+ Double topoMemOnHeap = ObjectReader
+ .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
+ }
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+ Double topoMemOffHeap = ObjectReader
+ .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
+ }
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+ Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
+ null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
+ }
+
+ // If resource is also present in resources map will overwrite the above
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+ Map<String, Number> rawResourcesMap =
+ (Map<String, Number>) jsonObject.computeIfAbsent(
+ Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
+
+ for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) {
+ topologyResources.put(
+ stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
+ }
+
+ }
+ }
+ } catch (ParseException e) {
+ LOG.error("Failed to parse component resources is:" + e.toString(), e);
+ return null;
+ }
+ return topologyResources;
+ }
+
+ private final NormalizedResources normalizedResources;
+ private double onHeap;
+ private double offHeap;
+
+ private NormalizedResourceRequest(Map<String, ? extends Number> resources,
+ Map<String, Double> defaultResources) {
+ Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
+ normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
+ onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ normalizedResources = new NormalizedResources(normalizedResourceMap);
+ }
+
+ public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
+ this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
+ }
+
+ public NormalizedResourceRequest(Map<String, Object> topoConf) {
+ this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
+ }
+
+ public NormalizedResourceRequest() {
+ this((Map<String, ? extends Number>) null, null);
+ }
+
+ public Map<String, Double> toNormalizedMap() {
+ Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
+ ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
+ ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
+ return ret;
+ }
+
+ public double getOnHeapMemoryMb() {
+ return onHeap;
+ }
+
+ public void addOnHeap(final double onHeap) {
+ this.onHeap += onHeap;
+ }
+
+ public double getOffHeapMemoryMb() {
+ return offHeap;
+ }
+
+ public void addOffHeap(final double offHeap) {
+ this.offHeap += offHeap;
+ }
+
+ /**
+ * Add the resources in other to this.
+ *
+ * @param other the other Request to add to this.
+ */
+ public void add(NormalizedResourceRequest other) {
+ this.normalizedResources.add(other.normalizedResources);
+ onHeap += other.onHeap;
+ offHeap += other.offHeap;
+ }
+
+ public void add(WorkerResources value) {
+ this.normalizedResources.add(value);
+ //The resources are already normalized
+ Map<String, Double> resources = value.get_resources();
+ onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ offHeap += resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ }
+
+ @Override
+ public double getTotalMemoryMb() {
+ return onHeap + offHeap;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
+ }
+
+ public double getTotalCpu() {
+ return this.normalizedResources.getTotalCpu();
+ }
+
+ @Override
+ public NormalizedResources getNormalizedResources() {
+ return this.normalizedResources;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
new file mode 100644
index 0000000..76d5ce2
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.WorkerResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
+ * does not keep track of memory as a resource.
+ */
+public class NormalizedResources {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
+
+ public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
+ private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
+
+ static {
+ resetResourceNames();
+ }
+
+ private double cpu;
+ private double[] otherResources;
+
+ /**
+ * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
+ * algorithms sadly have different behavior if a resource exists or not.
+ */
+ @VisibleForTesting
+ public static void resetResourceNames() {
+ RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
+ RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
+ }
+
+ /**
+ * Copy constructor.
+ */
+ public NormalizedResources(NormalizedResources other) {
+ cpu = other.cpu;
+ otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
+ }
+
+ /**
+ * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
+ * offers because of how on heap vs off heap is used.
+ *
+ * @param normalizedResources the normalized resource map
+ */
+ public NormalizedResources(Map<String, Double> normalizedResources) {
+ cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+ otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
+ }
+
+ /**
+ * Get the total amount of cpu.
+ *
+ * @return the amount of cpu.
+ */
+ public double getTotalCpu() {
+ return cpu;
+ }
+
+ private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
+ if (requiredLength > otherResources.length) {
+ double[] newResources = new double[requiredLength];
+ System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
+ otherResources = newResources;
+ }
+ }
+
+ private void add(double[] resourceArray) {
+ int otherLength = resourceArray.length;
+ zeroPadOtherResourcesIfNecessary(otherLength);
+ for (int i = 0; i < otherLength; i++) {
+ otherResources[i] += resourceArray[i];
+ }
+ }
+
+ public void add(NormalizedResources other) {
+ this.cpu += other.cpu;
+ add(other.otherResources);
+ }
+
+ /**
+ * Add the resources from a worker to this.
+ *
+ * @param value the worker resources that should be added to this.
+ */
+ public void add(WorkerResources value) {
+ Map<String, Double> workerNormalizedResources = value.get_resources();
+ cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+ add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
+ }
+
+ /**
+ * Throw an IllegalArgumentException because a resource became negative during remove.
+ * @param resourceName The name of the resource that became negative
+ * @param currentValue The current value of the resource
+ * @param subtractedValue The value that was subtracted to make the resource negative
+ */
+ public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) {
+ throw new IllegalArgumentException(String.format("Resource amounts should never be negative."
+ + " Resource '%s' with current value '%f' became negative because '%f' was removed.",
+ resourceName, currentValue, subtractedValue));
+ }
+
+ /**
+ * Remove the other resources from this. This is the same as subtracting the resources in other from this.
+ *
+ * @param other the resources we want removed.
+ * @throws IllegalArgumentException if subtracting other from this would result in any resource amount becoming negative.
+ */
+ public void remove(NormalizedResources other) {
+ this.cpu -= other.cpu;
+ if (cpu < 0.0) {
+ throwBecauseResourceBecameNegative(Constants.COMMON_CPU_RESOURCE_NAME, cpu, other.cpu);
+ }
+ int otherLength = other.otherResources.length;
+ zeroPadOtherResourcesIfNecessary(otherLength);
+ for (int i = 0; i < otherLength; i++) {
+ otherResources[i] -= other.otherResources[i];
+ if (otherResources[i] < 0.0) {
+ throwBecauseResourceBecameNegative(getResourceNameForResourceIndex(i), otherResources[i], other.otherResources[i]);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Normalized resources: " + toNormalizedMap();
+ }
+
+ /**
+ * Return a Map of the normalized resource name to a double. This should only be used when returning thrift resource requests to the end
+ * user.
+ */
+ public Map<String, Double> toNormalizedMap() {
+ Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
+ ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+ return ret;
+ }
+
+ private double getResourceAt(int index) {
+ if (index >= otherResources.length) {
+ return 0.0;
+ }
+ return otherResources[index];
+ }
+
+ /**
+ * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory. It
+ * does not check memory because with shared memory it is beyond the scope of this.
+ *
+ * @param other the resources that we want to check if they would fit in this.
+ * @param thisTotalMemoryMb The total memory in MB of this
+ * @param otherTotalMemoryMb The total memory in MB of other
+ * @return true if it might fit, else false if it could not possibly fit.
+ */
+ public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
+ if (this.cpu < other.getTotalCpu()) {
+ return false;
+ }
+ int length = Math.max(this.otherResources.length, other.otherResources.length);
+ for (int i = 0; i < length; i++) {
+ if (getResourceAt(i) < other.getResourceAt(i)) {
+ return false;
+ }
+ }
+
+ return thisTotalMemoryMb >= otherTotalMemoryMb;
+ }
+
+ private String getResourceNameForResourceIndex(int resourceIndex) {
+ for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
+ int index = entry.getValue();
+ if (index == resourceIndex) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+
+ private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+ throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
+ + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
+ used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
+ }
+
+ /**
+ * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+ * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+ * division by 0. If all resources are skipped the result is defined to be 100.0.
+ *
+ * @param used the amount of resources used.
+ * @param totalMemoryMb The total memory in MB
+ * @param usedMemoryMb The used memory in MB
+ * @return the average percentage used 0.0 to 100.0.
+ * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+ * resources that are not present in the total.
+ */
+ public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
+ + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
+ toNormalizedMap(), used.toNormalizedMap());
+ }
+
+ int skippedResourceTypes = 0;
+ double total = 0.0;
+ if (usedMemoryMb > totalMemoryMb) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalMemoryMb != 0.0) {
+ total += usedMemoryMb / totalMemoryMb;
+ } else {
+ skippedResourceTypes++;
+ }
+ double totalCpu = getTotalCpu();
+ if (used.getTotalCpu() > getTotalCpu()) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalCpu != 0.0) {
+ total += used.getTotalCpu() / getTotalCpu();
+ } else {
+ skippedResourceTypes++;
+ }
+
+ if (used.otherResources.length > otherResources.length) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+
+ for (int i = 0; i < otherResources.length; i++) {
+ double totalValue = otherResources[i];
+ double usedValue;
+ if (i >= used.otherResources.length) {
+ //Resources missing from used are using none of that resource
+ usedValue = 0.0;
+ } else {
+ usedValue = used.otherResources[i];
+ }
+ if (usedValue > totalValue) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalValue == 0.0) {
+ //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+ //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+ skippedResourceTypes++;
+ continue;
+ }
+
+ total += usedValue / totalValue;
+ }
+ //Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
+ int divisor = 2 + otherResources.length - skippedResourceTypes;
+ if (divisor == 0) {
+ /*
+ * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, becase there are
+ * no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
+ */
+ return 100.0;
+ } else {
+ return (total * 100.0) / divisor;
+ }
+ }
+
+ /**
+ * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+ * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+ * division by 0. If all resources are skipped the result is defined to be 100.0.
+ *
+ * @param used the amount of resources used.
+ * @param totalMemoryMb The total memory in MB
+ * @param usedMemoryMb The used memory in MB
+ * @return the minimum percentage used 0.0 to 100.0.
+ * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+ * resources that are not present in the total.
+ */
+ public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
+ + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
+ toNormalizedMap(), used.toNormalizedMap());
+ }
+
+ double min = 1.0;
+ if (usedMemoryMb > totalMemoryMb) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalMemoryMb != 0.0) {
+ min = Math.min(min, usedMemoryMb / totalMemoryMb);
+ }
+ double totalCpu = getTotalCpu();
+ if (used.getTotalCpu() > totalCpu) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ if (totalCpu != 0.0) {
+ min = Math.min(min, used.getTotalCpu() / totalCpu);
+ }
+
+ if (used.otherResources.length > otherResources.length) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+
+ for (int i = 0; i < otherResources.length; i++) {
+ if (otherResources[i] == 0.0) {
+ //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+ //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+ continue;
+ }
+ if (i >= used.otherResources.length) {
+ //Resources missing from used are using none of that resource
+ return 0;
+ }
+ if (used.otherResources[i] > otherResources[i]) {
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ }
+ min = Math.min(min, used.otherResources[i] / otherResources[i]);
+ }
+ return min * 100.0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
new file mode 100644
index 0000000..5001645
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+/**
+ * Intended for {@link NormalizedResources} wrappers that handle memory.
+ */
+public interface NormalizedResourcesWithMemory {
+
+ NormalizedResources getNormalizedResources();
+
+ double getTotalMemoryMb();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
new file mode 100644
index 0000000..2b07c65
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.storm.Constants;
+
+/**
+ * Provides translation between normalized resource maps and resource value arrays. Some operations use resource value arrays instead of the
+ * full normalized resource map as an optimization. See {@link NormalizedResources}.
+ */
+public class ResourceMapArrayBridge {
+
+ private final ConcurrentMap<String, Integer> resourceNamesToArrayIndex = new ConcurrentHashMap<>();
+ private final AtomicInteger counter = new AtomicInteger(0);
+
+ /**
+ * Translates a normalized resource map to an array of resource values. Each resource name will be assigned an index in the array, which
+ * is guaranteed to be consistent with subsequent invocations of this method. Note that CPU and memory resources are not translated by
+ * this method, as they are expected to be captured elsewhere.
+ *
+ * @param normalizedResources The resources to translate to an array
+ * @return The array of resource values
+ */
+ public double[] translateToResourceArray(Map<String, Double> normalizedResources) {
+ //To avoid locking we will go through the map twice. It should be small so it is probably not a big deal
+ for (String key : normalizedResources.keySet()) {
+ //We are going to skip over CPU and Memory, because they are captured elsewhere
+ if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
+ && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
+ && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
+ && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
+ resourceNamesToArrayIndex.computeIfAbsent(key, (k) -> counter.getAndIncrement());
+ }
+ }
+ //By default all of the values are 0
+ double[] ret = new double[counter.get()];
+ for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
+ Integer index = resourceNamesToArrayIndex.get(entry.getKey());
+ if (index != null) {
+ //index == null if it is memory or CPU
+ ret[index] = entry.getValue();
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Translates an array of resource values to a normalized resource map.
+ *
+ * @param resources The resource array to translate
+ * @return The normalized resource map
+ */
+ public Map<String, Double> translateFromResourceArray(double[] resources) {
+ Map<String, Double> ret = new HashMap<>();
+ int length = resources.length;
+ for (Map.Entry<String, Integer> entry : resourceNamesToArrayIndex.entrySet()) {
+ int index = entry.getValue();
+ if (index < length) {
+ ret.put(entry.getKey(), resources[index]);
+ }
+ }
+ return ret;
+ }
+
+ public Map<String, Integer> getResourceNamesToArrayIndex() {
+ return Collections.unmodifiableMap(resourceNamesToArrayIndex);
+ }
+
+}