You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@storm.apache.org by go...@apache.org on 2020/04/16 15:15:37 UTC

[storm] branch master updated: STORM-3259: Adds NUMA awareness to enable worker pinning

This is an automated email from the ASF dual-hosted git repository.

govind pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 632d320  STORM-3259: Adds NUMA awareness to enable worker pinning
     new ec4065a  Merge pull request #3237 from govind-menon/STORM-3259-Containers
632d320 is described below

commit 632d32039031c7e6bbe29ecce2d9833c1a677b76
Author: Govind Menon <go...@gmail.com>
AuthorDate: Wed Mar 25 17:53:12 2020 -0500

    STORM-3259: Adds NUMA awareness to enable worker pinning
    
    STORM-3259: Properly sends supervisor assignments on topology kill. Originally done by Aaron Gresch.
    
    STORM-3259: Addresses review comments on NUMA pinning
    
    STORM-3259: Addresses 2nd round of review comments
    
    STORM-3259: Moves getNumaConf to SupervisorUtils from Utils
    
    STORM-3259: Cleanup of unnecessary changes
    
    STORM-3259: Creates separate storm-server specific constants and helper files and moves NUMA to them
---
 .../main/java/org/apache/storm/DaemonConfig.java   |  18 ++++
 .../java/org/apache/storm/ServerConstants.java     |  21 ++++
 .../container/ResourceIsolationInterface.java      |   3 +-
 .../storm/container/cgroup/CgroupManager.java      |  38 ++++++-
 .../org/apache/storm/daemon/nimbus/Nimbus.java     |  47 ++++++---
 .../storm/daemon/supervisor/BasicContainer.java    |  22 +++--
 .../storm/daemon/supervisor/SupervisorUtils.java   |  37 ++++++-
 .../supervisor/timer/SupervisorHeartbeat.java      | 110 +++++++++++++++++----
 .../apache/storm/utils/DaemonConfigValidation.java |  62 ++++++++++++
 .../test/java/org/apache/storm/TestCgroups.java    |   2 +-
 10 files changed, 317 insertions(+), 43 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 482846d..731ca7a 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -43,6 +43,7 @@ import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriori
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
 import org.apache.storm.security.auth.IAuthorizer;
 import org.apache.storm.security.auth.IHttpCredentialsPlugin;
+import org.apache.storm.utils.DaemonConfigValidation;
 import org.apache.storm.validation.ConfigValidation;
 import org.apache.storm.validation.Validated;
 
@@ -751,6 +752,23 @@ public class DaemonConfig implements Validated {
     public static final String SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES = "supervisor.blobstore.download.max_retries";
 
     /**
+     * Per-NUMA-zone layout of this supervisor, keyed by NUMA zone id. Each entry lists the zone's memory
+     * ("numa.memory.mb"), distinct CPU core ids ("numa.cores"), worker ports ("numa.ports") and optional
+     * generic resources ("numa.generic.resources.map"). Each zone is heartbeated as a separate supervisor.
+     * Example: "supervisor.numa.meta": {
+     *  "0": { "numa.memory.mb": 122880, "numa.cores": [ 0, 12, 1, 13, 2, 14, 3, 15, 4, 16, 5, 17],
+     *      "numa.ports": [6700, 6701]},
+     *  "1" : {"numa.memory.mb": 122880, "numa.cores": [ 6, 18, 7, 19, 8, 20, 9, 21, 10, 22, 11, 23],
+     *      "numa.ports": [6702, 6703], "numa.generic.resources.map": {"gpu.count" : 1}}
+     *  }
+     */
+    @IsMapEntryCustom(
+            keyValidatorClasses = { ConfigValidation.StringValidator.class },
+            valueValidatorClasses = { DaemonConfigValidation.NumaEntryValidator.class}
+    )
+    public static final String SUPERVISOR_NUMA_META = "supervisor.numa.meta";
+
+    /**
      * What blobstore implementation nimbus should use.
      */
     @IsString
diff --git a/storm-server/src/main/java/org/apache/storm/ServerConstants.java b/storm-server/src/main/java/org/apache/storm/ServerConstants.java
new file mode 100644
index 0000000..0a7f6a4
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/ServerConstants.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm;
+
+/**
+ * String constants used by storm-server code, e.g. the keys of a {@code supervisor.numa.meta} zone entry.
+ */
+public class ServerConstants {
+    /** Key of a NUMA zone's memory capacity in MB. */
+    public static final String NUMA_MEMORY_IN_MB = "numa.memory.mb";
+    /** Key of the list of CPU core ids belonging to a NUMA zone. */
+    public static final String NUMA_CORES = "numa.cores";
+    /** Key of the list of worker ports assigned to a NUMA zone. */
+    public static final String NUMA_PORTS = "numa.ports";
+    /** Key of the optional map from generic resource name to the amount a NUMA zone offers. */
+    public static final String NUMA_GENERIC_RESOURCES_MAP = "numa.generic.resources.map";
+    /** Separator joining a supervisor id and a NUMA zone id into that zone's supervisor id. */
+    public static final String NUMA_ID_SEPARATOR = "-numa-";
+}
diff --git a/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java b/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
index 89df29a..ae30c29 100644
--- a/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
+++ b/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
@@ -36,8 +36,9 @@ public interface ResourceIsolationInterface {
      * @param workerId worker id of the worker to start
      * @param workerMemory the amount of memory for the worker or null if not enforced
      * @param workerCpu the amount of cpu for the worker or null if not enforced
+     * @param numaId id of the NUMA zone the worker should be bound to, or null if it is not NUMA-pinned
      */
-    void reserveResourcesForWorker(String workerId, Integer workerMemory, Integer workerCpu);
+    void reserveResourcesForWorker(String workerId, Integer workerMemory, Integer workerCpu, String numaId);
 
     /**
      * This function will be called when the worker needs to shutdown. This function should include logic to clean up
diff --git a/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java b/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
index fbdb364..6677da7 100644
--- a/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
+++ b/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
@@ -26,8 +26,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
+import org.apache.commons.lang.SystemUtils;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.container.ResourceIsolationInterface;
@@ -50,6 +53,9 @@ public class CgroupManager implements ResourceIsolationInterface {
     private CgroupCommon rootCgroup;
     private String rootDir;
     private Map<String, Object> conf;
+    // workerId -> NUMA zone id of workers that must be launched under numactl; filled in by reserveResourcesForWorker.
+    // NOTE(review): nothing visible here removes entries; confirm releaseResourcesForWorker clears them.
+    private final Map<String, String> workerToNumaId = new ConcurrentHashMap<>();
 
     static long getMemInfoFreeMb() throws IOException {
         //MemFree:        14367072 kB
@@ -147,7 +153,7 @@ public class CgroupManager implements ResourceIsolationInterface {
     }
 
     @Override
-    public void reserveResourcesForWorker(String workerId, Integer totalMem, Integer cpuNum) throws SecurityException {
+    public void reserveResourcesForWorker(String workerId, Integer totalMem, Integer cpuNum, String numaId) throws SecurityException {
         LOG.info("Creating cgroup for worker {} with resources {} MB {} % CPU", workerId, totalMem, cpuNum);
         // The manually set STORM_WORKER_CGROUP_CPU_LIMIT config on supervisor will overwrite resources assigned by
         // RAS (Resource Aware Scheduler)
@@ -219,6 +225,10 @@ public class CgroupManager implements ResourceIsolationInterface {
                 }
             }
         }
+
+        if (numaId != null) {
+            workerToNumaId.put(workerId, numaId);
+        }
     }
 
     @Override
@@ -236,9 +246,41 @@ public class CgroupManager implements ResourceIsolationInterface {
         }
     }
 
+    /**
+     * Whether this process runs on Linux. Kept as a separate method so tests can mock it.
+     * @return true if on Linux.
+     */
+    protected static boolean isOnLinux() {
+        return SystemUtils.IS_OS_LINUX;
+    }
+
+    /**
+     * Prepend a numactl invocation that binds the CPUs and memory of the launched process to one NUMA zone.
+     *
+     * @param command launch command to modify in place; numactl and its options are inserted at the front
+     * @param numaId NUMA zone id to bind to
+     * @throws UnsupportedOperationException on non-Linux hosts, where pinning is not supported yet
+     */
+    private static void prefixNumaPinning(List<String> command, String numaId) {
+        if (!isOnLinux()) {
+            // TODO : Add support for pinning on Windows host
+            throw new UnsupportedOperationException("numactl pinning currently not supported on non-Linux hosts");
+        }
+        command.add(0, "numactl");
+        command.add(1, "--cpunodebind=" + numaId);
+        command.add(2, "--membind=" + numaId);
+    }
+
     @Override
     public List<String> getLaunchCommand(String workerId, List<String> existingCommand) {
         List<String> newCommand = getLaunchCommandPrefix(workerId);
+
+        // One get() instead of containsKey() + get(), so the two lookups cannot disagree.
+        String numaId = workerToNumaId.get(workerId);
+        if (numaId != null) {
+            prefixNumaPinning(newCommand, numaId);
+        }
+
         newCommand.addAll(existingCommand);
         return newCommand;
     }
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index daccf15..eee199f 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1553,23 +1553,42 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
 
     /**
-     * Pick out assignments for specific node from all assignments.
+     * Pick out assignments for a specific host from all assignments. An assignment is included when any of its
+     * supervisors runs on the host, which covers every NUMA supervisor sharing that hostname.
+     *
+     * @param assignmentMap stormId -> assignment map
+     * @param hostname      hostname the supervisors run on
+     * @return stormId -> assignment map for the host
+     */
+    private static Map<String, Assignment> assignmentsForHost(Map<String, Assignment> assignmentMap, String hostname) {
+        Map<String, Assignment> ret = new HashMap<>();
+        for (Map.Entry<String, Assignment> assignmentEntry : assignmentMap.entrySet()) {
+            Map<String, String> nodeHost = assignmentEntry.getValue().get_node_host();
+            if (nodeHost != null && nodeHost.containsValue(hostname)) {
+                ret.put(assignmentEntry.getKey(), assignmentEntry.getValue());
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Pick out assignments for specific NodeId from all assignments.
      *
      * @param assignmentMap stormId -> assignment map
-     * @param nodeId        supervisor/node id
+     * @param nodeId        supervisor node id
      * @return stormId -> assignment map for the node
      */
-    private static Map<String, Assignment> assignmentsForNode(Map<String, Assignment> assignmentMap, String nodeId) {
+    private static Map<String, Assignment> assignmentsForNodeId(Map<String, Assignment> assignmentMap, String nodeId) {
         Map<String, Assignment> ret = new HashMap<>();
         assignmentMap.entrySet().stream().filter(assignmentEntry -> assignmentEntry.getValue().get_node_host().keySet()
-                                                                                   .contains(nodeId))
-                     .forEach(assignmentEntry -> {
-                         ret.put(assignmentEntry.getKey(), assignmentEntry.getValue());
-                     });
+                .contains(nodeId))
+                .forEach(assignmentEntry -> {
+                    ret.put(assignmentEntry.getKey(), assignmentEntry.getValue());
+                });
 
         return ret;
     }
 
     /**
      * Notify supervisors/nodes assigned assignments.
      *
@@ -1585,8 +1604,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         for (Map.Entry<String, String> nodeEntry : nodeHost.entrySet()) {
             try {
                 String nodeId = nodeEntry.getKey();
+                String hostname = nodeEntry.getValue();
                 SupervisorAssignments supervisorAssignments = new SupervisorAssignments();
-                supervisorAssignments.set_storm_assignment(assignmentsForNode(assignments, nodeEntry.getKey()));
+                supervisorAssignments.set_storm_assignment(assignmentsForHost(assignments, hostname));
                 SupervisorDetails details = supervisorDetails.get(nodeId);
                 Integer serverPort = details != null ? details.getServerPort() : null;
                 service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments, metricsRegistry);
@@ -4769,23 +4789,22 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
 
     @Override
-    public SupervisorAssignments getSupervisorAssignments(String node) throws AuthorizationException, TException {
+    public SupervisorAssignments getSupervisorAssignments(String nodeId) throws AuthorizationException, TException {
         checkAuthorization(null, null, "getSupervisorAssignments");
         try {
             if (isLeader() && isAssignmentsRecovered()) {
                 SupervisorAssignments supervisorAssignments = new SupervisorAssignments();
-                supervisorAssignments.set_storm_assignment(assignmentsForNode(stormClusterState.assignmentsInfo(), node));
+                supervisorAssignments.set_storm_assignment(assignmentsForNodeId(stormClusterState.assignmentsInfo(), nodeId));
                 return supervisorAssignments;
             }
         } catch (Exception e) {
-            LOG.debug("Exception when node {} fetching assignments", node);
+            LOG.debug("Exception when node {} fetching assignments", nodeId, e);
             if (e instanceof TException) {
                 throw (TException) e;
             }
             // When this master is not leader and get a sync request from node,
             // just return nil which will cause client/node to get an unknown error,
             // the node/supervisor will sync it as a timer task.
-            LOG.debug("Exception when node {} fetching assignments", node);
         }
         return null;
     }
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
index 153ac77..4db6f3f 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -37,6 +37,7 @@ import java.util.NavigableMap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.ServerConstants;
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileAction;
@@ -615,7 +616,7 @@ public class BasicContainer extends Container {
      * @throws IOException on any error.
      */
     private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot,
-                                         final String jlp) throws IOException {
+                                         final String jlp, final String numaId) throws IOException {
         final String javaCmd = javaCmd("java");
         final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
         final String topoConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
@@ -665,8 +666,12 @@ public class BasicContainer extends Container {
         commandList.addAll(classPathParams);
         commandList.add(getWorkerMain(topoVersion));
         commandList.add(topologyId);
-        commandList.add(supervisorId);
+        // A worker pinned to a NUMA zone reports under that zone's supervisor id.
+        String workerSupervisorId = numaId == null
+                ? supervisorId
+                : supervisorId + ServerConstants.NUMA_ID_SEPARATOR + numaId;
+        commandList.add(workerSupervisorId);
 
         // supervisor port should be only presented to worker which supports RPC heartbeat
         // unknown version should be treated as "current version", which supports RPC heartbeat
         if ((topoVersion.getMajor() == -1 && topoVersion.getMinor() == -1)
@@ -817,8 +822,14 @@ public class BasicContainer extends Container {
     @Override
     public void launch() throws IOException {
         type.assertFull();
-        LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment,
-                supervisorId, port, workerId);
+        String numaId = SupervisorUtils.getNumaIdForPort(port, conf);
+        if (numaId == null) {
+            LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}",
+                    assignment, supervisorId, port, workerId);
+        } else {
+            LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {} bound to numa zone {}",
+                    assignment, supervisorId, port, workerId, numaId);
+        }
         exitedEarly = false;
 
         final WorkerResources resources = assignment.get_resources();
@@ -844,10 +855,10 @@ public class BasicContainer extends Container {
         if (resourceIsolationManager != null) {
             final int cpu = (int) Math.ceil(resources.get_cpu());
             //Save the memory limit so we can enforce it less strictly
-            resourceIsolationManager.reserveResourcesForWorker(workerId, (int) memoryLimitMb, cpu);
+            resourceIsolationManager.reserveResourcesForWorker(workerId, (int) memoryLimitMb, cpu, numaId);
         }
 
-        List<String> commandList = mkLaunchCommand(memOnHeap, stormRoot, jlp);
+        List<String> commandList = mkLaunchCommand(memOnHeap, stormRoot, jlp, numaId);
 
         LOG.info("Launching worker with command: {}. ", ServerUtils.shellCmd(commandList));
 
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 5e4ce3f..3460600 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -18,13 +18,16 @@
 
 package org.apache.storm.daemon.supervisor;
 
+import static org.apache.storm.ServerConstants.NUMA_PORTS;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.localizer.LocalResource;
 import org.apache.storm.utils.ConfigUtils;
@@ -50,6 +53,35 @@ public class SupervisorUtils {
         _instance = INSTANCE;
     }
 
+    /**
+     * Find the NUMA zone that owns a worker port on this supervisor.
+     *
+     * @param port worker port to look up; null never matches
+     * @param supervisorConf supervisor configuration, possibly containing {@code supervisor.numa.meta}
+     * @return the id of the NUMA zone whose {@code numa.ports} list contains {@code port}, or null if the port
+     *     belongs to no NUMA zone (including when NUMA is not configured)
+     */
+    @SuppressWarnings("unchecked")
+    public static String getNumaIdForPort(Integer port, Map<String, Object> supervisorConf) {
+        if (port == null) {
+            return null;
+        }
+        for (Map.Entry<String, Object> numaEntry : getNumaMap(supervisorConf).entrySet()) {
+            Map<String, Object> numaMap = (Map<String, Object>) numaEntry.getValue();
+            List<?> portList = (List<?>) numaMap.get(NUMA_PORTS);
+            if (portList == null) {
+                continue;
+            }
+            for (Object numaPort : portList) {
+                // Compare numerically: depending on the config parser, ports may be Integer or Long.
+                if (numaPort instanceof Number && ((Number) numaPort).longValue() == port.longValue()) {
+                    return numaEntry.getKey();
+                }
+            }
+        }
+        return null;
+    }
+
     public static void rmrAsUser(Map<String, Object> conf, String id, String path) throws IOException {
         String user = ServerUtils.getFileOwner(path);
         String logPreFix = "rmr " + id;
@@ -117,6 +149,21 @@ public class SupervisorUtils {
         return _instance.readWorkerHeartbeatImpl(conf, workerId);
     }
 
+    /**
+     * Return this supervisor's NUMA configuration ({@code supervisor.numa.meta}), keyed by NUMA zone id.
+     *
+     * @param stormConf supervisor configuration
+     * @return the NUMA map, or an empty map when NUMA is not configured; never null
+     */
+    @SuppressWarnings("unchecked")
+    public static Map<String, Object> getNumaMap(Map<String, Object> stormConf) {
+        Object numa = stormConf.get(DaemonConfig.SUPERVISOR_NUMA_META);
+        if (numa == null) {
+            return Collections.emptyMap();
+        }
+        return (Map<String, Object>) numa;
+    }
+
     public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) {
         Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
 
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 24d4471..55a497b 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -17,14 +17,22 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.commons.collections.ListUtils;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.ServerConstants;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
 import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SupervisorHeartbeat implements Runnable {
 
@@ -33,6 +41,8 @@ public class SupervisorHeartbeat implements Runnable {
     private final Map<String, Object> conf;
     private final Supervisor supervisor;
 
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorHeartbeat.class);
+
     public SupervisorHeartbeat(Map<String, Object> conf, Supervisor supervisor) {
         this.stormClusterState = supervisor.getStormClusterState();
         this.supervisorId = supervisor.getId();
@@ -40,33 +50,123 @@ public class SupervisorHeartbeat implements Runnable {
         this.conf = conf;
     }
 
-    private SupervisorInfo buildSupervisorInfo(Map<String, Object> conf, Supervisor supervisor) {
-        SupervisorInfo supervisorInfo = new SupervisorInfo();
-        supervisorInfo.set_time_secs(Time.currentTimeSecs());
-        supervisorInfo.set_hostname(supervisor.getHostName());
-        supervisorInfo.set_assignment_id(supervisor.getAssignmentId());
-        supervisorInfo.set_server_port(supervisor.getThriftServerPort());
-
-        List<Long> usedPorts = new ArrayList<>();
-        usedPorts.addAll(supervisor.getCurrAssignment().get().keySet());
-        supervisorInfo.set_used_ports(usedPorts);
+    /**
+     * Build the heartbeats this supervisor publishes, keyed by the supervisor id each one is reported under.
+     *
+     * <p>Without NUMA zones a single heartbeat is built for {@code supervisor.getId()}. With NUMA zones one
+     * heartbeat per zone is built under {@code <supervisorId>-numa-<zoneId>}, owning that zone's ports and
+     * resources; the remaining ports and resources are reported under the plain supervisor id only if they
+     * can still host a worker.
+     *
+     * @param conf supervisor configuration
+     * @param supervisor the supervisor being heartbeated
+     * @param validatedNumaMap NUMA zone id -> zone configuration; may be empty
+     * @return supervisor id -> heartbeat to publish
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, SupervisorInfo> buildSupervisorInfo(Map<String, Object> conf, Supervisor supervisor,
+                                                            Map<String, Object> validatedNumaMap) {
         List metaDatas = (List) supervisor.getiSupervisor().getMetadata();
-        List<Long> portList = new ArrayList<>();
+        List<Long> allPortList = new ArrayList<>();
         if (metaDatas != null) {
             for (Object data : metaDatas) {
                 Integer port = ObjectReader.getInt(data);
                 if (port != null) {
-                    portList.add(port.longValue());
+                    allPortList.add(port.longValue());
                 }
             }
         }
 
-        supervisorInfo.set_meta(portList);
-        supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(DaemonConfig.SUPERVISOR_SCHEDULER_META));
-        supervisorInfo.set_uptime_secs(supervisor.getUpTime().upTime());
-        supervisorInfo.set_version(supervisor.getStormVersion());
-        supervisorInfo.set_resources_map(mkSupervisorCapacities(conf));
-        return supervisorInfo;
+        List<Long> allUsedPorts = new ArrayList<>(supervisor.getCurrAssignment().get().keySet());
+        Map<String, Double> totalSupervisorResources = mkSupervisorCapacities(conf);
+        Map<String, SupervisorInfo> result = new HashMap<>();
+
+        if (validatedNumaMap == null || validatedNumaMap.isEmpty()) {
+            // No NUMA zones: heartbeat the whole supervisor under its own id, as before NUMA support,
+            // even when it currently offers no ports or no resources.
+            result.put(supervisor.getId(), newSupervisorInfo(conf, supervisor, supervisor.getAssignmentId(),
+                    allPortList, allUsedPorts, totalSupervisorResources));
+            return result;
+        }
+
+        NormalizedResourceOffer remainingResources = new NormalizedResourceOffer(totalSupervisorResources);
+        for (Map.Entry<String, Object> numaMapEntry : validatedNumaMap.entrySet()) {
+            String numaSuffix = ServerConstants.NUMA_ID_SEPARATOR + numaMapEntry.getKey();
+            Map<String, Object> numaMap = (Map<String, Object>) numaMapEntry.getValue();
+            // Ports may be parsed as Integer or Long; normalize to Long like the metadata ports above.
+            List<Long> numaPortList = ((List<?>) numaMap.get(ServerConstants.NUMA_PORTS)).stream()
+                    .map(numaPort -> ((Number) numaPort).longValue())
+                    .collect(Collectors.toList());
+            List<Long> usedNumaPorts = ListUtils.intersection(numaPortList, allUsedPorts);
+
+            // Ports and resources owned by this zone are no longer offered by the non-NUMA remainder.
+            allPortList = ListUtils.subtract(allPortList, numaPortList);
+            allUsedPorts = ListUtils.subtract(allUsedPorts, usedNumaPorts);
+            Map<String, Double> numaResources = mkSupervisorCapacitiesFromNumaMap(numaMap);
+            remainingResources.remove(new NormalizedResourceOffer(numaResources));
+
+            result.put(supervisor.getId() + numaSuffix, newSupervisorInfo(conf, supervisor,
+                    supervisor.getAssignmentId() + numaSuffix, numaPortList, usedNumaPorts, numaResources));
+        }
+
+        // Heartbeat the non-NUMA remainder only if it can still host a worker.
+        if (remainingResources.getTotalCpu() > 0 && remainingResources.getTotalMemoryMb() > 0
+                && !allPortList.isEmpty()) {
+            result.put(supervisor.getId(), newSupervisorInfo(conf, supervisor, supervisor.getAssignmentId(),
+                    allPortList, allUsedPorts, remainingResources.toNormalizedMap()));
+        }
+        return result;
+    }
+
+    /**
+     * Create one heartbeat record carrying the fields shared by every id this supervisor reports under.
+     *
+     * @param conf supervisor configuration
+     * @param supervisor the supervisor being heartbeated
+     * @param assignmentId assignment id to publish
+     * @param ports worker ports offered under this id
+     * @param usedPorts the offered ports that currently have an assignment
+     * @param resources normalized resource capacities offered under this id
+     * @return the populated heartbeat
+     */
+    @SuppressWarnings("unchecked")
+    private static SupervisorInfo newSupervisorInfo(Map<String, Object> conf, Supervisor supervisor,
+                                                    String assignmentId, List<Long> ports, List<Long> usedPorts,
+                                                    Map<String, Double> resources) {
+        SupervisorInfo supervisorInfo = new SupervisorInfo();
+        supervisorInfo.set_time_secs(Time.currentTimeSecs());
+        supervisorInfo.set_hostname(supervisor.getHostName());
+        supervisorInfo.set_assignment_id(assignmentId);
+        supervisorInfo.set_server_port(supervisor.getThriftServerPort());
+        supervisorInfo.set_used_ports(usedPorts);
+        supervisorInfo.set_meta(ports);
+        supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(DaemonConfig.SUPERVISOR_SCHEDULER_META));
+        supervisorInfo.set_uptime_secs(supervisor.getUpTime().upTime());
+        supervisorInfo.set_version(supervisor.getStormVersion());
+        supervisorInfo.set_resources_map(resources);
+        return supervisorInfo;
+    }
+
+    /**
+     * Build the normalized capacity map of one NUMA zone: 100 CPU-capacity units per listed core, the zone's
+     * memory, and any generic resources of the zone.
+     *
+     * @param numaMap one zone entry of the supervisor NUMA configuration
+     * @return normalized resource capacities of the zone
+     */
+    private Map<String, Double> mkSupervisorCapacitiesFromNumaMap(Map<String, Object> numaMap) {
+        Map<String, Double> ret = new HashMap<>();
+        ret.put(Config.SUPERVISOR_CPU_CAPACITY, ((List<?>) numaMap.get(ServerConstants.NUMA_CORES)).size() * 100.0);
+        // Numeric config values may be parsed as Integer, Long or Double, so read them through Number rather
+        // than casting to one boxed type (e.g. an Integer generic resource count must not leak into a Double map).
+        ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB,
+                ((Number) numaMap.get(ServerConstants.NUMA_MEMORY_IN_MB)).doubleValue());
+        Map<?, ?> genericResources =
+                (Map<?, ?>) numaMap.getOrDefault(ServerConstants.NUMA_GENERIC_RESOURCES_MAP, Collections.emptyMap());
+        for (Map.Entry<?, ?> genericResource : genericResources.entrySet()) {
+            ret.put((String) genericResource.getKey(), ((Number) genericResource.getValue()).doubleValue());
+        }
+        return NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(ret);
     }
 
     private Map<String, Double> mkSupervisorCapacities(Map<String, Object> conf) {
@@ -87,12 +187,17 @@ public class SupervisorHeartbeat implements Runnable {
             ret.put(stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
         }
 
-        return NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(ret);
+        Map<String, Double> normalized = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(ret);
+        LOG.debug("Supervisor capacities: {}", normalized);
+        return normalized;
     }
 
     @Override
     public void run() {
-        SupervisorInfo supervisorInfo = buildSupervisorInfo(conf, supervisor);
-        stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
+        // One heartbeat per reported supervisor id (the supervisor itself and/or each of its NUMA zones).
+        Map<String, SupervisorInfo> supervisorInfos = buildSupervisorInfo(conf, supervisor, SupervisorUtils.getNumaMap(conf));
+        for (Map.Entry<String, SupervisorInfo> supervisorInfoEntry : supervisorInfos.entrySet()) {
+            stormClusterState.supervisorHeartbeat(supervisorInfoEntry.getKey(), supervisorInfoEntry.getValue());
+        }
     }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/utils/DaemonConfigValidation.java b/storm-server/src/main/java/org/apache/storm/utils/DaemonConfigValidation.java
new file mode 100644
index 0000000..a031258
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/utils/DaemonConfigValidation.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import static org.apache.storm.ServerConstants.NUMA_CORES;
+import static org.apache.storm.ServerConstants.NUMA_GENERIC_RESOURCES_MAP;
+import static org.apache.storm.ServerConstants.NUMA_MEMORY_IN_MB;
+import static org.apache.storm.ServerConstants.NUMA_PORTS;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.validation.ConfigValidation;
+
+/**
+ * Validators for storm-server (daemon) configuration entries.
+ */
+public class DaemonConfigValidation {
+    /**
+     * Validates one NUMA zone entry of {@code supervisor.numa.meta}.
+     *
+     * <p>The entry must be a map containing {@code numa.cores} (list of distinct core ids), {@code numa.memory.mb}
+     * (a number) and {@code numa.ports} (list of port numbers). The optional {@code numa.generic.resources.map}
+     * must map resource names to numbers.
+     */
+    public static class NumaEntryValidator extends ConfigValidation.Validator {
+
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            if (!(o instanceof Map)) {
+                throw new IllegalArgumentException("Field " + name + " must be a map of NUMA settings, but was " + o);
+            }
+            Map<?, ?> numa = (Map<?, ?>) o;
+            for (String key : new String[]{NUMA_CORES, NUMA_MEMORY_IN_MB, NUMA_PORTS}) {
+                if (!numa.containsKey(key)) {
+                    throw new IllegalArgumentException(
+                            "The numa configuration key [" + key + "] is missing!"
+                    );
+                }
+            }
+
+            List<?> cores = requireNumberList(numa.get(NUMA_CORES), NUMA_CORES);
+            Set<Long> coreSet = new HashSet<>();
+            for (Object core : cores) {
+                coreSet.add(((Number) core).longValue());
+            }
+            if (coreSet.size() != cores.size()) {
+                throw new IllegalArgumentException(
+                        "Duplicate cores in NUMA config"
+                );
+            }
+
+            requireNumberList(numa.get(NUMA_PORTS), NUMA_PORTS);
+
+            if (!(numa.get(NUMA_MEMORY_IN_MB) instanceof Number)) {
+                throw new IllegalArgumentException(
+                        "The numa configuration key [" + NUMA_MEMORY_IN_MB + "] must be a number"
+                );
+            }
+
+            // Check element types explicitly: a generic cast cannot detect non-numeric amounts (type erasure).
+            Object genericResources = numa.get(NUMA_GENERIC_RESOURCES_MAP);
+            if (genericResources != null) {
+                if (!(genericResources instanceof Map)) {
+                    throw new IllegalArgumentException("Invalid generic resources in NUMA config");
+                }
+                for (Map.Entry<?, ?> resource : ((Map<?, ?>) genericResources).entrySet()) {
+                    if (!(resource.getKey() instanceof String) || !(resource.getValue() instanceof Number)) {
+                        throw new IllegalArgumentException("Invalid generic resources in NUMA config");
+                    }
+                }
+            }
+        }
+
+        /**
+         * Ensure a NUMA setting is a list of numbers.
+         *
+         * @param value the configured value
+         * @param key the NUMA configuration key, used in the error message
+         * @return the value as a list
+         * @throws IllegalArgumentException if the value is not a list or contains non-numeric elements
+         */
+        private static List<?> requireNumberList(Object value, String key) {
+            if (!(value instanceof List)) {
+                throw new IllegalArgumentException("The numa configuration key [" + key + "] must be a list");
+            }
+            List<?> list = (List<?>) value;
+            for (Object element : list) {
+                if (!(element instanceof Number)) {
+                    throw new IllegalArgumentException(
+                            "The numa configuration key [" + key + "] must only contain numbers, but contains " + element);
+                }
+            }
+            return list;
+        }
+    }
+}
diff --git a/storm-server/src/test/java/org/apache/storm/TestCgroups.java b/storm-server/src/test/java/org/apache/storm/TestCgroups.java
index 874bd13..765283c 100644
--- a/storm-server/src/test/java/org/apache/storm/TestCgroups.java
+++ b/storm-server/src/test/java/org/apache/storm/TestCgroups.java
@@ -54,7 +54,8 @@ public class TestCgroups {
         manager.prepare(config);
 
         String workerId = UUID.randomUUID().toString();
-        manager.reserveResourcesForWorker(workerId, 1024, 200);
+        // null numaId: no NUMA zone is recorded, so the launch command below gets no numactl prefix.
+        manager.reserveResourcesForWorker(workerId, 1024, 200, null);
 
         List<String> commandList = manager.getLaunchCommand(workerId, new ArrayList<String>());
         StringBuilder command = new StringBuilder();