Posted to dev@storm.apache.org by govind-menon <gi...@git.apache.org> on 2018/10/17 19:04:13 UTC

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

GitHub user govind-menon opened a pull request:

    https://github.com/apache/storm/pull/2881

    STORM-3259: NUMA Support for Storm

    Only functional changes; putting this up for review now, with tests to follow soon.
    
    I have run the following tests:
    
    1. Mixed cluster: NUMA supervisors and normal supervisors
    2. NUMA-only supervisors
    3. Profiling actions
    4. Rebalance

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/govind-menon/storm STORM-3259

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2881.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2881
    
----
commit 243594eb14301a3367ddf6b0574afb13898cc4af
Author: Govind Menon <go...@...>
Date:   2018-10-07T20:11:43Z

    STORM-3259: NUMA Support for Storm
    
    Supervisor will heartbeat as multiple supervisors if there's an appropriate numa config
    
    [Numa] Fixing worker sync assignments
    
    [numa] Moving NUMA config validation to common Utils

----


---

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r227979286
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -119,15 +120,81 @@
         // tests by subclassing.
         private static Utils _instance = new Utils();
         private static String memoizedLocalHostnameString = null;
    -    public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +    public static final Pattern TOPOLOGY_KEY_PATTERN =
    +            Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +
    +    public static final String NUMA_MEMORY_IN_MB = "memory.mb";
    +    public static final String NUMA_CORES = "cores";
    +    public static final String NUMAS_PORTS = "ports";
    +    public static final String NUMA_ID = "node_id";
    +    public static final String NUMAS_BASE = "numas";
     
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
     
         /**
    -     * Provide an instance of this class for delegates to use.  To mock out delegated methods, provide an instance of a subclass that
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map<String, Object> getValidatedNumaMap(Map<String, Object> stormConf) throws KeyNotFoundException {
    +        Map<String, Object> validatedNumaMap = new HashMap();
    +        Map<String, Object> supervisorNumaMap =
    +                (Map<String, Object>) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) {
    +            return validatedNumaMap;
    +        }
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException(
    +                    "The numa configurations [" + NUMAS_BASE + "] is missing!"
    +            );
    +        }
    +        List<Map> numaEntries = (List<Map>) supervisorNumaMap.get(NUMAS_BASE);
    --- End diff --
    
    Why is this a list of maps that each have `NUMA_ID` as a key, instead of a map from `NUMA_ID` to maps with the keys `NUMA_CORES`, `NUMA_MEMORY_IN_MB`, and `NUMAS_PORTS`?
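
A minimal sketch of the alternative shape raised in this question, assuming the key names from the diff (`node_id`, `cores`, `memory.mb`, `ports`); the class and method names are hypothetical. It re-keys the list-of-maps form by NUMA ID and rejects duplicate IDs, which a list allows but a keyed map would rule out by construction. It also converts the ID with `String.valueOf`, because a cast like `(String) numa.get(NUMA_ID)` would fail if the config parser returns the `0`/`1` IDs from the documented example as numbers.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: re-keys the list-of-maps NUMA config by NUMA ID.
final class NumaConfigShapes {
    static final String NUMA_ID = "node_id"; // key name taken from the diff

    static Map<String, Map<String, Object>> byNumaId(List<Map<String, Object>> numaEntries) {
        Map<String, Map<String, Object>> result = new LinkedHashMap<>();
        for (Map<String, Object> entry : numaEntries) {
            Object rawId = entry.get(NUMA_ID);
            if (rawId == null) {
                throw new IllegalArgumentException("NUMA entry is missing [" + NUMA_ID + "]: " + entry);
            }
            // Normalize instead of casting: parsers usually return "node_id": 0 as a number.
            String id = String.valueOf(rawId);
            // In the keyed shape the ID is the map key, so drop it from the value.
            Map<String, Object> fields = new LinkedHashMap<>(entry);
            fields.remove(NUMA_ID);
            if (result.putIfAbsent(id, fields) != null) {
                throw new IllegalArgumentException("Duplicate NUMA ID [" + id + "]");
            }
        }
        return result;
    }
}
```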


---

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228307991
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -119,15 +120,81 @@
         // tests by subclassing.
         private static Utils _instance = new Utils();
         private static String memoizedLocalHostnameString = null;
    -    public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +    public static final Pattern TOPOLOGY_KEY_PATTERN =
    +            Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +
    +    public static final String NUMA_MEMORY_IN_MB = "memory.mb";
    +    public static final String NUMA_CORES = "cores";
    +    public static final String NUMAS_PORTS = "ports";
    +    public static final String NUMA_ID = "node_id";
    +    public static final String NUMAS_BASE = "numas";
     
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
     
         /**
    -     * Provide an instance of this class for delegates to use.  To mock out delegated methods, provide an instance of a subclass that
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map<String, Object> getValidatedNumaMap(Map<String, Object> stormConf) throws KeyNotFoundException {
    +        Map<String, Object> validatedNumaMap = new HashMap();
    +        Map<String, Object> supervisorNumaMap =
    +                (Map<String, Object>) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) {
    +            return validatedNumaMap;
    +        }
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException(
    +                    "The numa configurations [" + NUMAS_BASE + "] is missing!"
    +            );
    +        }
    +        List<Map> numaEntries = (List<Map>) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    --- End diff --
    
    Since we calculate CPU capacity based on the size of `NUMA_CORES`, do we need to validate that there are no duplicates in `NUMA_CORES`? For example, what will happen if `NUMA_CORES` is `[0, 1, 1, 3]`?
    
    Another example: 
    ```
    {"node_id": 0, "memory.mb": 122880, "cores": [ 0, 1], "ports": [6700, 6701]},
    {"node_id": 1, "memory.mb": 122880, "cores": [ 0, 1], "ports": [6702, 6703]}
    ```
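
One way to cover both cases, sketched under the assumption that entries use the `node_id` and `cores` keys from the diff (the class and method names are hypothetical): reject a core listed twice inside one node, since `[0, 1, 1, 3]` would otherwise be counted as four cores, and reject a core claimed by two nodes, as in the second example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical validator: rejects a core listed twice within one NUMA node
// and a core claimed by more than one NUMA node.
final class NumaCoreValidator {
    static final String NUMA_ID = "node_id";   // key names taken from the diff
    static final String NUMA_CORES = "cores";

    static void validateCores(List<Map<String, Object>> numaEntries) {
        // Core ID -> NUMA node that first listed it. Cores are compared as longs so
        // that an Integer 1 and a Long 1 from different parsers count as the same core.
        Map<Long, Object> coreOwner = new HashMap<>();
        for (Map<String, Object> numa : numaEntries) {
            Object numaId = numa.get(NUMA_ID);
            @SuppressWarnings("unchecked")
            List<Number> cores = (List<Number>) numa.get(NUMA_CORES);
            if (numaId == null || cores == null) {
                throw new IllegalArgumentException("Malformed NUMA entry: " + numa);
            }
            Set<Long> seenInNode = new HashSet<>();
            for (Number rawCore : cores) {
                long core = rawCore.longValue();
                if (!seenInNode.add(core)) {
                    throw new IllegalArgumentException(
                        "Core " + core + " is listed more than once in NUMA node " + numaId);
                }
                Object previousOwner = coreOwner.putIfAbsent(core, numaId);
                if (previousOwner != null) {
                    throw new IllegalArgumentException(
                        "Core " + core + " is listed by NUMA nodes " + previousOwner + " and " + numaId);
                }
            }
        }
    }
}
```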


---

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228287958
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
    @@ -607,6 +608,27 @@ protected String javaCmd(String cmd) {
             return ret;
         }
     
    +    /**
    +     * Extracting out to mock it for tests.
    +     * @return true if on Linux.
    +     */
    +    protected boolean isOnLinux() {
    +        return SystemUtils.IS_OS_LINUX;
    +    }
    +
    +    private void addNumaPinningIfApplicable(String numaId, List<String> commandList) {
    +        if (numaId != null) {
    +            if (isOnLinux()) {
    +                commandList.add("numactl");
    +                commandList.add("--i=" + numaId);
    +                return;
    +            } else {
    +                // TODO : Add support for pinning on Windows host
    --- End diff --
    
    throw an exception?
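
A rough sketch of the "throw an exception" option. It assumes the OS name is passed in (for example from `System.getProperty("os.name")`) so the check can be tested; the class is hypothetical and the `numactl` arguments simply mirror the diff above. A NUMA ID on a non-Linux host fails loudly instead of launching an unpinned worker.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical sketch: fail fast when NUMA pinning is requested on a host that
// cannot do it, instead of silently launching an unpinned worker.
final class NumaPinning {
    static boolean isOnLinux(String osName) {
        return osName != null && osName.toLowerCase(Locale.ROOT).startsWith("linux");
    }

    static void addNumaPinningIfApplicable(String numaId, String osName, List<String> commandList) {
        if (numaId == null) {
            return; // this port is not bound to a NUMA node
        }
        if (!isOnLinux(osName)) {
            throw new UnsupportedOperationException(
                "NUMA pinning for node " + numaId + " is only supported on Linux, not on " + osName);
        }
        // Same numactl prefix as the diff above.
        commandList.add("numactl");
        commandList.add("--i=" + numaId);
    }
}
```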


---

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226065179
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -121,11 +121,68 @@
         private static String memoizedLocalHostnameString = null;
         public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
     
    +    public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
    +    public static final String NUMA_CORES = "Cores";
    +    public static final String NUMAS_PORTS = "Ports";
    +    public static final String NUMA_ID = "Id";
    +    public static final String NUMAS_BASE = "Numas";
    +
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
     
    +    /**
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map<String, Object> getValidatedNumaMap(Map<String, Object> stormConf) throws KeyNotFoundException {
    +        Map<String, Object> validatedNumaMap = new HashMap();
    +        Map<String, Object> supervisorNumaMap = (Map<String, Object>) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) return validatedNumaMap;
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException("The numa configurations [" + NUMAS_BASE + "] is missing!");
    +        }
    +        List<Map> numaEntries = (List<Map>) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new KeyNotFoundException("The numa configuration key [" + key + "] is missing!");
    +                }
    +            }
    +            validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
    +        }
    +        return validatedNumaMap;
    +    }
    +
    +    /**
    +     * getNumaIdForPort.
    +     * @param port port
    +     * @param stormConf stormConf
    +     * @return getNumaIdForPort
    +     * @throws KeyNotFoundException
    +     */
    +    public static String getNumaIdForPort(Number port, Map<String, Object> stormConf) {
    +        Map validatedNumaMap = null;
    +        try {
    +            validatedNumaMap = getValidatedNumaMap(stormConf);
    +        } catch (KeyNotFoundException e) {
    +            LOG.error("Exception while getting NUMA config", e);
    +            return null;
    --- End diff --
    
    This sounds like a serious misconfiguration. Shouldn't we throw an exception?


---

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r227896771
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -1041,6 +1041,19 @@
         @isPositiveNumber
         @NotNull
         public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
    +
    +    /**
    +     * A map with blobstore keys mapped to each NUMA Node on the supervisor that will be used
    +     * by scheduler. CPUs, memory and ports available on each NUMA node will be provided.
    +     * Each supervisor will have different map of NUMAs.
    +     * Example: "supervisor.numa.meta": { "numas": [
    +     * {"node_id": 0, "memory.mb": 122880, "cores": [ 0, 12, 1, 13, 2, 14, 3, 15, 4, 16, 5, 17],
    +     *  "ports": [6700, 6701]},
    +     * {"node_id": 1, "memory.mb": 122880, "cores": [ 6, 18, 7, 19, 8, 20, 9, 21, 10, 22, 11, 23],
    +     *  "ports": [6702, 6703]}]}
    +     */
    +    public static final String SUPERVISOR_NUMA_META = "supervisor.numa.meta";
    --- End diff --
    
    We could add `@isType(type = Map.class)` here for some shallow validation.


---

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228300609
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -121,11 +121,68 @@
         private static String memoizedLocalHostnameString = null;
         public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
     
    +    public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
    +    public static final String NUMA_CORES = "Cores";
    +    public static final String NUMAS_PORTS = "Ports";
    +    public static final String NUMA_ID = "Id";
    +    public static final String NUMAS_BASE = "Numas";
    +
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
     
    +    /**
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map<String, Object> getValidatedNumaMap(Map<String, Object> stormConf) throws KeyNotFoundException {
    +        Map<String, Object> validatedNumaMap = new HashMap();
    +        Map<String, Object> supervisorNumaMap = (Map<String, Object>) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) return validatedNumaMap;
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException("The numa configurations [" + NUMAS_BASE + "] is missing!");
    +        }
    +        List<Map> numaEntries = (List<Map>) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new KeyNotFoundException("The numa configuration key [" + key + "] is missing!");
    +                }
    +            }
    +            validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
    +        }
    +        return validatedNumaMap;
    +    }
    +
    +    /**
    +     * getNumaIdForPort.
    +     * @param port port
    +     * @param stormConf stormConf
    +     * @return getNumaIdForPort
    +     * @throws KeyNotFoundException
    +     */
    +    public static String getNumaIdForPort(Number port, Map<String, Object> stormConf) {
    +        Map validatedNumaMap = null;
    +        try {
    +            validatedNumaMap = getValidatedNumaMap(stormConf);
    +        } catch (KeyNotFoundException e) {
    +            LOG.error("Exception while getting NUMA config", e);
    +            return null;
    --- End diff --
    
    I agree that we should throw an exception if it's misconfigured. If NUMA is configured, we want to make sure it works properly, or fail fast if it is misconfigured. Storm admins might not notice the "ERROR" message in the first place, and could spend a relatively long time wondering why Storm is not working as expected. If they don't want NUMA, they will not set the NUMA configs.
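
A minimal fail-fast sketch along these lines, assuming the config layout from the `Config` Javadoc in this PR; the class is hypothetical, and returning `null` for a port that no NUMA node lists is an assumption rather than behavior taken from the PR. A missing `supervisor.numa.meta` still means "NUMA disabled", but a present and malformed one throws instead of only being logged.

```java
import java.util.List;
import java.util.Map;

// Hypothetical fail-fast lookup: absent NUMA config -> null, malformed NUMA
// config -> unchecked exception rather than an ERROR log line.
final class NumaPortLookup {
    static final String NUMA_META = "supervisor.numa.meta"; // key from the PR's Config
    static final String NUMAS_BASE = "numas";
    static final String NUMA_ID = "node_id";
    static final String NUMA_PORTS = "ports";

    @SuppressWarnings("unchecked")
    static String getNumaIdForPort(Number port, Map<String, Object> conf) {
        Map<String, Object> numaMeta = (Map<String, Object>) conf.get(NUMA_META);
        if (numaMeta == null) {
            return null; // NUMA not configured: the default on normal clusters
        }
        List<Map<String, Object>> numas = (List<Map<String, Object>>) numaMeta.get(NUMAS_BASE);
        if (numas == null) {
            throw new IllegalStateException(NUMA_META + " is set but [" + NUMAS_BASE + "] is missing");
        }
        for (Map<String, Object> numa : numas) {
            Object id = numa.get(NUMA_ID);
            List<Number> ports = (List<Number>) numa.get(NUMA_PORTS);
            if (id == null || ports == null) {
                throw new IllegalStateException("Malformed NUMA entry: " + numa);
            }
            for (Number p : ports) {
                if (p.longValue() == port.longValue()) {
                    return String.valueOf(id);
                }
            }
        }
        return null; // assumption: a port outside every NUMA node is an ordinary slot
    }
}
```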


---

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226081834
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -121,11 +121,68 @@
         private static String memoizedLocalHostnameString = null;
         public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
     
    +    public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
    +    public static final String NUMA_CORES = "Cores";
    +    public static final String NUMAS_PORTS = "Ports";
    +    public static final String NUMA_ID = "Id";
    +    public static final String NUMAS_BASE = "Numas";
    +
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
     
    +    /**
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map<String, Object> getValidatedNumaMap(Map<String, Object> stormConf) throws KeyNotFoundException {
    +        Map<String, Object> validatedNumaMap = new HashMap();
    +        Map<String, Object> supervisorNumaMap = (Map<String, Object>) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) return validatedNumaMap;
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException("The numa configurations [" + NUMAS_BASE + "] is missing!");
    +        }
    +        List<Map> numaEntries = (List<Map>) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new KeyNotFoundException("The numa configuration key [" + key + "] is missing!");
    +                }
    +            }
    +            validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
    +        }
    +        return validatedNumaMap;
    +    }
    +
    +    /**
    +     * getNumaIdForPort.
    +     * @param port port
    +     * @param stormConf stormConf
    +     * @return getNumaIdForPort
    +     * @throws KeyNotFoundException
    +     */
    +    public static String getNumaIdForPort(Number port, Map<String, Object> stormConf) {
    +        Map validatedNumaMap = null;
    +        try {
    +            validatedNumaMap = getValidatedNumaMap(stormConf);
    +        } catch (KeyNotFoundException e) {
    +            LOG.error("Exception while getting NUMA config", e);
    +            return null;
    --- End diff --
    
    Not necessarily. It just means that the NUMA config is either malformed or missing, and the latter would be the default in normal clusters.


---

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r227979722
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -119,15 +120,81 @@
         // tests by subclassing.
         private static Utils _instance = new Utils();
         private static String memoizedLocalHostnameString = null;
    -    public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +    public static final Pattern TOPOLOGY_KEY_PATTERN =
    +            Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +
    +    public static final String NUMA_MEMORY_IN_MB = "memory.mb";
    +    public static final String NUMA_CORES = "cores";
    +    public static final String NUMAS_PORTS = "ports";
    +    public static final String NUMA_ID = "node_id";
    --- End diff --
    
    Maybe `NUMA_ID = "numa_id"`?


---

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228266466
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java ---
    @@ -17,14 +17,23 @@
     import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    +import java.util.stream.Collectors;
    +
    --- End diff --
    
    ```suggestion
    
    ```


---

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r227413704
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -1041,6 +1041,19 @@
         @isPositiveNumber
         @NotNull
         public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
    +
    +    /**
    +     * A map with blobstore keys mapped to each NUMA Node on the supervisor that will be used
    +     * by scheduler. CPUs, memory and ports available on each NUMA node will be provided.
    +     * Each supervisor will have different map of NUMAs.
    +     * Example: "supervisor.numa.meta": { "Numas": [
    +     * {"Id": 0, "MemoryInMB": 122880, "Cores": [ 0, 12, 1, 13, 2, 14, 3, 15, 4, 16, 5, 17],
    +     *  "Ports": [6700, 6701]},
    +     * {"Id": 1, "MemoryInMB": 122880, "Cores": [ 6, 18, 7, 19, 8, 20, 9, 21, 10, 22, 11, 23],
    +     *  "Ports": [6702, 6703]}]}
    +     */
    +    public static final String SUPERVISOR_NUMA_META = "supervisor.numa.meta";
    --- End diff --
    
    When the config is read, we verify that all the fields are present and throw exceptions for invalid values.


---

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r229869065
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
    @@ -622,22 +624,26 @@ public boolean areAllConnectionsReady() {
             return this.autoCredentials;
         }
     
    -    private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
    +    private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState,
    +                                                 String topologyId, String assignmentId,
                                                      int port) {
             LOG.info("Reading assignments");
             List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
             executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
    -        Map<List<Long>, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
    +        Map<List<Long>, NodeInfo> executorToNodePort =
    +                getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
             for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
                 NodeInfo nodeInfo = entry.getValue();
    -            if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    +            if (nodeInfo.get_node().startsWith(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    --- End diff --
    
    The supervisor ID is unique, so a collision is unlikely.


---

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228303554
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
    @@ -87,7 +87,7 @@ public Slot(AsyncLocalizer localizer, Map<String, Object> conf,
                     AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments,
                     OnlyLatestExecutor<Integer> metricsExec,
                     WorkerMetricsProcessor metricsProcessor,
    -                SlotMetrics slotMetrics) throws Exception {
    +                SlotMetrics slotMetrics, String numaId) throws Exception {
    --- End diff --
    
    Can we remove this? It looks like it's not used anywhere


---

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r227936615
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
    @@ -155,6 +156,7 @@ public WorkerState(Map<String, Object> conf, IContext mqContext, String topology
             this.receiver = this.mqContext.bind(topologyId, port);
             this.topologyId = topologyId;
             this.assignmentId = assignmentId;
    +        this.numaId = "";
    --- End diff --
    
    Do we need this? It is always going to be an empty String.


---

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226082082
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
    @@ -630,7 +632,7 @@ public boolean areAllConnectionsReady() {
             Map<List<Long>, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
             for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
                 NodeInfo nodeInfo = entry.getValue();
    -            if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    +            if (nodeInfo.get_node().startsWith(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    --- End diff --
    
    It's an ID that links supervisors to their assignments. Since Nimbus thinks there are multiple NUMA supervisors, the real supervisor will now read all the assignments linked to them.
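
To make the `equals` to `startsWith` change concrete, here is a sketch assuming a hypothetical ID scheme of real supervisor ID + `-numa-` + NUMA ID (the PR's actual format is not shown in this diff). A bare prefix check also matches a different supervisor whose ID merely starts with the same characters (`sup-10` vs. `sup-1`); requiring an exact match or the separator avoids relying on ID uniqueness alone.

```java
// Hypothetical sketch: matching assignments made to "virtual" NUMA supervisors
// back to the real supervisor. The "-numa-" separator is an assumed ID scheme.
final class NumaAssignmentMatcher {
    static final String NUMA_SEPARATOR = "-numa-"; // hypothetical separator

    static String numaSupervisorId(String realSupervisorId, String numaId) {
        return realSupervisorId + NUMA_SEPARATOR + numaId;
    }

    // The check used in the diff: any node ID that begins with the real ID matches.
    static boolean prefixMatch(String nodeId, String assignmentId) {
        return nodeId.startsWith(assignmentId);
    }

    // Stricter variant: the real ID itself, or the real ID followed by the separator.
    static boolean strictMatch(String nodeId, String assignmentId) {
        return nodeId.equals(assignmentId) || nodeId.startsWith(assignmentId + NUMA_SEPARATOR);
    }
}
```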


---

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r229880617
  
    --- Diff: storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
    @@ -414,6 +418,34 @@ public void validateInteger(String name, Object o) {
             }
         }
     
    +    public static class NumaEntryValidator extends Validator {
    +
    +        @Override
    +        public void validateField(String name, Object o) {
    +            if (o == null) {
    +                return;
    +            }
    +            Map numa = (Map<String, Object>) o;
    +            for (String key : new String[]{NUMA_CORES, NUMA_MEMORY_IN_MB, NUMA_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new IllegalArgumentException(
    +                            "The numa configuration key [" + key + "] is missing!"
    +                    );
    +                }
    +            }
    +
    +            List<Integer> cores = (List<Integer>) numa.get(NUMA_CORES);
    +            Set coreSet = new HashSet();
    +            coreSet.addAll(cores);
    +            if (coreSet.size() != cores.size()) {
    --- End diff --
    
    Thanks for adding this. Do we care about duplicate cores across the NUMA zones? (I don't
    For example:
    ```
    numaid=0,  cores=[0,1,2,3]
    numaid=1, cores=[0,1,2,3]
    ```


---

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r227976069
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -119,15 +120,81 @@
         // tests by subclassing.
         private static Utils _instance = new Utils();
         private static String memoizedLocalHostnameString = null;
    -    public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +    public static final Pattern TOPOLOGY_KEY_PATTERN =
    +            Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +
    +    public static final String NUMA_MEMORY_IN_MB = "memory.mb";
    +    public static final String NUMA_CORES = "cores";
    +    public static final String NUMAS_PORTS = "ports";
    +    public static final String NUMA_ID = "node_id";
    +    public static final String NUMAS_BASE = "numas";
     
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
     
         /**
    -     * Provide an instance of this class for delegates to use.  To mock out delegated methods, provide an instance of a subclass that
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    --- End diff --
    
    We're catching `KeyNotFoundException`.


---

Posted by govind-menon <gi...@git.apache.org>.
GitHub user govind-menon reopened a pull request:

    https://github.com/apache/storm/pull/2881

    STORM-3259: NUMA Support for Storm

    Only functional changes; putting this up for review now, with tests to follow soon.
    
    I have run the following tests:
    
    1. Mixed cluster: NUMA supervisors and normal supervisors
    2. NUMA-only supervisors
    3. Profiling actions
    4. Rebalance

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/govind-menon/storm STORM-3259

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2881.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2881
    
----
commit 4597f7ac25fafb70dcdf2a63551650cee90ab8e3
Author: Govind Menon <go...@...>
Date:   2018-12-04T19:50:20Z

    STORM-3259: NUMA Support for Storm

----


---

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226707991
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
    @@ -820,8 +824,15 @@ private long calculateMemoryLimit(final WorkerResources resources, final int mem
         @Override
         public void launch() throws IOException {
             _type.assertFull();
    -        LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
    -                 _supervisorId, _port, _workerId);
    +        String numaId = Utils.getNumaIdForPort(_port, _conf);
    +        if (numaId == null) {
    +            LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
    +                    _supervisorId, _port, _workerId);
    +        } else {
    +            LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}" +
    +                            "bound to numa zone {}", _assignment,
    --- End diff --
    
    Nit: there is no space between the {} above and the "bound" on this line.
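
The missing space comes from the string concatenation: the first literal ends in `{}` and the second begins with `bound`, so the rendered message reads `... with id <id>bound to numa zone ...`. A minimal sketch of the fix, using a hypothetical holder class for the two templates:

```java
// Hypothetical holder for the two log templates used in BasicContainer.launch().
final class LaunchLogTemplates {
    // Message used when the worker is not bound to a NUMA zone.
    static final String DEFAULT_TEMPLATE =
            "Launching worker with assignment {} for this supervisor {} on port {} with id {}";

    // Built from the default so the two messages cannot drift apart; the leading
    // space keeps the last "{}" from running into "bound".
    static final String NUMA_TEMPLATE = DEFAULT_TEMPLATE + " bound to numa zone {}";

    private LaunchLogTemplates() {
    }
}
```

Deriving the NUMA template from the default one is only one option, but it keeps the two log lines consistent if either is edited later.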


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226707791
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
    @@ -639,6 +639,10 @@ protected String javaCmd(String cmd) {
             }
     
             List<String> commandList = new ArrayList<>();
    +        if (numaId != null) {
    +            commandList.add("numactl");
    +            commandList.add("--i=" + numaId);
    +        }
    --- End diff --
    
    This is a Linux-specific way of pinning a process to a NUMA zone, and it may not even work on all flavors of Linux. We need a good way of turning this off for other systems, and ideally a way for them to override it if needed so things will work for them in the future. It is okay to output a large warning if you are on an unsupported system.
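
One way to provide the override asked for here, sketched under assumptions: the pinning command comes from an operator-configurable template (a hypothetical supervisor setting, not an existing Storm config) in which `{numaId}` is substituted, with a default that mirrors the `numactl --i=<id>` prefix from the diff. An empty template disables pinning, so an unsupported system could fall back to it after logging the warning mentioned above. The class and placeholder names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class NumaPinningTemplate {
    // Placeholder substituted with the NUMA node id (hypothetical convention).
    static final String NUMA_ID_PLACEHOLDER = "{numaId}";

    // Default that mirrors the diff's "numactl --i=<id>" prefix on Linux.
    static final List<String> LINUX_DEFAULT =
            Collections.unmodifiableList(Arrays.asList("numactl", "--i=" + NUMA_ID_PLACEHOLDER));

    private NumaPinningTemplate() {
    }

    // Expands an operator-supplied template into the command prefix for one worker.
    // A null numaId (worker not on a NUMA node) or a null/empty template means "no pinning".
    static List<String> expand(List<String> template, String numaId) {
        List<String> prefix = new ArrayList<>();
        if (numaId == null || template == null) {
            return prefix;
        }
        for (String part : template) {
            // Literal (non-regex) replacement of the placeholder.
            prefix.add(part.replace(NUMA_ID_PLACEHOLDER, numaId));
        }
        return prefix;
    }
}
```

With this shape, hosts without `numactl` could supply their own launcher prefix through configuration instead of relying on a hard-coded Linux command.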


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228264226
  
    --- Diff: storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java ---
    @@ -101,7 +124,11 @@ public static BlobKeySequenceInfo normalizeNimbusHostPortSequenceNumberInfo(Stri
             return nimbusInfoSet;
         }
     
    -    // Get sequence number details from latest sequence number of the blob
    +    /**
    +     * Get sequence number details from latest sequence number of the blob
    --- End diff --
    
    ```suggestion
         * Get sequence number details from latest sequence number of the blob.
    ```


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r227146299
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -348,14 +350,27 @@ public void doExecutorHeartbeats() {
             if (null == executors) {
                 stats = ClientStatsUtil.mkEmptyExecutorZkHbs(workerState.localExecutors);
             } else {
    -            stats = ClientStatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
    -                                                                                  .toMap(IRunningExecutor::getExecutorId,
    -                                                                                         IRunningExecutor::renderStats)));
    +            stats = ClientStatsUtil.convertExecutorZkHbs(
    +                    executors.stream().collect(
    +                            Collectors.toMap(
    +                                    IRunningExecutor::getExecutorId,
    +                                    IRunningExecutor::renderStats
    +                            )
    +                    )
    +            );
             }
    -        Map<String, Object> zkHB = ClientStatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
    +
    +        Map<String, Object> zkHB = ClientStatsUtil.mkZkWorkerHb(
    +                workerState.topologyId, stats, workerState.uptime.upTime()
    +        );
    +
             try {
    +            String assignmentId = workerState.assignmentId;
    +            if (this.numaId != null) {
    +                assignmentId += Constants.NUMA_ID_SEPARATOR + this.numaId;
    --- End diff --
    
    I've tested this out: without the NUMA ID appended, the worker launches correctly but won't report back to Nimbus, and in the UI it will appear that the workers haven't started yet.
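
Per the original commit message, a supervisor with a NUMA config heartbeats as one supervisor per NUMA node, so a pinned worker presumably has to report under that per-node ID for Nimbus to match its heartbeat to the assignment. A minimal sketch of how the ID in the diff is derived; the separator value below is hypothetical (the PR uses `Constants.NUMA_ID_SEPARATOR`, whose value is not shown here), as is the class name.

```java
// Hypothetical sketch of how the worker derives the ID it heartbeats under.
final class HeartbeatIds {
    // Hypothetical value; the PR uses Constants.NUMA_ID_SEPARATOR.
    static final String NUMA_ID_SEPARATOR = "-numa-";

    private HeartbeatIds() {
    }

    // Workers pinned to a NUMA node report as that node's per-NUMA "supervisor";
    // all other workers keep the plain supervisor assignment ID.
    static String heartbeatAssignmentId(String assignmentId, String numaId) {
        return numaId == null ? assignmentId : assignmentId + NUMA_ID_SEPARATOR + numaId;
    }
}
```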


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r227975023
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -121,11 +121,68 @@
         private static String memoizedLocalHostnameString = null;
         public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
     
    +    public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
    +    public static final String NUMA_CORES = "Cores";
    +    public static final String NUMAS_PORTS = "Ports";
    +    public static final String NUMA_ID = "Id";
    +    public static final String NUMAS_BASE = "Numas";
    +
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
     
    +    /**
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map<String, Object> getValidatedNumaMap(Map<String, Object> stormConf) throws KeyNotFoundException {
    +        Map<String, Object> validatedNumaMap = new HashMap();
    +        Map<String, Object> supervisorNumaMap = (Map<String, Object>) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) return validatedNumaMap;
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException("The numa configurations [" + NUMAS_BASE + "] is missing!");
    +        }
    +        List<Map> numaEntries = (List<Map>) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new KeyNotFoundException("The numa configuration key [" + key + "] is missing!");
    +                }
    +            }
    +            validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
    +        }
    +        return validatedNumaMap;
    +    }
    +
    +    /**
    +     * getNumaIdForPort.
    +     * @param port port
    +     * @param stormConf stormConf
    +     * @return getNumaIdForPort
    +     * @throws KeyNotFoundException
    +     */
    +    public static String getNumaIdForPort(Number port, Map<String, Object> stormConf) {
    +        Map validatedNumaMap = null;
    +        try {
    +            validatedNumaMap = getValidatedNumaMap(stormConf);
    +        } catch (KeyNotFoundException e) {
    +            LOG.error("Exception while getting NUMA config", e);
    +            return null;
    --- End diff --
    
    An improperly formed numa config seems like an error that should be logged or worse.
    But if it is normal that the numa config is not present, then it is alarming to see an error logged about this.
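
One way to separate the two cases, as a sketch: treat a missing NUMA section as the normal non-NUMA case and return quietly, but reject a section that is present and malformed. This assumes the later layout in which the config maps NUMA node IDs to node settings; the class name is hypothetical.

```java
import java.util.Collections;
import java.util.Map;

final class NumaConfigReader {
    // Config key from the PR (Config.SUPERVISOR_NUMA_META).
    static final String SUPERVISOR_NUMA_META = "supervisor.numa.meta";

    private NumaConfigReader() {
    }

    // Absent config is the normal, non-NUMA case: return an empty map and log nothing.
    // Present-but-malformed config is a deployment error: throw so the supervisor fails
    // fast instead of logging at ERROR and launching workers unpinned.
    @SuppressWarnings("unchecked")
    static Map<String, Object> numaNodesOrEmpty(Map<String, Object> conf) {
        Object raw = conf.get(SUPERVISOR_NUMA_META);
        if (raw == null) {
            return Collections.emptyMap();
        }
        if (!(raw instanceof Map)) {
            throw new IllegalArgumentException(SUPERVISOR_NUMA_META
                    + " must be a map of NUMA node id to node config, got: " + raw);
        }
        return (Map<String, Object>) raw;
    }
}
```

Failing fast in the malformed case avoids a supervisor that logs an error once and then silently launches every worker without pinning.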


---

[GitHub] storm issue #2881: STORM-3259: NUMA Support for Storm

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2881
  
    I have thought about it more and I am fine with NUMA support being at the supervisor level.  I think in the future we will need to move it so that Nimbus is aware of NUMA simply to be able to combat fragmentation more, but for now I am fine with it.


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228321490
  
    --- Diff: storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java ---
    @@ -319,7 +319,15 @@ public void testLaunch() throws Exception {
             superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
             superConf.put(DaemonConfig.STORM_LOG4J2_CONF_DIR, log4jdir);
             superConf.put(Config.WORKER_CHILDOPTS, " -Dtesting=true");
    -
    +        Map numaNode = new HashMap();
    +        numaNode.put(Utils.NUMA_ID, "0");
    +        numaNode.put(Utils.NUMA_CORES, Collections.singletonList("0"));
    +        numaNode.put(Utils.NUMAS_PORTS, Collections.singletonList(8081));
    +        numaNode.put(Utils.NUMA_MEMORY_IN_MB, 2048);
    +        Map numaMap = new HashMap();
    +        numaMap.put(Utils.NUMAS_BASE, Collections.singletonList(numaNode));
    +
    +        superConf.put(Config.SUPERVISOR_NUMA_META, numaMap);
    --- End diff --
    
    Remove this, since `testNumaPinnedLaunch()` is available?


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226693412
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -348,14 +350,27 @@ public void doExecutorHeartbeats() {
             if (null == executors) {
                 stats = ClientStatsUtil.mkEmptyExecutorZkHbs(workerState.localExecutors);
             } else {
    -            stats = ClientStatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
    -                                                                                  .toMap(IRunningExecutor::getExecutorId,
    -                                                                                         IRunningExecutor::renderStats)));
    +            stats = ClientStatsUtil.convertExecutorZkHbs(
    +                    executors.stream().collect(
    +                            Collectors.toMap(
    +                                    IRunningExecutor::getExecutorId,
    +                                    IRunningExecutor::renderStats
    +                            )
    +                    )
    +            );
             }
    -        Map<String, Object> zkHB = ClientStatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
    +
    +        Map<String, Object> zkHB = ClientStatsUtil.mkZkWorkerHb(
    +                workerState.topologyId, stats, workerState.uptime.upTime()
    +        );
    +
             try {
    +            String assignmentId = workerState.assignmentId;
    +            if (this.numaId != null) {
    +                assignmentId += Constants.NUMA_ID_SEPARATOR + this.numaId;
    --- End diff --
    
    What would happen if we didn't add the numaId to the heartbeat? I ask because during an upgrade we are not going to be able to guarantee that the worker will be up to date with the latest code on Nimbus.


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228266730
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java ---
    @@ -12,16 +12,24 @@
     
     package org.apache.storm.daemon.supervisor.timer;
     
    +import java.util.ArrayList;
    +import java.util.Collections;
     import java.util.HashMap;
    +import java.util.List;
     import java.util.Map;
    +
    --- End diff --
    
    ```suggestion
    
    ```


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r229876743
  
    --- Diff: storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
    @@ -414,6 +418,34 @@ public void validateInteger(String name, Object o) {
             }
         }
     
    +    public static class NumaEntryValidator extends Validator {
    +
    +        @Override
    +        public void validateField(String name, Object o) {
    +            if (o == null) {
    +                return;
    +            }
    +            Map numa = (Map<String, Object>) o;
    +            for (String key : new String[]{NUMA_CORES, NUMA_MEMORY_IN_MB, NUMA_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new IllegalArgumentException(
    +                            "The numa configuration key [" + key + "] is missing!"
    +                    );
    +                }
    +            }
    +
    +            List<Integer> cores = (List<Integer>) numa.get(NUMA_CORES);
    +            Set coreSet = new HashSet();
    +            coreSet.addAll(cores);
    +            if (coreSet.size() != cores.size()) {
    +                throw new IllegalArgumentException(
    +                        "No duplicate cores in NUMA config"
    --- End diff --
    
    Would "Duplicate cores in NUMA config" be better?
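
A sketch of the duplicate check with a message that states the actual problem and names the offending cores (hypothetical class name; the wording follows the suggestion above):

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class NumaCoreCheck {
    private NumaCoreCheck() {
    }

    // Rejects a core list that names the same core more than once,
    // reporting each repeated core once, in first-seen order.
    static void checkNoDuplicateCores(String nodeId, List<?> cores) {
        Set<Object> seen = new HashSet<>();
        Set<Object> duplicates = new LinkedHashSet<>();
        for (Object core : cores) {
            if (!seen.add(core)) {
                duplicates.add(core);
            }
        }
        if (!duplicates.isEmpty()) {
            throw new IllegalArgumentException(
                    "Duplicate cores " + duplicates + " in NUMA config for node " + nodeId);
        }
    }
}
```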


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228264081
  
    --- Diff: storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java ---
    @@ -77,6 +91,15 @@ public static BlobKeySequenceInfo normalizeNimbusHostPortSequenceNumberInfo(Stri
         }
     
         // Check for latest sequence number of a key inside zookeeper and return nimbodes containing the latest sequence number
    --- End diff --
    
    ```suggestion
    
    ```


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226082482
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -121,11 +121,68 @@
         private static String memoizedLocalHostnameString = null;
         public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
     
    +    public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
    +    public static final String NUMA_CORES = "Cores";
    +    public static final String NUMAS_PORTS = "Ports";
    +    public static final String NUMA_ID = "Id";
    +    public static final String NUMAS_BASE = "Numas";
    +
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
     
    +    /**
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map<String, Object> getValidatedNumaMap(Map<String, Object> stormConf) throws KeyNotFoundException {
    +        Map<String, Object> validatedNumaMap = new HashMap();
    +        Map<String, Object> supervisorNumaMap = (Map<String, Object>) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) return validatedNumaMap;
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException("The numa configurations [" + NUMAS_BASE + "] is missing!");
    +        }
    +        List<Map> numaEntries = (List<Map>) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new KeyNotFoundException("The numa configuration key [" + key + "] is missing!");
    +                }
    +            }
    +            validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
    +        }
    +        return validatedNumaMap;
    +    }
    +
    +    /**
    +     * getNumaIdForPort.
    +     * @param port port
    +     * @param stormConf stormConf
    +     * @return getNumaIdForPort
    +     * @throws KeyNotFoundException
    +     */
    +    public static String getNumaIdForPort(Number port, Map<String, Object> stormConf) {
    +        Map validatedNumaMap = null;
    +        try {
    +            validatedNumaMap = getValidatedNumaMap(stormConf);
    +        } catch (KeyNotFoundException e) {
    +            LOG.error("Exception while getting NUMA config", e);
    +            return null;
    +        }
    +        for (Object numaEntry : validatedNumaMap.values()) {
    +            Map numaMap  = (Map<String, Object>) numaEntry;
    +            List<Integer> portList = (List<Integer>) numaMap.get(NUMAS_PORTS);
    +            if (portList.contains(port)) {
    --- End diff --
    
    Yes, you could have a worker that's not part of a NUMA supervisor, in which case the NUMA ID is null and the worker is launched without pinning.
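
A sketch of the lookup described here: the first NUMA node whose port list contains the worker's port wins, and `null` means "launch unpinned". The key name `numa.ports` follows a later revision of the PR and is an assumption here, as is the class name. Comparing numerically rather than with `List.contains`, as the diff does, avoids a possible mismatch when the configured ports are `Integer` and the `Number` passed in is a `Long` (or vice versa), since `Integer.equals(Long)` is always false.

```java
import java.util.List;
import java.util.Map;

final class NumaPorts {
    // Assumed key name, following a later revision of the PR.
    static final String PORTS_KEY = "numa.ports";

    private NumaPorts() {
    }

    // Returns the id of the NUMA node whose port list contains `port`, or null if no
    // node claims it; a null result means the worker is launched without pinning.
    @SuppressWarnings("unchecked")
    static String numaIdForPort(int port, Map<String, Map<String, Object>> numaNodes) {
        for (Map.Entry<String, Map<String, Object>> node : numaNodes.entrySet()) {
            List<? extends Number> ports = (List<? extends Number>) node.getValue().get(PORTS_KEY);
            if (ports == null) {
                continue;
            }
            for (Number p : ports) {
                // Compare numerically: parsed config may yield Integer or Long.
                if (p.longValue() == port) {
                    return node.getKey();
                }
            }
        }
        return null;
    }
}
```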


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226692517
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -1041,6 +1041,19 @@
         @isPositiveNumber
         @NotNull
         public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
    +
    +    /**
    +     * A map with blobstore keys mapped to each NUMA Node on the supervisor that will be used
    +     * by scheduler. CPUs, memory and ports available on each NUMA node will be provided.
    +     * Each supervisor will have different map of NUMAs.
    +     * Example: "supervisor.numa.meta": { "Numas": [
    +     * {"Id": 0, "MemoryInMB": 122880, "Cores": [ 0, 12, 1, 13, 2, 14, 3, 15, 4, 16, 5, 17],
    +     *  "Ports": [6700, 6701]},
    +     * {"Id": 1, "MemoryInMB": 122880, "Cores": [ 6, 18, 7, 19, 8, 20, 9, 21, 10, 22, 11, 23],
    +     *  "Ports": [6702, 6703]}]}
    +     */
    +    public static final String SUPERVISOR_NUMA_META = "supervisor.numa.meta";
    --- End diff --
    
    Could we do some verification of the config, please? This is a complex enough config that we would want to catch mistakes in it early on.
    
    Also, as a nit: "Cores", "MemoryInMB", "Ports", and "Id" don't really follow the naming convention that we have used with configs or maps elsewhere in the code. We tend to use lowercase with '.' or '_' as separators instead of CamelCase, but we are nowhere near consistent, so it is probably fine.


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r229854058
  
    --- Diff: storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java ---
    @@ -319,7 +319,15 @@ public void testLaunch() throws Exception {
             superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
             superConf.put(DaemonConfig.STORM_LOG4J2_CONF_DIR, log4jdir);
             superConf.put(Config.WORKER_CHILDOPTS, " -Dtesting=true");
    -
    +        Map numaNode = new HashMap();
    +        numaNode.put(Utils.NUMA_ID, "0");
    +        numaNode.put(Utils.NUMA_CORES, Collections.singletonList("0"));
    +        numaNode.put(Utils.NUMAS_PORTS, Collections.singletonList(8081));
    +        numaNode.put(Utils.NUMA_MEMORY_IN_MB, 2048);
    +        Map numaMap = new HashMap();
    +        numaMap.put(Utils.NUMAS_BASE, Collections.singletonList(numaNode));
    +
    +        superConf.put(Config.SUPERVISOR_NUMA_META, numaMap);
    --- End diff --
    
    No, I explicitly want this, to show that there is no NUMA pinning when there is no config.


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228298411
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -119,15 +120,81 @@
         // tests by subclassing.
         private static Utils _instance = new Utils();
         private static String memoizedLocalHostnameString = null;
    -    public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +    public static final Pattern TOPOLOGY_KEY_PATTERN =
    +            Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +
    +    public static final String NUMA_MEMORY_IN_MB = "memory.mb";
    +    public static final String NUMA_CORES = "cores";
    +    public static final String NUMAS_PORTS = "ports";
    +    public static final String NUMA_ID = "node_id";
    +    public static final String NUMAS_BASE = "numas";
     
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
     
         /**
    -     * Provide an instance of this class for delegates to use.  To mock out delegated methods, provide an instance of a subclass that
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map<String, Object> getValidatedNumaMap(Map<String, Object> stormConf) throws KeyNotFoundException {
    +        Map<String, Object> validatedNumaMap = new HashMap();
    +        Map<String, Object> supervisorNumaMap =
    +                (Map<String, Object>) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) {
    +            return validatedNumaMap;
    +        }
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException(
    --- End diff --
    
    I think `KeyNotFoundException` is mostly used for the blobstore, so I don't think we should use it here. This is more like an `IllegalArgumentException`.


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r229878430
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
    @@ -607,6 +608,27 @@ protected String javaCmd(String cmd) {
             return ret;
         }
     
    +    /**
    +     * Extracting out to mock it for tests.
    +     * @return true if on Linux.
    +     */
    +    protected boolean isOnLinux() {
    +        return SystemUtils.IS_OS_LINUX;
    +    }
    +
    +    private void prefixNumaPinningIfApplicable(String numaId, List<String> commandList) {
    +        if (numaId != null) {
    +            if (isOnLinux()) {
    +                commandList.add("numactl");
    +                commandList.add("--i=" + numaId);
    +                return;
    +            } else {
    +                // TODO : Add support for pinning on Windows host
    +                throw new RuntimeException("numactl pinning currently not supported on non-Linux hosts");
    --- End diff --
    
    Maybe `UnsupportedOperationException` is a better option, but I am fine with this.
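
A self-contained sketch of the method from the diff with the suggested exception type. `SystemUtils.IS_OS_LINUX` from Commons Lang is replaced with an `os.name` check so the block has no dependencies; the class name is hypothetical.

```java
import java.util.List;
import java.util.Locale;

class NumaCommandPrefixer {
    // Overridable so tests can simulate either OS, mirroring isOnLinux() in the diff.
    protected boolean isOnLinux() {
        return System.getProperty("os.name", "").toLowerCase(Locale.ROOT).startsWith("linux");
    }

    // Adds the numactl prefix (flags as in the diff) only when the worker has a NUMA id.
    void prefixNumaPinningIfApplicable(String numaId, List<String> commandList) {
        if (numaId == null) {
            return;
        }
        if (!isOnLinux()) {
            // The platform lacks the feature, as opposed to a generic runtime failure.
            throw new UnsupportedOperationException(
                    "numactl pinning currently not supported on non-Linux hosts");
        }
        commandList.add("numactl");
        commandList.add("--i=" + numaId);
    }
}
```

`UnsupportedOperationException` makes a missing platform capability easier to tell apart from genuine launch failures than a bare `RuntimeException`.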


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228301282
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -327,10 +396,12 @@ public static boolean isSystemId(String id) {
         }
     
         /**
    -     * Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous
    +     * Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds
    +     * equal to the return value of the previous
          * call.
    --- End diff --
    
    Nit: put "call" on the same line as the other words?


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r227974287
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -119,15 +120,81 @@
         // tests by subclassing.
         private static Utils _instance = new Utils();
         private static String memoizedLocalHostnameString = null;
    -    public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +    public static final Pattern TOPOLOGY_KEY_PATTERN =
    +            Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +
    +    public static final String NUMA_MEMORY_IN_MB = "memory.mb";
    +    public static final String NUMA_CORES = "cores";
    +    public static final String NUMAS_PORTS = "ports";
    +    public static final String NUMA_ID = "node_id";
    +    public static final String NUMAS_BASE = "numas";
     
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
     
         /**
    -     * Provide an instance of this class for delegates to use.  To mock out delegated methods, provide an instance of a subclass that
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map<String, Object> getValidatedNumaMap(Map<String, Object> stormConf) throws KeyNotFoundException {
    +        Map<String, Object> validatedNumaMap = new HashMap();
    +        Map<String, Object> supervisorNumaMap =
    +                (Map<String, Object>) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) {
    +            return validatedNumaMap;
    +        }
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException(
    +                    "The numa configurations [" + NUMAS_BASE + "] is missing!"
    +            );
    +        }
    +        List<Map> numaEntries = (List<Map>) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new KeyNotFoundException(
    +                            "The numa configuration key [" + key + "] is missing!"
    +                    );
    +                }
    +            }
    +            validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
    +        }
    +        return validatedNumaMap;
    +    }
    --- End diff --
    
    Should this be a custom map validator as with [`nimbus.impersonation.acl`](https://github.com/apache/storm/blob/730c1a302d857121542ce27b6a40b05b90f7f3ed/storm-client/src/jvm/org/apache/storm/Config.java#L1414-L1416)?
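
A standalone sketch of such a validator, assuming the later map layout (NUMA node ID to node settings) and the key names from a later revision of the PR. In Storm it would presumably be a `ConfigValidation.Validator` subclass, like the `NumaEntryValidator` diff elsewhere in this thread, and it throws `IllegalArgumentException` for missing keys as suggested in the review. The class name is hypothetical.

```java
import java.util.Map;

final class NumaMetaValidator {
    // Assumed key names, following a later revision of the PR.
    private static final String[] REQUIRED_KEYS = {"memory.mb", "cores", "numa.ports"};

    // Validates the whole supervisor NUMA map when the config is loaded,
    // rather than when a worker is about to launch.
    void validateField(String name, Object o) {
        if (o == null) {
            return; // no NUMA config is valid: the supervisor is simply not NUMA-aware
        }
        if (!(o instanceof Map)) {
            throw new IllegalArgumentException(
                    "Field " + name + " must be a map of NUMA node id to node config");
        }
        for (Map.Entry<?, ?> node : ((Map<?, ?>) o).entrySet()) {
            if (!(node.getValue() instanceof Map)) {
                throw new IllegalArgumentException(
                        "Field " + name + ": NUMA node " + node.getKey() + " must be a map");
            }
            Map<?, ?> entry = (Map<?, ?>) node.getValue();
            for (String key : REQUIRED_KEYS) {
                if (!entry.containsKey(key)) {
                    throw new IllegalArgumentException("Field " + name + ": NUMA node "
                            + node.getKey() + " is missing key [" + key + "]");
                }
            }
        }
    }
}
```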


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228319880
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java ---
    @@ -81,6 +81,20 @@ public void add(NormalizedResourcesWithMemory other) {
             totalMemoryMb += other.getTotalMemoryMb();
         }
     
    +    /**
    +     * Remove the resources in other from this.
    +     * @param other the resources to be removed.
    +     * @return true if one or more resources in other were larger than available resources in this, else false.
    +     */
    +    public boolean remove(NormalizedResourcesWithMemory other) {
    --- End diff --
    
    Maybe change this to `public boolean remove(NormalizedResourcesWithMemory other, ResourceMetrics resourceMetrics)` and then just call `remove(other, null)`?


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r229853904
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java ---
    @@ -81,6 +81,20 @@ public void add(NormalizedResourcesWithMemory other) {
             totalMemoryMb += other.getTotalMemoryMb();
         }
     
    +    /**
    +     * Remove the resources in other from this.
    +     * @param other the resources to be removed.
    +     * @return true if one or more resources in other were larger than available resources in this, else false.
    +     */
    +    public boolean remove(NormalizedResourcesWithMemory other) {
    --- End diff --
    
    I considered that, but my approach seems cleaner: at the call site you don't have to wonder about the null.
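
To make the Javadoc contract of the one-argument `remove` concrete, a simplified stand-in (hypothetical, CPU and memory only). The real class also tracks generic resources, and whether it clamps or records negative values is not shown in the diff, so this sketch simply subtracts and reports whether anything was over-requested.

```java
// Simplified, hypothetical stand-in for NormalizedResourceOffer: CPU and memory only.
final class SimpleResourceOffer {
    private double cpu;
    private double memoryMb;

    SimpleResourceOffer(double cpu, double memoryMb) {
        this.cpu = cpu;
        this.memoryMb = memoryMb;
    }

    /**
     * Remove the resources in other from this.
     * @param other the resources to be removed.
     * @return true if one or more resources in other were larger than the available resources in this, else false.
     */
    boolean remove(SimpleResourceOffer other) {
        // Check before subtracting so the result reflects what was available.
        boolean exceeded = other.cpu > cpu || other.memoryMb > memoryMb;
        cpu -= other.cpu;
        memoryMb -= other.memoryMb;
        return exceeded;
    }

    double getCpu() {
        return cpu;
    }

    double getMemoryMb() {
        return memoryMb;
    }
}
```

With a single-argument signature, callers that have no metrics object never need to pass a placeholder `null`.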


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226060529
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
    @@ -630,7 +632,7 @@ public boolean areAllConnectionsReady() {
             Map<List<Long>, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
             for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
                 NodeInfo nodeInfo = entry.getValue();
    -            if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    +            if (nodeInfo.get_node().startsWith(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    --- End diff --
    
    What will assignmentId be? Why does this change?
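
Presumably the change is needed because, with NUMA enabled, the node recorded in the assignment is the composite per-NUMA ID (supervisor ID, `Constants.NUMA_ID_SEPARATOR`, NUMA id, as built in `Worker.doExecutorHeartbeats` above), while `assignmentId` here is the plain supervisor ID. A sketch contrasting the diff's prefix match with a stricter variant; the separator value and class name are hypothetical. The stricter form only matters if one supervisor ID can be a prefix of another, which is unlikely with UUID-style IDs.

```java
final class AssignmentMatch {
    // Hypothetical value; the PR uses Constants.NUMA_ID_SEPARATOR.
    static final String NUMA_ID_SEPARATOR = "-numa-";

    private AssignmentMatch() {
    }

    // Behaviour of the diff: any node ID starting with the supervisor ID matches.
    static boolean prefixMatch(String node, String assignmentId) {
        return node.startsWith(assignmentId);
    }

    // Stricter variant: the node must be the supervisor itself or one of its NUMA sub-IDs.
    static boolean strictMatch(String node, String assignmentId) {
        return node.equals(assignmentId) || node.startsWith(assignmentId + NUMA_ID_SEPARATOR);
    }
}
```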


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r229875813
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -1041,6 +1041,24 @@
         @isPositiveNumber
         @NotNull
         public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
    +
    +    /**
    +     * A map with keys mapped to each NUMA Node on the supervisor that will be used
    +     * by scheduler. CPUs, memory and ports available on each NUMA node will be provided.
    +     * Each supervisor will have different map of NUMAs.
    +     * Example: "supervisor.numa.meta": {
    +     *  "0": { "memory.mb": 122880, "cores": [ 0, 12, 1, 13, 2, 14, 3, 15, 4, 16, 5, 17],
    +     *      "ports": [6700, 6701]},
    +     *  "1" : {"memory.mb": 122880, "cores": [ 6, 18, 7, 19, 8, 20, 9, 21, 10, 22, 11, 23],
    --- End diff --
    
    "ports" is numa.ports" now since ` "NUMA_PORTS = "numa.ports"`


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228287568
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
    @@ -622,22 +624,26 @@ public boolean areAllConnectionsReady() {
             return this.autoCredentials;
         }
     
    -    private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
    +    private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState,
    +                                                 String topologyId, String assignmentId,
                                                      int port) {
             LOG.info("Reading assignments");
             List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
             executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
    -        Map<List<Long>, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
    +        Map<List<Long>, NodeInfo> executorToNodePort =
    +                getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
             for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
                 NodeInfo nodeInfo = entry.getValue();
    -            if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    +            if (nodeInfo.get_node().startsWith(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    --- End diff --
    
    Started a new / non-resolved conversation.....
    
    Could we have one assignment id `supervisor1` and another `supervisor11`? With `startsWith`, those would alias.
    
    Instead of `startsWith`, can we split on a known delimiter, take the part before it, and make sure that matches exactly?
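    
    To make the aliasing concrete, here is a minimal sketch, assuming NUMA supervisor ids are built as `<assignmentId><separator><numaId>`. The separator `-numa-` and the class and method names are hypothetical, not taken from the PR: the first method reproduces the `startsWith` check from the diff, the second cuts at the delimiter and compares the base id exactly.
    
    ```java
    import java.util.List;
    
    class AssignmentIdMatch {
        // Hypothetical separator between a supervisor's base id and its NUMA suffix.
        // Assumes base ids never contain this string themselves.
        static final String NUMA_ID_SEPARATOR = "-numa-";
    
        // The check in the diff: a raw prefix test, so "supervisor11" matches "supervisor1".
        static boolean prefixMatch(String nodeId, String assignmentId) {
            return nodeId.startsWith(assignmentId);
        }
    
        // Cut the node id at the separator (if present), then require an exact match on the base.
        static boolean exactBaseMatch(String nodeId, String assignmentId) {
            int idx = nodeId.lastIndexOf(NUMA_ID_SEPARATOR);
            String baseId = idx >= 0 ? nodeId.substring(0, idx) : nodeId;
            return baseId.equals(assignmentId);
        }
    
        public static void main(String[] args) {
            // prefix=true for all four ids; exact=true only for the first two.
            for (String nodeId : List.of("supervisor1", "supervisor1-numa-0", "supervisor11", "supervisor11-numa-1")) {
                System.out.printf("%-20s prefix=%-5b exact=%b%n",
                        nodeId, prefixMatch(nodeId, "supervisor1"), exactBaseMatch(nodeId, "supervisor1"));
            }
        }
    }
    ```
    
    A plain id with no separator still matches itself, so under this assumption mixed clusters (NUMA and non-NUMA supervisors) keep working.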


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228227560
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -1041,6 +1041,19 @@
         @isPositiveNumber
         @NotNull
         public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
    +
    +    /**
    +     * A map with blobstore keys mapped to each NUMA Node on the supervisor that will be used
    --- End diff --
    
    What's `blobstore keys` here?


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228321712
  
    --- Diff: storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java ---
    @@ -390,6 +398,116 @@ public void testLaunch() throws Exception {
                        "storm.log.dir", stormLogDir);
         }
     
    +    @Test
    +    public void testNumaPinnedLaunch() throws Exception {
    +        final String topoId = "test_topology_current";
    --- End diff --
    
    It doesn't matter, but it seems better to have `topoId = "test_topology_numa_pinned";`


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228265639
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
    @@ -607,6 +608,27 @@ protected String javaCmd(String cmd) {
             return ret;
         }
     
    +    /**
    +     * Extracting out to mock it for tests.
    +     * @return true if on Linux.
    +     */
    +    protected boolean isOnLinux() {
    +        return SystemUtils.IS_OS_LINUX;
    +    }
    +
    +    private void addNumaPinningIfApplicable(String numaId, List<String> commandList) {
    --- End diff --
    
    It can be confusing, since we are actually adding `numactl` as a prefix, but the name makes it look like we are appending `numactl` to the end of `commandList`. (Where we call it, `commandList` happens to be empty.)


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r229883970
  
    --- Diff: storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
    @@ -414,6 +418,34 @@ public void validateInteger(String name, Object o) {
             }
         }
     
    +    public static class NumaEntryValidator extends Validator {
    +
    +        @Override
    +        public void validateField(String name, Object o) {
    +            if (o == null) {
    +                return;
    +            }
    +            Map numa = (Map<String, Object>) o;
    +            for (String key : new String[]{NUMA_CORES, NUMA_MEMORY_IN_MB, NUMA_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new IllegalArgumentException(
    +                            "The numa configuration key [" + key + "] is missing!"
    +                    );
    +                }
    +            }
    +
    +            List<Integer> cores = (List<Integer>) numa.get(NUMA_CORES);
    +            Set coreSet = new HashSet();
    +            coreSet.addAll(cores);
    +            if (coreSet.size() != cores.size()) {
    --- End diff --
    
    We don't. The only reason I'm keeping the core list, as opposed to a number of cores, is in case users want even more fine-grained control of core pinning in the future. In that case, duplicates would mean they wanted the core to be used by multiple nodes.
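    
    The set-size comparison in `NumaEntryValidator` detects duplicates but does not say which cores repeat. A minimal sketch, assuming the cores arrive as a `List<Integer>`; `NumaCoreCheck` and `duplicateCores` are hypothetical names, not Storm API:
    
    ```java
    import java.util.HashSet;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;
    
    class NumaCoreCheck {
        // Same test as the diff: a HashSet collapses repeats, so a size mismatch means duplicates.
        static boolean hasDuplicateCores(List<Integer> cores) {
            return new HashSet<>(cores).size() != cores.size();
        }
    
        // Hypothetical helper: collects each core that appears more than once, in encounter order.
        static Set<Integer> duplicateCores(List<Integer> cores) {
            Set<Integer> seen = new HashSet<>();
            Set<Integer> dups = new LinkedHashSet<>();
            for (Integer core : cores) {
                if (!seen.add(core)) {
                    dups.add(core);
                }
            }
            return dups;
        }
    
        public static void main(String[] args) {
            List<Integer> cores = List.of(0, 12, 1, 13, 1, 12);
            if (hasDuplicateCores(cores)) {
                // Prints: Duplicate cores in numa entry: [1, 12]
                System.out.println("Duplicate cores in numa entry: " + duplicateCores(cores));
            }
        }
    }
    ```
    
    A validator could put the returned set into its `IllegalArgumentException` message so the operator sees exactly which cores to fix.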


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r227972886
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -119,15 +120,81 @@
         // tests by subclassing.
         private static Utils _instance = new Utils();
         private static String memoizedLocalHostnameString = null;
    -    public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +    public static final Pattern TOPOLOGY_KEY_PATTERN =
    +            Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +
    +    public static final String NUMA_MEMORY_IN_MB = "memory.mb";
    +    public static final String NUMA_CORES = "cores";
    +    public static final String NUMAS_PORTS = "ports";
    +    public static final String NUMA_ID = "node_id";
    +    public static final String NUMAS_BASE = "numas";
    --- End diff --
    
    Let's make these names consistent ("numa" vs. "numas").
    
    Why don't the values here contain "numa"? I'd expect `NUMA_CORES = "numa.cores"` or something like it.


---

[GitHub] storm issue #2881: STORM-3259: NUMA Support for Storm

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on the issue:

    https://github.com/apache/storm/pull/2881
  
    Includes work by @kishorvpatil 


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226065664
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -121,11 +121,68 @@
         private static String memoizedLocalHostnameString = null;
         public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
     
    +    public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
    +    public static final String NUMA_CORES = "Cores";
    +    public static final String NUMAS_PORTS = "Ports";
    +    public static final String NUMA_ID = "Id";
    +    public static final String NUMAS_BASE = "Numas";
    +
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
     
    +    /**
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map<String, Object> getValidatedNumaMap(Map<String, Object> stormConf) throws KeyNotFoundException {
    +        Map<String, Object> validatedNumaMap = new HashMap();
    +        Map<String, Object> supervisorNumaMap = (Map<String, Object>) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) return validatedNumaMap;
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException("The numa configurations [" + NUMAS_BASE + "] is missing!");
    +        }
    +        List<Map> numaEntries = (List<Map>) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new KeyNotFoundException("The numa configuration key [" + key + "] is missing!");
    +                }
    +            }
    +            validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
    +        }
    +        return validatedNumaMap;
    +    }
    +
    +    /**
    +     * getNumaIdForPort.
    +     * @param port port
    +     * @param stormConf stormConf
    +     * @return getNumaIdForPort
    +     * @throws KeyNotFoundException
    +     */
    +    public static String getNumaIdForPort(Number port, Map<String, Object> stormConf) {
    +        Map validatedNumaMap = null;
    +        try {
    +            validatedNumaMap = getValidatedNumaMap(stormConf);
    +        } catch (KeyNotFoundException e) {
    +            LOG.error("Exception while getting NUMA config", e);
    +            return null;
    +        }
    +        for (Object numaEntry : validatedNumaMap.values()) {
    +            Map numaMap  = (Map<String, Object>) numaEntry;
    +            List<Integer> portList = (List<Integer>) numaMap.get(NUMAS_PORTS);
    +            if (portList.contains(port)) {
    --- End diff --
    
    Does it make sense to have a NUMA map and not be able to find a port? Should this be an error?
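    
    One way to answer this is to separate "this supervisor has no NUMA config" (return `null`, as the diff does) from "a NUMA config exists but no node lists this port" (fail loudly). The sketch below assumes every port on a NUMA-configured supervisor should belong to some NUMA node, which the thread does not settle; the class name, the `numa.ports` key and the choice of `IllegalStateException` are illustrative. It also compares ports numerically, because `portList.contains(port)` with a `Number` argument can miss when the boxed types differ: `Integer.valueOf(6700).equals(Long.valueOf(6700L))` is `false`.
    
    ```java
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    class NumaPortLookup {
        // Assumed key name, following the "numa.ports" value mentioned in this thread.
        static final String NUMA_PORTS = "numa.ports";
    
        /**
         * Returns the id of the NUMA node that owns {@code port}, or null when there is
         * no NUMA config at all. Throws when a NUMA config exists but no node lists the port.
         */
        static String numaIdForPort(Number port, Map<String, Map<String, Object>> numaById) {
            if (numaById == null || numaById.isEmpty()) {
                return null; // plain (non-NUMA) supervisor
            }
            for (Map.Entry<String, Map<String, Object>> entry : numaById.entrySet()) {
                Object ports = entry.getValue().get(NUMA_PORTS);
                if (!(ports instanceof List)) {
                    continue;
                }
                for (Object p : (List<?>) ports) {
                    // Numeric comparison, so Integer 6702 matches Long 6702L.
                    if (p instanceof Number && ((Number) p).longValue() == port.longValue()) {
                        return entry.getKey();
                    }
                }
            }
            throw new IllegalStateException("Port " + port + " is not assigned to any NUMA node");
        }
    
        public static void main(String[] args) {
            Map<String, Map<String, Object>> numaById = new HashMap<>();
            numaById.put("0", Map.<String, Object>of(NUMA_PORTS, List.of(6700, 6701)));
            numaById.put("1", Map.<String, Object>of(NUMA_PORTS, List.of(6702, 6703)));
            System.out.println(numaIdForPort(6702L, numaById)); // prints 1
            System.out.println(numaIdForPort(6700, new HashMap<>())); // prints null
        }
    }
    ```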


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r229879326
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
    @@ -607,6 +608,27 @@ protected String javaCmd(String cmd) {
             return ret;
         }
     
    +    /**
    +     * Extracting out to mock it for tests.
    +     * @return true if on Linux.
    +     */
    +    protected boolean isOnLinux() {
    +        return SystemUtils.IS_OS_LINUX;
    +    }
    +
    +    private void prefixNumaPinningIfApplicable(String numaId, List<String> commandList) {
    +        if (numaId != null) {
    +            if (isOnLinux()) {
    +                commandList.add("numactl");
    +                commandList.add("--i=" + numaId);
    --- End diff --
    
    Sorry to keep beating you up on this :D How about:
    ```
    commandList.add(0, "numactl");
    commandList.add(1, "--i=" + numaId);
    ```
    since we want to add the prefix to `commandList` whether or not it's empty.
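    
    The suggestion can be sketched as below. The `numactl` and `--i=` strings are copied from the diff; the class name, the `onLinux` parameter (standing in for the overridable `isOnLinux()`), the placeholder worker command, and doing nothing on other platforms are assumptions, since the diff is cut off before any non-Linux branch.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    class NumaPrefix {
        /**
         * Prepends the numactl pinning arguments to commandList, whether or not it is empty.
         * The OS check is passed in as a flag here, a simplification for the sketch.
         */
        static void prefixNumaPinningIfApplicable(String numaId, boolean onLinux, List<String> commandList) {
            if (numaId != null && onLinux) {
                commandList.add(0, "numactl");
                commandList.add(1, "--i=" + numaId);
            }
        }
    
        public static void main(String[] args) {
            // Placeholder worker command; the real launch command is built elsewhere.
            List<String> command = new ArrayList<>(List.of("java", "-cp", "storm.jar", "Worker"));
            prefixNumaPinningIfApplicable("1", true, command);
            System.out.println(command); // [numactl, --i=1, java, -cp, storm.jar, Worker]
        }
    }
    ```
    
    `commandList.addAll(0, List.of("numactl", "--i=" + numaId))` would do the same in a single call.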


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r228298658
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -119,15 +120,81 @@
         // tests by subclassing.
         private static Utils _instance = new Utils();
         private static String memoizedLocalHostnameString = null;
    -    public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +    public static final Pattern TOPOLOGY_KEY_PATTERN =
    +            Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +
    +    public static final String NUMA_MEMORY_IN_MB = "memory.mb";
    +    public static final String NUMA_CORES = "cores";
    +    public static final String NUMAS_PORTS = "ports";
    +    public static final String NUMA_ID = "node_id";
    +    public static final String NUMAS_BASE = "numas";
     
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
     
         /**
    -     * Provide an instance of this class for delegates to use.  To mock out delegated methods, provide an instance of a subclass that
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map<String, Object> getValidatedNumaMap(Map<String, Object> stormConf) throws KeyNotFoundException {
    +        Map<String, Object> validatedNumaMap = new HashMap();
    +        Map<String, Object> supervisorNumaMap =
    +                (Map<String, Object>) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) {
    +            return validatedNumaMap;
    +        }
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException(
    +                    "The numa configurations [" + NUMAS_BASE + "] is missing!"
    +            );
    +        }
    +        List<Map> numaEntries = (List<Map>) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new KeyNotFoundException(
    --- End diff --
    
    Same here. I think `IllegalArgumentException` is better.
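    
    A minimal sketch of the entry validation from the diff, rewritten to throw the unchecked `IllegalArgumentException`. The key values are the ones quoted in the `Utils.java` hunk above (the thread suggests renaming them); the class and method names are hypothetical. Because the exception is unchecked, a caller such as `getNumaIdForPort` would not need a `try`/`catch` that logs and returns `null`.
    
    ```java
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    class NumaConfigValidation {
        // Key values as quoted in the Utils.java diff; treat them as assumptions.
        static final String NUMA_ID = "node_id";
        static final String NUMA_CORES = "cores";
        static final String NUMA_MEMORY_IN_MB = "memory.mb";
        static final String NUMA_PORTS = "ports";
    
        /** Indexes NUMA entries by id, rejecting any entry that lacks a required key. */
        static Map<String, Map<String, Object>> validateNumaEntries(List<Map<String, Object>> entries) {
            Map<String, Map<String, Object>> byId = new HashMap<>();
            if (entries == null) {
                return byId;
            }
            for (Map<String, Object> numa : entries) {
                for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMA_PORTS}) {
                    if (!numa.containsKey(key)) {
                        // Unchecked, so callers are not forced to catch it just to log and return null.
                        throw new IllegalArgumentException("The numa configuration key [" + key + "] is missing!");
                    }
                }
                // String.valueOf tolerates a numeric id where a (String) cast would fail.
                byId.put(String.valueOf(numa.get(NUMA_ID)), numa);
            }
            return byId;
        }
    
        public static void main(String[] args) {
            Map<String, Object> good = Map.of(NUMA_ID, 0, NUMA_CORES, List.of(0, 12),
                    NUMA_MEMORY_IN_MB, 122880, NUMA_PORTS, List.of(6700, 6701));
            Map<String, Object> bad = Map.of(NUMA_ID, 1, NUMA_CORES, List.of(6, 18));
            System.out.println(validateNumaEntries(List.of(good)).keySet()); // [0]
            try {
                validateNumaEntries(List.of(good, bad));
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage()); // The numa configuration key [memory.mb] is missing!
            }
        }
    }
    ```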


---

[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon closed the pull request at:

    https://github.com/apache/storm/pull/2881


---