You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by go...@apache.org on 2020/04/16 15:15:37 UTC
[storm] branch master updated: STORM-3259: Adds NUMA awareness to
enable worker pinning
This is an automated email from the ASF dual-hosted git repository.
govind pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 632d320 STORM-3259: Adds NUMA awareness to enable worker pinning
new ec4065a Merge pull request #3237 from govind-menon/STORM-3259-Containers
632d320 is described below
commit 632d32039031c7e6bbe29ecce2d9833c1a677b76
Author: Govind Menon <go...@gmail.com>
AuthorDate: Wed Mar 25 17:53:12 2020 -0500
STORM-3259: Adds NUMA awareness to enable worker pinning
STORM-3259: Properly sends supervisor assignments on topology kill. Originally done by Aaron Gresch.
STORM-3259: Addresses review comments on NUMA pinning
STORM-3259: Addresses 2nd round of review comments
STORM-3259: Moves getNumaConf to SupervisorUtils from Utils
STORM-3259: Cleanup of unnecessary changes
STORM-3259: Creates separate storm-server specific constants and helper files and moves NUMA to them
---
.../main/java/org/apache/storm/DaemonConfig.java | 18 ++++
.../java/org/apache/storm/ServerConstants.java | 21 ++++
.../container/ResourceIsolationInterface.java | 3 +-
.../storm/container/cgroup/CgroupManager.java | 38 ++++++-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 47 ++++++---
.../storm/daemon/supervisor/BasicContainer.java | 22 +++--
.../storm/daemon/supervisor/SupervisorUtils.java | 37 ++++++-
.../supervisor/timer/SupervisorHeartbeat.java | 110 +++++++++++++++++----
.../apache/storm/utils/DaemonConfigValidation.java | 62 ++++++++++++
.../test/java/org/apache/storm/TestCgroups.java | 2 +-
10 files changed, 317 insertions(+), 43 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 482846d..731ca7a 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -43,6 +43,7 @@ import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriori
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.security.auth.IHttpCredentialsPlugin;
+import org.apache.storm.utils.DaemonConfigValidation;
import org.apache.storm.validation.ConfigValidation;
import org.apache.storm.validation.Validated;
@@ -751,6 +752,23 @@ public class DaemonConfig implements Validated {
public static final String SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES = "supervisor.blobstore.download.max_retries";
/**
+ * A map with keys mapped to each NUMA node on the supervisor that will be used
+ * by the scheduler. CPUs, memory and ports available on each NUMA node will be provided.
+ * Each supervisor will have a different map of NUMA nodes.
+ * Example: "supervisor.numa.meta": {
+ * "0": { "numa.memory.mb": 122880, "numa.cores": [ 0, 12, 1, 13, 2, 14, 3, 15, 4, 16, 5, 17],
+ * "numa.ports": [6700, 6701]},
+ * "1" : {"numa.memory.mb": 122880, "numa.cores": [ 6, 18, 7, 19, 8, 20, 9, 21, 10, 22, 11, 23],
+ * "numa.ports": [6702, 6703], "numa.generic.resources.map": {"gpu.count" : 1}}
+ * }
+ */
+ @IsMapEntryCustom(
+ keyValidatorClasses = { ConfigValidation.StringValidator.class },
+ valueValidatorClasses = { DaemonConfigValidation.NumaEntryValidator.class}
+ )
+ public static final String SUPERVISOR_NUMA_META = "supervisor.numa.meta";
+
+ /**
* What blobstore implementation nimbus should use.
*/
@IsString
diff --git a/storm-server/src/main/java/org/apache/storm/ServerConstants.java b/storm-server/src/main/java/org/apache/storm/ServerConstants.java
new file mode 100644
index 0000000..0a7f6a4
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/ServerConstants.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm;
+
+public class ServerConstants {
+ public static final String NUMA_MEMORY_IN_MB = "numa.memory.mb";
+ public static final String NUMA_CORES = "numa.cores";
+ public static final String NUMA_PORTS = "numa.ports";
+ public static final String NUMA_GENERIC_RESOURCES_MAP = "numa.generic.resources.map";
+ public static final String NUMA_ID_SEPARATOR = "-numa-";
+}
diff --git a/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java b/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
index 89df29a..ae30c29 100644
--- a/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
+++ b/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
@@ -36,8 +36,9 @@ public interface ResourceIsolationInterface {
* @param workerId worker id of the worker to start
* @param workerMemory the amount of memory for the worker or null if not enforced
* @param workerCpu the amount of cpu for the worker or null if not enforced
+ * @param numaId NUMA zone the worker should be bound to, or null if NUMA pinning does not apply
*/
- void reserveResourcesForWorker(String workerId, Integer workerMemory, Integer workerCpu);
+ void reserveResourcesForWorker(String workerId, Integer workerMemory, Integer workerCpu, String numaId);
/**
* This function will be called when the worker needs to shutdown. This function should include logic to clean up
diff --git a/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java b/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
index fbdb364..6677da7 100644
--- a/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
+++ b/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
@@ -26,8 +26,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+
+import org.apache.commons.lang.SystemUtils;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.container.ResourceIsolationInterface;
@@ -50,6 +53,7 @@ public class CgroupManager implements ResourceIsolationInterface {
private CgroupCommon rootCgroup;
private String rootDir;
private Map<String, Object> conf;
+ private Map<String, String> workerToNumaId;
static long getMemInfoFreeMb() throws IOException {
//MemFree: 14367072 kB
@@ -102,6 +106,7 @@ public class CgroupManager implements ResourceIsolationInterface {
throw new RuntimeException("Cgroup error, please check /proc/cgroups");
}
this.prepareSubSystem(this.conf);
+ workerToNumaId = new ConcurrentHashMap();
}
/**
@@ -147,7 +152,7 @@ public class CgroupManager implements ResourceIsolationInterface {
}
@Override
- public void reserveResourcesForWorker(String workerId, Integer totalMem, Integer cpuNum) throws SecurityException {
+ public void reserveResourcesForWorker(String workerId, Integer totalMem, Integer cpuNum, String numaId) throws SecurityException {
LOG.info("Creating cgroup for worker {} with resources {} MB {} % CPU", workerId, totalMem, cpuNum);
// The manually set STORM_WORKER_CGROUP_CPU_LIMIT config on supervisor will overwrite resources assigned by
// RAS (Resource Aware Scheduler)
@@ -202,7 +207,7 @@ public class CgroupManager implements ResourceIsolationInterface {
}
}
}
-
+
if ((boolean) this.conf.get(DaemonConfig.STORM_CGROUP_INHERIT_CPUSET_CONFIGS)) {
if (workerGroup.getParent().getCores().containsKey(SubSystemType.cpuset)) {
CpusetCore parentCpusetCore = (CpusetCore) workerGroup.getParent().getCores().get(SubSystemType.cpuset);
@@ -219,6 +224,10 @@ public class CgroupManager implements ResourceIsolationInterface {
}
}
}
+
+ if (numaId != null) {
+ workerToNumaId.put(workerId, numaId);
+ }
}
@Override
@@ -236,9 +245,34 @@ public class CgroupManager implements ResourceIsolationInterface {
}
}
+ /**
+ * Extracted into its own method so that it can be mocked in tests.
+ * @return true if on Linux.
+ */
+ protected static boolean isOnLinux() {
+ return SystemUtils.IS_OS_LINUX;
+ }
+
+ private void prefixNumaPinning(List<String> command, String numaId) {
+ if (isOnLinux()) {
+ command.add(0, "numactl");
+ command.add(1, "--cpunodebind=" + numaId);
+ command.add(2, "--membind=" + numaId);
+ return;
+ } else {
+ // TODO : Add support for pinning on Windows host
+ throw new RuntimeException("numactl pinning currently not supported on non-Linux hosts");
+ }
+ }
+
@Override
public List<String> getLaunchCommand(String workerId, List<String> existingCommand) {
List<String> newCommand = getLaunchCommandPrefix(workerId);
+
+ if (workerToNumaId.containsKey(workerId)) {
+ prefixNumaPinning(newCommand, workerToNumaId.get(workerId));
+ }
+
newCommand.addAll(existingCommand);
return newCommand;
}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index daccf15..eee199f 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1553,23 +1553,45 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
/**
- * Pick out assignments for specific node from all assignments.
+ * Pick out assignments for a specific host from all assignments. This could include multiple NUMA
+ * supervisors on an individual host.
+ * @param assignmentMap stormId -> assignment map
+ * @param hostname hostname
+ * @return stormId -> assignment map for the host
+ */
+ private static Map<String, Assignment> assignmentsForHost(Map<String, Assignment> assignmentMap, String hostname) {
+ Map<String, Assignment> ret = new HashMap<>();
+
+ assignmentMap.entrySet().stream().filter(assignmentEntry -> assignmentEntry.getValue().get_node_host().values()
+ .contains(hostname))
+ .forEach(assignmentEntry -> {
+ ret.put(assignmentEntry.getKey(), assignmentEntry.getValue());
+ });
+
+ return ret;
+ }
+
+ /**
+ * Pick out assignments for specific NodeId from all assignments.
*
* @param assignmentMap stormId -> assignment map
- * @param nodeId supervisor/node id
+ * @param nodeId supervisor node id
* @return stormId -> assignment map for the node
*/
- private static Map<String, Assignment> assignmentsForNode(Map<String, Assignment> assignmentMap, String nodeId) {
+ private static Map<String, Assignment> assignmentsForNodeId(Map<String, Assignment> assignmentMap, String nodeId) {
Map<String, Assignment> ret = new HashMap<>();
+
assignmentMap.entrySet().stream().filter(assignmentEntry -> assignmentEntry.getValue().get_node_host().keySet()
- .contains(nodeId))
- .forEach(assignmentEntry -> {
- ret.put(assignmentEntry.getKey(), assignmentEntry.getValue());
- });
+
+ .contains(nodeId))
+ .forEach(assignmentEntry -> {
+ ret.put(assignmentEntry.getKey(), assignmentEntry.getValue());
+ });
return ret;
}
+
/**
* Notify supervisors/nodes assigned assignments.
*
@@ -1585,8 +1607,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
for (Map.Entry<String, String> nodeEntry : nodeHost.entrySet()) {
try {
String nodeId = nodeEntry.getKey();
+ String hostname = nodeEntry.getValue();
SupervisorAssignments supervisorAssignments = new SupervisorAssignments();
- supervisorAssignments.set_storm_assignment(assignmentsForNode(assignments, nodeEntry.getKey()));
+ supervisorAssignments.set_storm_assignment(assignmentsForHost(assignments, hostname));
SupervisorDetails details = supervisorDetails.get(nodeId);
Integer serverPort = details != null ? details.getServerPort() : null;
service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments, metricsRegistry);
@@ -4769,23 +4792,23 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
@Override
- public SupervisorAssignments getSupervisorAssignments(String node) throws AuthorizationException, TException {
+ public SupervisorAssignments getSupervisorAssignments(String nodeId) throws AuthorizationException, TException {
checkAuthorization(null, null, "getSupervisorAssignments");
try {
if (isLeader() && isAssignmentsRecovered()) {
SupervisorAssignments supervisorAssignments = new SupervisorAssignments();
- supervisorAssignments.set_storm_assignment(assignmentsForNode(stormClusterState.assignmentsInfo(), node));
+ supervisorAssignments.set_storm_assignment(assignmentsForNodeId(stormClusterState.assignmentsInfo(), nodeId));
return supervisorAssignments;
}
} catch (Exception e) {
- LOG.debug("Exception when node {} fetching assignments", node);
+ LOG.debug("Exception when node {} fetching assignments", nodeId);
if (e instanceof TException) {
throw (TException) e;
}
// When this master is not leader and get a sync request from node,
// just return nil which will cause client/node to get an unknown error,
// the node/supervisor will sync it as a timer task.
- LOG.debug("Exception when node {} fetching assignments", node);
+ LOG.debug("Exception when node {} fetching assignments", nodeId);
}
return null;
}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
index 153ac77..4db6f3f 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -37,6 +37,7 @@ import java.util.NavigableMap;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
+import org.apache.storm.ServerConstants;
import org.apache.storm.container.ResourceIsolationInterface;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileAction;
@@ -615,7 +616,7 @@ public class BasicContainer extends Container {
* @throws IOException on any error.
*/
private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot,
- final String jlp) throws IOException {
+ final String jlp, final String numaId) throws IOException {
final String javaCmd = javaCmd("java");
final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
final String topoConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
@@ -665,8 +666,11 @@ public class BasicContainer extends Container {
commandList.addAll(classPathParams);
commandList.add(getWorkerMain(topoVersion));
commandList.add(topologyId);
+ String supervisorId = this.supervisorId;
+ if (numaId != null) {
+ supervisorId += ServerConstants.NUMA_ID_SEPARATOR + numaId;
+ }
commandList.add(supervisorId);
-
// supervisor port should be only presented to worker which supports RPC heartbeat
// unknown version should be treated as "current version", which supports RPC heartbeat
if ((topoVersion.getMajor() == -1 && topoVersion.getMinor() == -1)
@@ -817,8 +821,14 @@ public class BasicContainer extends Container {
@Override
public void launch() throws IOException {
type.assertFull();
- LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment,
- supervisorId, port, workerId);
+ String numaId = SupervisorUtils.getNumaIdForPort(port, conf);
+ if (numaId == null) {
+ LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}",
+ assignment, supervisorId, port, workerId);
+ } else {
+ LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {} bound to numa zone {}",
+ assignment, supervisorId, port, workerId, numaId);
+ }
exitedEarly = false;
final WorkerResources resources = assignment.get_resources();
@@ -844,10 +854,10 @@ public class BasicContainer extends Container {
if (resourceIsolationManager != null) {
final int cpu = (int) Math.ceil(resources.get_cpu());
//Save the memory limit so we can enforce it less strictly
- resourceIsolationManager.reserveResourcesForWorker(workerId, (int) memoryLimitMb, cpu);
+ resourceIsolationManager.reserveResourcesForWorker(workerId, (int) memoryLimitMb, cpu, numaId);
}
- List<String> commandList = mkLaunchCommand(memOnHeap, stormRoot, jlp);
+ List<String> commandList = mkLaunchCommand(memOnHeap, stormRoot, jlp, numaId);
LOG.info("Launching worker with command: {}. ", ServerUtils.shellCmd(commandList));
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 5e4ce3f..3460600 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -18,13 +18,16 @@
package org.apache.storm.daemon.supervisor;
+import static org.apache.storm.ServerConstants.NUMA_PORTS;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.localizer.LocalResource;
import org.apache.storm.utils.ConfigUtils;
@@ -35,6 +38,7 @@ import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class SupervisorUtils {
private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
@@ -50,6 +54,24 @@ public class SupervisorUtils {
_instance = INSTANCE;
}
+ /**
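+ * Looks up the NUMA id whose configured port list contains the given port.
+ * @param port worker port to look up
+ * @param supervisorConf supervisor configuration holding the NUMA map
+ * @return the matching NUMA id, or null if no NUMA entry lists the port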
+ */
+ public static String getNumaIdForPort(Integer port, Map<String, Object> supervisorConf) {
+ Map<String, Object> validatedNumaMap = getNumaMap(supervisorConf);
+ for (Map.Entry<String, Object> numaEntry : validatedNumaMap.entrySet()) {
+ Map<String, Object> numaMap = (Map<String, Object>) numaEntry.getValue();
+ List<Integer> portList = (List<Integer>) numaMap.get(NUMA_PORTS);
+ if (portList.contains(port)) {
+ return numaEntry.getKey();
+ }
+ }
+ return null;
+ }
+
public static void rmrAsUser(Map<String, Object> conf, String id, String path) throws IOException {
String user = ServerUtils.getFileOwner(path);
String logPreFix = "rmr " + id;
@@ -117,6 +139,19 @@ public class SupervisorUtils {
return _instance.readWorkerHeartbeatImpl(conf, workerId);
}
+ /**
+ * Returns the supervisor NUMA configuration.
+ * @param stormConf configuration to read the NUMA map from
+ * @return the configured NUMA map, or an empty map if none is configured
+ */
+ public static Map<String, Object> getNumaMap(Map<String, Object> stormConf) {
+ Object numa = stormConf.get(DaemonConfig.SUPERVISOR_NUMA_META);
+ if (numa == null) {
+ return Collections.emptyMap();
+ }
+ return (Map<String, Object>) numa;
+ }
+
public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) {
Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 24d4471..55a497b 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -17,14 +17,22 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.commons.collections.ListUtils;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
+import org.apache.storm.ServerConstants;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SupervisorHeartbeat implements Runnable {
@@ -33,6 +41,9 @@ public class SupervisorHeartbeat implements Runnable {
private final Map<String, Object> conf;
private final Supervisor supervisor;
+ public static final Logger LOG = LoggerFactory.getLogger(SupervisorHeartbeat.class);
+
+
public SupervisorHeartbeat(Map<String, Object> conf, Supervisor supervisor) {
this.stormClusterState = supervisor.getStormClusterState();
this.supervisorId = supervisor.getId();
@@ -40,33 +51,88 @@ public class SupervisorHeartbeat implements Runnable {
this.conf = conf;
}
- private SupervisorInfo buildSupervisorInfo(Map<String, Object> conf, Supervisor supervisor) {
- SupervisorInfo supervisorInfo = new SupervisorInfo();
- supervisorInfo.set_time_secs(Time.currentTimeSecs());
- supervisorInfo.set_hostname(supervisor.getHostName());
- supervisorInfo.set_assignment_id(supervisor.getAssignmentId());
- supervisorInfo.set_server_port(supervisor.getThriftServerPort());
-
- List<Long> usedPorts = new ArrayList<>();
- usedPorts.addAll(supervisor.getCurrAssignment().get().keySet());
- supervisorInfo.set_used_ports(usedPorts);
+ private Map<String, SupervisorInfo> buildSupervisorInfo(Map<String, Object> conf, Supervisor supervisor,
+ Map<String, Object> validatedNumaMap) {
List metaDatas = (List) supervisor.getiSupervisor().getMetadata();
- List<Long> portList = new ArrayList<>();
+ List<Long> allPortList = new ArrayList<>();
if (metaDatas != null) {
for (Object data : metaDatas) {
Integer port = ObjectReader.getInt(data);
if (port != null) {
- portList.add(port.longValue());
+ allPortList.add(port.longValue());
}
}
}
- supervisorInfo.set_meta(portList);
- supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(DaemonConfig.SUPERVISOR_SCHEDULER_META));
- supervisorInfo.set_uptime_secs(supervisor.getUpTime().upTime());
- supervisorInfo.set_version(supervisor.getStormVersion());
- supervisorInfo.set_resources_map(mkSupervisorCapacities(conf));
- return supervisorInfo;
+ List<Long> allUsedPorts = new ArrayList<>();
+ allUsedPorts.addAll(supervisor.getCurrAssignment().get().keySet());
+ Map<String, Double> totalSupervisorResources = mkSupervisorCapacities(conf);
+ NormalizedResourceOffer totalSupervisorNormalizedResources = new NormalizedResourceOffer(totalSupervisorResources);
+
+ Map<String, SupervisorInfo> result = new HashMap();
+
+ if (validatedNumaMap != null) {
+ for (Map.Entry<String, Object> numaMapEntry : validatedNumaMap.entrySet()) {
+ SupervisorInfo supervisorInfo = new SupervisorInfo();
+ supervisorInfo.set_time_secs(Time.currentTimeSecs());
+ supervisorInfo.set_hostname(supervisor.getHostName());
+ supervisorInfo.set_assignment_id(
+ supervisor.getAssignmentId() + ServerConstants.NUMA_ID_SEPARATOR + numaMapEntry.getKey()
+ );
+ supervisorInfo.set_server_port(supervisor.getThriftServerPort());
+
+ Map<String, Object> numaMap = (Map<String, Object>) numaMapEntry.getValue();
+ List numaPortList = ((List<Integer>) numaMap.get(ServerConstants.NUMA_PORTS)).stream()
+ .map(e -> e.longValue()).collect(Collectors.toList());
+
+ List<Long> usedNumaPorts = ListUtils.intersection(numaPortList, allUsedPorts);
+ supervisorInfo.set_used_ports(usedNumaPorts);
+ supervisorInfo.set_meta(numaPortList);
+ allPortList = ListUtils.subtract(allPortList, numaPortList);
+ allUsedPorts = ListUtils.subtract(allUsedPorts, usedNumaPorts);
+ supervisorInfo.set_scheduler_meta(
+ (Map<String, String>) conf.get(DaemonConfig.SUPERVISOR_SCHEDULER_META)
+ );
+ supervisorInfo.set_uptime_secs(supervisor.getUpTime().upTime());
+ supervisorInfo.set_version(supervisor.getStormVersion());
+ Map<String, Double> supervisorCapacitiesFromNumaMap = mkSupervisorCapacitiesFromNumaMap(numaMap);
+ NormalizedResourceOffer numaNormalizedResources = new NormalizedResourceOffer(supervisorCapacitiesFromNumaMap);
+ totalSupervisorNormalizedResources.remove(numaNormalizedResources);
+ supervisorInfo.set_resources_map(supervisorCapacitiesFromNumaMap);
+ result.put(supervisor.getId() + ServerConstants.NUMA_ID_SEPARATOR + numaMapEntry.getKey(), supervisorInfo);
+ }
+ }
+
+ if (totalSupervisorNormalizedResources.getTotalCpu() > 0
+ && totalSupervisorNormalizedResources.getTotalMemoryMb() > 0 && !allPortList.isEmpty()) {
+ SupervisorInfo supervisorInfo = new SupervisorInfo();
+ supervisorInfo.set_time_secs(Time.currentTimeSecs());
+ supervisorInfo.set_hostname(supervisor.getHostName());
+ supervisorInfo.set_assignment_id(supervisor.getAssignmentId());
+ supervisorInfo.set_server_port(supervisor.getThriftServerPort());
+ supervisorInfo.set_used_ports(allUsedPorts);
+ supervisorInfo.set_meta(allPortList);
+ supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(DaemonConfig.SUPERVISOR_SCHEDULER_META));
+ supervisorInfo.set_uptime_secs(supervisor.getUpTime().upTime());
+ supervisorInfo.set_version(supervisor.getStormVersion());
+ supervisorInfo.set_resources_map(totalSupervisorNormalizedResources.toNormalizedMap());
+ result.put(supervisor.getId(), supervisorInfo);
+ }
+ return result;
+ }
+
+ private Map<String, Double> mkSupervisorCapacitiesFromNumaMap(Map<String, Object> numaMap) {
+ Map<String, Double> ret = new HashMap();
+ ret.put(
+ Config.SUPERVISOR_CPU_CAPACITY,
+ (double) (((List<Integer>) numaMap.get(ServerConstants.NUMA_CORES)).size() * 100)
+ );
+ ret.put(
+ Config.SUPERVISOR_MEMORY_CAPACITY_MB,
+ Double.valueOf((Integer) numaMap.get(ServerConstants.NUMA_MEMORY_IN_MB))
+ );
+ ret.putAll((Map<String, Double>) numaMap.getOrDefault(ServerConstants.NUMA_GENERIC_RESOURCES_MAP, Collections.emptyMap()));
+ return NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(ret);
}
private Map<String, Double> mkSupervisorCapacities(Map<String, Object> conf) {
@@ -87,12 +153,16 @@ public class SupervisorHeartbeat implements Runnable {
ret.put(stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
}
+ LOG.debug(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(ret).toString());
return NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(ret);
}
@Override
public void run() {
- SupervisorInfo supervisorInfo = buildSupervisorInfo(conf, supervisor);
- stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
+ Map<String, Object> validatedNumaMap = SupervisorUtils.getNumaMap(conf);
+ Map<String, SupervisorInfo> supervisorInfoList = buildSupervisorInfo(conf, supervisor, validatedNumaMap);
+ for (Map.Entry<String, SupervisorInfo> supervisorInfoEntry: supervisorInfoList.entrySet()) {
+ stormClusterState.supervisorHeartbeat(supervisorInfoEntry.getKey(), supervisorInfoEntry.getValue());
+ }
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/utils/DaemonConfigValidation.java b/storm-server/src/main/java/org/apache/storm/utils/DaemonConfigValidation.java
new file mode 100644
index 0000000..a031258
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/utils/DaemonConfigValidation.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import static org.apache.storm.ServerConstants.NUMA_CORES;
+import static org.apache.storm.ServerConstants.NUMA_GENERIC_RESOURCES_MAP;
+import static org.apache.storm.ServerConstants.NUMA_MEMORY_IN_MB;
+import static org.apache.storm.ServerConstants.NUMA_PORTS;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.validation.ConfigValidation;
+
+public class DaemonConfigValidation {
+ public static class NumaEntryValidator extends ConfigValidation.Validator {
+
+ @Override
+ public void validateField(String name, Object o) {
+ if (o == null) {
+ return;
+ }
+ Map<String, Object> numa = (Map<String, Object>) o;
+ for (String key : new String[]{NUMA_CORES, NUMA_MEMORY_IN_MB, NUMA_PORTS}) {
+ if (!numa.containsKey(key)) {
+ throw new IllegalArgumentException(
+ "The numa configuration key [" + key + "] is missing!"
+ );
+ }
+ }
+
+ List<Integer> cores = (List<Integer>) numa.get(NUMA_CORES);
+ Set<Integer> coreSet = new HashSet();
+ coreSet.addAll(cores);
+ if (coreSet.size() != cores.size()) {
+ throw new IllegalArgumentException(
+ "Duplicate cores in NUMA config"
+ );
+ }
+ try {
+ Map<String, Double> numaGenericResources =
+ (Map<String, Double>) numa.getOrDefault(NUMA_GENERIC_RESOURCES_MAP, Collections.EMPTY_MAP);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "Invalid generic resources in NUMA config"
+ );
+ }
+ }
+ }
+}
diff --git a/storm-server/src/test/java/org/apache/storm/TestCgroups.java b/storm-server/src/test/java/org/apache/storm/TestCgroups.java
index 874bd13..765283c 100644
--- a/storm-server/src/test/java/org/apache/storm/TestCgroups.java
+++ b/storm-server/src/test/java/org/apache/storm/TestCgroups.java
@@ -54,7 +54,7 @@ public class TestCgroups {
manager.prepare(config);
String workerId = UUID.randomUUID().toString();
- manager.reserveResourcesForWorker(workerId, 1024, 200);
+ manager.reserveResourcesForWorker(workerId, 1024, 200, null);
List<String> commandList = manager.getLaunchCommand(workerId, new ArrayList<String>());
StringBuilder command = new StringBuilder();