You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/06/10 20:35:55 UTC

[storm] branch master updated: [STORM-3271] Refactor ResourceIsolationInterface in preparation for container support

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 14b762a  [STORM-3271] Refactor ResourceIsolationInterface in preparation for container support
     new 879087e  Merge pull request #3284 from Ethanlm/STORM-3271
14b762a is described below

commit 14b762ac5190939af038612c1160936b45cefcdf
Author: Ethan Li <et...@gmail.com>
AuthorDate: Fri Jun 5 14:21:05 2020 -0500

    [STORM-3271] Refactor ResourceIsolationInterface in preparation for container support
---
 docs/cgroups_in_storm.md                           |   3 +-
 .../daemon/supervisor/ClientSupervisorUtils.java   |   2 +-
 .../jvm/org/apache/storm/utils/ConfigUtils.java    |   8 +
 .../container/DefaultResourceIsolationManager.java | 187 ++++++++
 .../container/ResourceIsolationInterface.java      | 105 +++--
 .../storm/container/cgroup/CgroupManager.java      | 101 +++--
 .../storm/daemon/supervisor/BasicContainer.java    |  63 +--
 .../apache/storm/daemon/supervisor/Container.java  | 503 +--------------------
 .../storm/daemon/supervisor/ContainerLauncher.java |  21 +-
 .../daemon/supervisor/RunAsUserContainer.java      | 120 -----
 .../supervisor/RunAsUserContainerLauncher.java     |  65 ---
 .../storm/localizer/LocallyCachedTopologyBlob.java |   4 +-
 .../java/org/apache/storm/utils/ServerUtils.java   | 449 ++++++++++++++++++
 .../daemon/supervisor/BasicContainerTest.java      | 106 ++---
 .../storm/daemon/supervisor/ContainerTest.java     | 197 ++++----
 .../org/apache/storm/utils/ServerUtilsTest.java    |  93 ++++
 16 files changed, 1053 insertions(+), 974 deletions(-)

diff --git a/docs/cgroups_in_storm.md b/docs/cgroups_in_storm.md
index 0cc7b22..146ba6b 100644
--- a/docs/cgroups_in_storm.md
+++ b/docs/cgroups_in_storm.md
@@ -65,7 +65,8 @@ If "run as user" is enabled so that the supervisor spawns other processes as the
 
 | Setting                       | Function                                                                                                                                                                                                                                                                                                                                                                                                                                                                   [...]
 |-------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| storm.resource.isolation.plugin.enable  | This config is used to set whether or not cgroups will be used.  Set "true" to enable use of cgroups.  Set "false" to not use cgroups. When this config is set to false, unit tests related to cgroups will be skipped. Default set to "false"                                                                                                                                                                                                                   [...]
+| storm.resource.isolation.plugin.enable  | This config is used to set whether a resource isolation plugin will be used. Default set to "false". When this config is set to false, unit tests related to cgroups will be skipped.                                                                                                                                                                                                                                                              |
+| storm.resource.isolation.plugin | Selects the resource isolation plugin to use when `storm.resource.isolation.plugin.enable` is set to true. Default set to "org.apache.storm.container.cgroup.CgroupManager" |
 | storm.cgroup.hierarchy.dir   | The path to the cgroup hierarchy that storm will use.  Default set to "/cgroup/storm_resources"                                                                                                                                                                                                                                                                                                                                                                             [...]
 | storm.cgroup.resources       | A list of subsystems that will be regulated by CGroups. Default set to cpu and memory.  Currently only cpu and memory are supported                                                                                                                                                                                                                                                                                                                                         [...]
 | storm.supervisor.cgroup.rootdir     | The root cgroup used by the supervisor.  The path to the cgroup will be \<storm.cgroup.hierarchy.dir>/\<storm.supervisor.cgroup.rootdir>.  Default set to "storm"                                                                                                                                                                                                                                                                                                    [...]
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
index 6bf6752..db93fa3 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
@@ -77,7 +77,7 @@ public class ClientSupervisorUtils {
         return ret;
     }
 
-    static Process processLauncher(Map<String, Object> conf, String user, List<String> commandPrefix, List<String> args,
+    public static Process processLauncher(Map<String, Object> conf, String user, List<String> commandPrefix, List<String> args,
                                    Map<String, String> environment, final String logPreFix,
                                    final ExitCodeCallback exitCodeCallback, File dir) throws IOException {
         if (StringUtils.isBlank(user)) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 745be60..fe6d124 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -224,6 +224,10 @@ public class ConfigUtils {
         return instance.supervisorStormDistRootImpl(conf, stormId);
     }
 
+    public static String sharedByTopologyDir(Map<String, Object> conf, String stormId) throws IOException {
+        return supervisorStormDistRoot(conf, stormId) + FILE_SEPARATOR + "shared_by_topology";
+    }
+
     public static String supervisorStormJarPath(String stormRoot) {
         return (concatIfNotNull(stormRoot) + FILE_SEPARATOR + "stormjar.jar");
     }
@@ -277,6 +281,10 @@ public class ConfigUtils {
         return (workerRoot(conf) + FILE_SEPARATOR + id);
     }
 
+    public static String workerArtifactsSymlink(Map<String, Object> conf, String id) {
+        return workerRoot(conf, id) + FILE_SEPARATOR + "artifacts";
+    }
+
     public static String workerPidsRoot(Map<String, Object> conf, String id) {
         return (workerRoot(conf, id) + FILE_SEPARATOR + "pids");
     }
diff --git a/storm-server/src/main/java/org/apache/storm/container/DefaultResourceIsolationManager.java b/storm-server/src/main/java/org/apache/storm/container/DefaultResourceIsolationManager.java
new file mode 100644
index 0000000..f0eb3a5
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/container/DefaultResourceIsolationManager.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.Config;
+import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
+import org.apache.storm.daemon.supervisor.ExitCodeCallback;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the default class to manage worker processes, including launching, killing, profiling, etc.
+ */
+public class DefaultResourceIsolationManager implements ResourceIsolationInterface {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceIsolationManager.class);
+    protected Map<String, Object> conf;
+    protected boolean runAsUser;
+
+    @Override
+    public void prepare(Map<String, Object> conf) throws IOException {
+        this.conf = conf;
+        runAsUser = ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+    }
+
+    @Override
+    public void reserveResourcesForWorker(String workerId, Integer workerMemory, Integer workerCpu, String numaId) {
+        //NO OP
+    }
+
+    @Override
+    public void releaseResourcesForWorker(String workerId) {
+        //NO OP
+    }
+
+    @Override
+    public void cleanup(String user, String workerId, int port) throws IOException {
+        //NO OP
+    }
+
+    @Override
+    public void launchWorkerProcess(String user, String topologyId,  Map<String, Object> topoConf,
+                                    int port, String workerId, List<String> command, Map<String, String> env,
+                                    String logPrefix, ExitCodeCallback processExitCallback, File targetDir) throws IOException {
+        if (runAsUser) {
+            String workerDir = targetDir.getAbsolutePath();
+            List<String> args = Arrays.asList("worker", workerDir, ServerUtils.writeScript(workerDir, command, env));
+            ClientSupervisorUtils.processLauncher(
+                conf, user, null, args, null, logPrefix,
+                processExitCallback, targetDir
+            );
+        } else {
+            ClientSupervisorUtils.launchProcess(command, env, logPrefix, processExitCallback, targetDir);
+        }
+    }
+
+    @Override
+    public long getMemoryUsage(String user, String workerId, int port) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public long getSystemFreeMemoryMb() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public void kill(String user, String workerId) throws IOException {
+        Set<Long> pids = getAllPids(workerId);
+        for (Long pid : pids) {
+            kill(pid, user);
+        }
+    }
+
+    /**
+     * Kill a given process.
+     * @param pid the id of the process to kill
+     * @throws IOException on I/O exception
+     */
+    private void kill(long pid, String user) throws IOException {
+        if (runAsUser) {
+            signal(pid, 15, user);
+        } else {
+            ServerUtils.killProcessWithSigTerm(String.valueOf(pid));
+        }
+    }
+
+    @Override
+    public void forceKill(String user, String workerId) throws IOException {
+        Set<Long> pids = getAllPids(workerId);
+        for (Long pid : pids) {
+            forceKill(pid, user);
+        }
+    }
+
+    /**
+     * Kill a given process forcefully.
+     * @param pid the id of the process to kill
+     * @throws IOException on I/O exception
+     */
+    private void forceKill(long pid, String user) throws IOException {
+        if (runAsUser) {
+            signal(pid, 9, user);
+        } else {
+            ServerUtils.forceKillProcess(String.valueOf(pid));
+        }
+    }
+
+    /**
+     * Get all the pids that are a part of the container.
+     * @return all of the pids that are a part of this container
+     */
+    protected Set<Long> getAllPids(String workerId) throws IOException {
+        Set<Long> ret = new HashSet<>();
+        for (String listing : ConfigUtils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId))) {
+            ret.add(Long.valueOf(listing));
+        }
+        return ret;
+    }
+
+    private void signal(long pid, int signal, String user) throws IOException {
+        List<String> commands = Arrays.asList("signal", String.valueOf(pid), String.valueOf(signal));
+        String logPrefix = "kill -" + signal + " " + pid;
+        ClientSupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
+    }
+
+    @Override
+    public boolean areAllProcessesDead(String user, String workerId) throws IOException {
+        Set<Long> pids = getAllPids(workerId);
+        return ServerUtils.areAllProcessesDead(conf, user, workerId, pids);
+    }
+
+    @Override
+    public boolean runProfilingCommand(String user, String workerId, List<String> command, Map<String, String> env,
+                                       String logPrefix, File targetDir) throws IOException, InterruptedException {
+        if (runAsUser) {
+            String td = targetDir.getAbsolutePath();
+            LOG.info("Running as user: {} command: {}", user, command);
+            String containerFile = ServerUtils.containerFilePath(td);
+            if (Utils.checkFileExists(containerFile)) {
+                SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
+            }
+            String scriptFile = ServerUtils.scriptFilePath(td);
+            if (Utils.checkFileExists(scriptFile)) {
+                SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
+            }
+            String script = ServerUtils.writeScript(td, command, env);
+            List<String> args = Arrays.asList("profiler", td, script);
+            int ret = ClientSupervisorUtils.processLauncherAndWait(conf, user, args, env, logPrefix);
+            return ret == 0;
+        } else {
+            Process p = ClientSupervisorUtils.launchProcess(command, env, logPrefix, null, targetDir);
+            int ret = p.waitFor();
+            return ret == 0;
+        }
+    }
+
+    /**
+     * This class doesn't really manage resources.
+     * @return false
+     */
+    @Override
+    public boolean isResourceManaged() {
+        return false;
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java b/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
index ae30c29..77fcd8d 100644
--- a/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
+++ b/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
  * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
@@ -12,10 +12,12 @@
 
 package org.apache.storm.container;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.storm.daemon.supervisor.ExitCodeCallback;
 
 /**
  * A plugin to support resource isolation and limitation within Storm.
@@ -49,47 +51,90 @@ public interface ResourceIsolationInterface {
     void releaseResourcesForWorker(String workerId);
 
     /**
-     * After reserving resources for the worker (i.e. calling reserveResourcesForWorker). This function can be used
-     * to get the modified command line to launch the worker with resource isolation
-     *
-     * @param existingCommand the current command to run that may need to be modified.
-     * @return new commandline with necessary additions to launch worker with resource isolation
+     * After reserving resources for the worker (i.e. calling reserveResourcesForWorker),
+     * this function can be used to launch the worker process.
+     * @param user                the user to run the command as
+     * @param topologyId          the Id of the topology
+     * @param topoConf            the topology configuration
+     * @param port                the port where the worker is on
+     * @param workerId            the Id of the worker
+     * @param command             the command to run
+     * @param env                 the environment to run the command
+     * @param logPrefix           the prefix to include in the logs
+     * @param processExitCallback a callback for when the process exits
+     * @param targetDir           the working directory to run the command in
+     * @throws IOException on I/O exception
      */
-    List<String> getLaunchCommand(String workerId, List<String> existingCommand);
+    void launchWorkerProcess(String user, String topologyId, Map<String, Object> topoConf,
+                             int port, String workerId,
+                             List<String> command, Map<String, String> env,
+                             String logPrefix, ExitCodeCallback processExitCallback, File targetDir) throws IOException;
 
     /**
-     * After reserving resources for the worker (i.e. calling reserveResourcesForWorker). this function can be used
-     * to get the launch command prefix
-     *
-     * @param workerId the of the worker
-     * @return the command line prefix for launching a worker with resource isolation
+     * Get the current memory usage of a given worker.
+     * @param user the user that the worker is running as
+     * @param workerId the id of the worker
+     * @param port the port of the worker
+     * @return the amount of memory the worker is using in bytes or -1 if not supported
+     * @throws IOException on I/O exception
      */
-    List<String> getLaunchCommandPrefix(String workerId);
+    long getMemoryUsage(String user, String workerId, int port) throws IOException;
 
     /**
-     * Get the list of PIDs currently in an isolated container.
-     *
-     * @param workerId the id of the worker to get these for
-     * @return the set of PIDs, this will be combined with other ways of getting PIDs. An Empty set if no PIDs are
-     *     found.
-     * @throws IOException on any error
+     * Get the amount of free memory in MB.
+     * This might not be the entire box, it might be within a parent resource group.
+     * @return The amount of memory in MB that is free on the system.
+     * @throws IOException on I/O exception
      */
-    Set<Long> getRunningPids(String workerId) throws IOException;
+    long getSystemFreeMemoryMb() throws IOException;
 
     /**
-     * Get the current memory usage of the a given worker.
-     *
+     * Kill the given worker.
+     * @param user the user that the worker is running as
+     * @param workerId the id of the worker to kill
+     * @throws IOException on I/O exception
+     */
+    void kill(String user, String workerId) throws IOException;
+
+    /**
+     * Kill the given worker forcefully.
+     * @param user the user that the worker is running as
+     * @param workerId the id of the worker to kill
+     * @throws IOException on I/O exception
+     */
+    void forceKill(String user, String workerId) throws IOException;
+
+    /**
+     * Check if all the processes are dead.
+     * @param user the user that the processes are running as
+     * @param workerId the id of the worker to check
+     * @return true if all the processes are dead; false otherwise
+     * @throws IOException on I/O exception
+     */
+    boolean areAllProcessesDead(String user, String workerId) throws IOException;
+
+    /**
+     * Run profiling command.
+     * @param user the user that the worker is running as
      * @param workerId the id of the worker
-     * @return the amount of memory the worker is using in bytes or -1 if not supported
-     * @throws IOException on any error.
+     * @param command the command to run
+     * @param env the environment to run the command
+     * @param logPrefix the prefix to include in the logs
+     * @param targetDir the working directory to run the command in
+     * @return true if succeeds; false otherwise
+     * @throws IOException on I/O exception
+     * @throws InterruptedException if interrupted
      */
-    long getMemoryUsage(String workerId) throws IOException;
+    boolean runProfilingCommand(String user, String workerId, List<String> command, Map<String, String> env,
+                                String logPrefix, File targetDir) throws IOException, InterruptedException;
+
+
+    void cleanup(String user, String workerId, int port) throws IOException;
 
     /**
-     * Get the system free memory in MB.
-     * @return The amount of memory in bytes that are free on the system. This might not be the entire box, it might be
-     *     within a parent resource group.
-     * @throws IOException on any error.
+     * Return true if resources are being managed.
+     * The {@link DefaultResourceIsolationManager} will have it return false since it doesn't really manage resources.
+     * @return true if resources are being managed.
      */
-    long getSystemFreeMemoryMb() throws IOException;
+    boolean isResourceManaged();
 }
diff --git a/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java b/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
index 6677da7..0a845ae 100644
--- a/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
+++ b/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
  * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
@@ -12,10 +12,9 @@
 
 package org.apache.storm.container.cgroup;
 
-import java.io.BufferedReader;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileReader;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -27,61 +26,33 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.lang.SystemUtils;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
-import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.container.DefaultResourceIsolationManager;
 import org.apache.storm.container.cgroup.core.CpuCore;
 import org.apache.storm.container.cgroup.core.CpusetCore;
 import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
+import org.apache.storm.daemon.supervisor.ExitCodeCallback;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Class that implements ResourceIsolationInterface that manages cgroups.
  */
-public class CgroupManager implements ResourceIsolationInterface {
+public class CgroupManager extends DefaultResourceIsolationManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(CgroupManager.class);
-    private static final Pattern MEMINFO_PATTERN = Pattern.compile("^([^:\\s]+):\\s*([0-9]+)\\s*kB$");
     private CgroupCenter center;
     private Hierarchy hierarchy;
     private CgroupCommon rootCgroup;
     private String rootDir;
-    private Map<String, Object> conf;
     private Map<String, String> workerToNumaId;
 
-    static long getMemInfoFreeMb() throws IOException {
-        //MemFree:        14367072 kB
-        //Buffers:          536512 kB
-        //Cached:          1192096 kB
-        // MemFree + Buffers + Cached
-        long memFree = 0;
-        long buffers = 0;
-        long cached = 0;
-        try (BufferedReader in = new BufferedReader(new FileReader("/proc/meminfo"))) {
-            String line = null;
-            while ((line = in.readLine()) != null) {
-                Matcher match = MEMINFO_PATTERN.matcher(line);
-                if (match.matches()) {
-                    String tag = match.group(1);
-                    if (tag.equalsIgnoreCase("MemFree")) {
-                        memFree = Long.parseLong(match.group(2));
-                    } else if (tag.equalsIgnoreCase("Buffers")) {
-                        buffers = Long.parseLong(match.group(2));
-                    } else if (tag.equalsIgnoreCase("Cached")) {
-                        cached = Long.parseLong(match.group(2));
-                    }
-                }
-            }
-        }
-        return (memFree + buffers + cached) / 1024;
-    }
-
     /**
      * initialize data structures.
      *
@@ -89,7 +60,7 @@ public class CgroupManager implements ResourceIsolationInterface {
      */
     @Override
     public void prepare(Map<String, Object> conf) throws IOException {
-        this.conf = conf;
+        super.prepare(conf);
         this.rootDir = DaemonConfig.getCgroupRootDir(this.conf);
         if (this.rootDir == null) {
             throw new RuntimeException("Check configuration file. The storm.supervisor.cgroup.rootdir is missing.");
@@ -266,6 +237,33 @@ public class CgroupManager implements ResourceIsolationInterface {
     }
 
     @Override
+    public void launchWorkerProcess(String user, String topologyId, Map<String, Object> topoConf,
+                                    int port, String workerId,
+                                    List<String> command, Map<String, String> env, String logPrefix,
+                                    ExitCodeCallback processExitCallback, File targetDir) throws IOException {
+        if (workerToNumaId.containsKey(workerId)) {
+            prefixNumaPinning(command, workerToNumaId.get(workerId));
+        }
+
+        if (runAsUser) {
+            String workerDir = targetDir.getAbsolutePath();
+            List<String> args = Arrays.asList("worker", workerDir, ServerUtils.writeScript(workerDir, command, env));
+            List<String> commandPrefix = getLaunchCommandPrefix(workerId);
+            ClientSupervisorUtils.processLauncher(conf, user, commandPrefix, args, null,
+                logPrefix, processExitCallback, targetDir);
+        } else {
+            command = getLaunchCommand(workerId, command);
+            ClientSupervisorUtils.launchProcess(command, env, logPrefix, processExitCallback, targetDir);
+        }
+    }
+
+    /**
+     * Compose the launch command based on the worker id and the existing command.
+     * @param workerId the worker id
+     * @param existingCommand the current command to run that may need to be modified
+     * @return new commandline with necessary additions to launch worker
+     */
+    @VisibleForTesting
     public List<String> getLaunchCommand(String workerId, List<String> existingCommand) {
         List<String> newCommand = getLaunchCommandPrefix(workerId);
 
@@ -277,8 +275,7 @@ public class CgroupManager implements ResourceIsolationInterface {
         return newCommand;
     }
 
-    @Override
-    public List<String> getLaunchCommandPrefix(String workerId) {
+    private List<String> getLaunchCommandPrefix(String workerId) {
         CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup);
 
         if (!this.rootCgroup.getChildren().contains(workerGroup)) {
@@ -305,8 +302,7 @@ public class CgroupManager implements ResourceIsolationInterface {
         return newCommand;
     }
 
-    @Override
-    public Set<Long> getRunningPids(String workerId) throws IOException {
+    private Set<Long> getRunningPids(String workerId) throws IOException {
         CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup);
         if (!this.rootCgroup.getChildren().contains(workerGroup)) {
             LOG.warn("cgroup {} doesn't exist!", workerGroup);
@@ -315,8 +311,22 @@ public class CgroupManager implements ResourceIsolationInterface {
         return workerGroup.getPids();
     }
 
+    /**
+     * Get all of the pids that are a part of this container.
+     * @param workerId the worker id
+     * @return all of the pids that are a part of this container
+     */
+    @Override
+    protected Set<Long> getAllPids(String workerId) throws IOException {
+        Set<Long> ret = super.getAllPids(workerId);
+        Set<Long> morePids = getRunningPids(workerId);
+        assert (morePids != null);
+        ret.addAll(morePids);
+        return ret;
+    }
+
     @Override
-    public long getMemoryUsage(String workerId) throws IOException {
+    public long getMemoryUsage(String user, String workerId, int port) throws IOException {
         CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup);
         MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
         return memCore.getPhysicalUsage();
@@ -337,6 +347,11 @@ public class CgroupManager implements ResourceIsolationInterface {
             //Ignored if cgroups is not setup don't do anything with it
         }
 
-        return Long.min(rootCgroupLimitFree, getMemInfoFreeMb());
+        return Long.min(rootCgroupLimitFree, ServerUtils.getMemInfoFreeMb());
+    }
+
+    @Override
+    public boolean isResourceManaged() {
+        return true;
     }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
index 76c0e65..51fef1a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -133,11 +133,11 @@ public class BasicContainer extends Container {
         this.localState = localState;
 
         if (type.isRecovery() && !type.isOnlyKillable()) {
-            synchronized (localState) {
+            synchronized (this.localState) {
                 String wid = null;
-                Map<String, Integer> workerToPort = localState.getApprovedWorkers();
+                Map<String, Integer> workerToPort = this.localState.getApprovedWorkers();
                 for (Map.Entry<String, Integer> entry : workerToPort.entrySet()) {
-                    if (port == entry.getValue().intValue()) {
+                    if (port == entry.getValue()) {
                         wid = entry.getKey();
                     }
                 }
@@ -176,7 +176,7 @@ public class BasicContainer extends Container {
     private static void removeWorkersOn(Map<String, Integer> workerToPort, int port) {
         for (Iterator<Entry<String, Integer>> i = workerToPort.entrySet().iterator(); i.hasNext(); ) {
             Entry<String, Integer> found = i.next();
-            if (port == found.getValue().intValue()) {
+            if (port == found.getValue()) {
                 LOG.warn("Deleting worker {} from state", found.getKey());
                 i.remove();
             }
@@ -245,26 +245,6 @@ public class BasicContainer extends Container {
         return exitedEarly;
     }
 
-    /**
-     * Run the given command for profiling.
-     *
-     * @param command   the command to run
-     * @param env       the environment to run the command
-     * @param logPrefix the prefix to include in the logs
-     * @param targetDir the working directory to run the command in
-     * @return true if it ran successfully, else false
-     *
-     * @throws IOException          on any error
-     * @throws InterruptedException if interrupted wile waiting for the process to exit.
-     */
-    protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix,
-                                          File targetDir) throws IOException, InterruptedException {
-        type.assertFull();
-        Process p = ClientSupervisorUtils.launchProcess(command, env, logPrefix, null, targetDir);
-        int ret = p.waitFor();
-        return ret == 0;
-    }
-
     @Override
     public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException {
         type.assertFull();
@@ -288,7 +268,7 @@ public class BasicContainer extends Container {
 
         File targetFile = new File(targetDir);
         if (command.size() > 0) {
-            return runProfilingCommand(command, env, logPrefix, targetFile);
+            return resourceIsolationManager.runProfilingCommand(getWorkerUser(), workerId, command, env, logPrefix, targetFile);
         }
         LOG.warn("PROFILING REQUEST NOT SUPPORTED {} IGNORED...", request);
         return true;
@@ -493,26 +473,6 @@ public class BasicContainer extends Container {
         return rets;
     }
 
-    /**
-     * Launch the worker process (non-blocking).
-     *
-     * @param command             the command to run
-     * @param env                 the environment to run the command
-     * @param processExitCallback a callback for when the process exits
-     * @param logPrefix           the prefix to include in the logs
-     * @param targetDir           the working directory to run the command in
-     * @return true if it ran successfully, else false
-     *
-     * @throws IOException on any error
-     */
-    protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix,
-                                       ExitCodeCallback processExitCallback, File targetDir) throws IOException {
-        if (resourceIsolationManager != null) {
-            command = resourceIsolationManager.getLaunchCommand(workerId, command);
-        }
-        ClientSupervisorUtils.launchProcess(command, env, logPrefix, processExitCallback, targetDir);
-    }
-
     private String getWorkerLoggingConfigFile() {
         String log4jConfigurationDir = (String) (conf.get(DaemonConfig.STORM_LOG4J2_CONF_DIR));
 
@@ -702,7 +662,7 @@ public class BasicContainer extends Container {
         if (super.isMemoryLimitViolated(withUpdatedLimits)) {
             return true;
         }
-        if (resourceIsolationManager != null) {
+        if (resourceIsolationManager.isResourceManaged()) {
             // In the short term the goal is to not shoot anyone unless we really need to.
             // The on heap should limit the memory usage in most cases to a reasonable amount
             // If someone is using way more than they requested this is a bug and we should
@@ -798,8 +758,8 @@ public class BasicContainer extends Container {
     public long getMemoryUsageMb() {
         try {
             long ret = 0;
-            if (resourceIsolationManager != null) {
-                long usageBytes = resourceIsolationManager.getMemoryUsage(workerId);
+            if (resourceIsolationManager.isResourceManaged()) {
+                long usageBytes = resourceIsolationManager.getMemoryUsage(getWorkerUser(), workerId, port);
                 if (usageBytes >= 0) {
                     ret = usageBytes / 1024 / 1024;
                 }
@@ -818,7 +778,7 @@ public class BasicContainer extends Container {
 
     private long calculateMemoryLimit(final WorkerResources resources, final int memOnHeap) {
         long ret = memOnHeap;
-        if (resourceIsolationManager != null) {
+        if (resourceIsolationManager.isResourceManaged()) {
             final int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
             final int extraMem =
                 (int)
@@ -865,7 +825,7 @@ public class BasicContainer extends Container {
 
         topEnvironment.put("LD_LIBRARY_PATH", jlp);
 
-        if (resourceIsolationManager != null) {
+        if (resourceIsolationManager.isResourceManaged()) {
             final int cpu = (int) Math.ceil(resources.get_cpu());
             //Save the memory limit so we can enforce it less strictly
             resourceIsolationManager.reserveResourcesForWorker(workerId, (int) memoryLimitMb, cpu, numaId);
@@ -879,7 +839,8 @@ public class BasicContainer extends Container {
 
         String logPrefix = "Worker Process " + workerId;
         ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix);
-        launchWorkerProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
+        resourceIsolationManager.launchWorkerProcess(getWorkerUser(), topologyId, topoConf, port, workerId,
+            commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
     }
 
     private static class TopologyMetaData {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index 83e5a84..ba7fb9d 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -20,22 +20,17 @@ package org.apache.storm.daemon.supervisor;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.Writer;
 import java.nio.file.Files;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang.StringUtils;
@@ -55,7 +50,6 @@ import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerConfigUtils;
-import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,7 +72,6 @@ public abstract class Container implements Killable {
     private final Meter numForceKill;
     private final Timer shutdownDuration;
     private final Timer cleanupDuration;
-
     protected final Map<String, Object> conf;
     protected final Map<String, Object> topoConf; //Not set if RECOVER_PARTIAL
     protected final String topologyId; //Not set if RECOVER_PARTIAL
@@ -94,6 +87,7 @@ public abstract class Container implements Killable {
     protected ContainerMemoryTracker containerMemoryTracker;
     private long lastMetricProcessTime = 0L;
     private Timer.Context shutdownTimer = null;
+    protected boolean runAsUser;
     private String cachedUser;
 
     /**
@@ -137,6 +131,11 @@ public abstract class Container implements Killable {
         this.resourceIsolationManager = resourceIsolationManager;
         this.assignment = assignment;
 
+        runAsUser = ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+        if (runAsUser && Utils.isOnWindows()) {
+            throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
+        }
+
         if (this.type.isOnlyKillable()) {
             assert (this.assignment == null);
             assert (this.port <= 0);
@@ -180,15 +179,6 @@ public abstract class Container implements Killable {
         return ConfigUtils.readSupervisorStormConf(conf, topologyId);
     }
 
-    /**
-     * Kill a given process.
-     *
-     * @param pid the id of the process to kill
-     */
-    protected void kill(long pid) throws IOException {
-        ServerUtils.killProcessWithSigTerm(String.valueOf(pid));
-    }
-
     @Override
     public void kill() throws IOException {
         LOG.info("Killing {}:{}", supervisorId, workerId);
@@ -196,10 +186,8 @@ public abstract class Container implements Killable {
             shutdownTimer = shutdownDuration.time();
         }
         try {
-            Set<Long> pids = getAllPids();
-
-            for (Long pid : pids) {
-                kill(pid);
+            if (resourceIsolationManager != null) {
+                resourceIsolationManager.kill(getWorkerUser(), workerId);
             }
         } catch (IOException e) {
             numKillExceptions.mark();
@@ -207,24 +195,13 @@ public abstract class Container implements Killable {
         }
     }
 
-    /**
-     * Kill a given process.
-     *
-     * @param pid the id of the process to kill
-     */
-    protected void forceKill(long pid) throws IOException {
-        ServerUtils.forceKillProcess(String.valueOf(pid));
-    }
-
     @Override
     public void forceKill() throws IOException {
         LOG.info("Force Killing {}:{}", supervisorId, workerId);
         numForceKill.mark();
         try {
-            Set<Long> pids = getAllPids();
-
-            for (Long pid : pids) {
-                forceKill(pid);
+            if (resourceIsolationManager != null) {
+                resourceIsolationManager.forceKill(getWorkerUser(), workerId);
             }
         } catch (IOException e) {
             numForceKillExceptions.mark();
@@ -246,419 +223,19 @@ public abstract class Container implements Killable {
         return hb;
     }
 
-    /**
-     * Is a process alive and running?.
-     *
-     * @param pid the PID of the running process
-     * @param user the user that is expected to own that process
-     * @return true if it is, else false
-     *
-     * @throws IOException on any error
-     */
-    public static boolean isProcessAlive(long pid, String user) throws IOException {
-        if (ServerUtils.IS_ON_WINDOWS) {
-            return isWindowsProcessAlive(pid, user);
-        }
-        return isPosixProcessAlive(pid, user);
-    }
-
-    private static boolean isWindowsProcessAlive(long pid, String user) throws IOException {
-        boolean ret = false;
-        LOG.debug("CMD: tasklist /fo list /fi \"pid eq {}\" /v", pid);
-        ProcessBuilder pb = new ProcessBuilder("tasklist", "/fo", "list", "/fi", "pid eq " + pid, "/v");
-        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
-        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
-            int lineNo = 0;
-            String line;
-            while ((line = in.readLine()) != null) {
-                lineNo++;
-                LOG.debug("CMD=LINE#{}: {}", lineNo, line);
-                if (line.contains("User Name:")) { //Check for : in case someone called their user "User Name"
-                    //This line contains the user name for the pid we're looking up
-                    //Example line: "User Name:    exampleDomain\exampleUser"
-                    List<String> userNameLineSplitOnWhitespace = Arrays.asList(line.split(":"));
-                    if (userNameLineSplitOnWhitespace.size() == 2) {
-                        List<String> userAndMaybeDomain = Arrays.asList(userNameLineSplitOnWhitespace.get(1).trim().split("\\\\"));
-                        String processUser = userAndMaybeDomain.size() == 2 ? userAndMaybeDomain.get(1) : userAndMaybeDomain.get(0);
-                        processUser = processUser.trim();
-                        if (user.equals(processUser)) {
-                            ret = true;
-                        } else {
-                            LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
-                        }
-                    } else {
-                        LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. Line was {}",
-                                line);
-                    }
-                    break;
-                }
-            }
-        }
-        return ret;
-    }
-
-    private static boolean isPosixProcessAlive(long pid, String user) throws IOException {
-        LOG.debug("CMD: ps -o user -p {}", pid);
-        ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", String.valueOf(pid));
-        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
-        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
-            int lineNo = 1;
-            String line = in.readLine();
-            LOG.debug("CMD-LINE#{}: {}", lineNo, line);
-            if (!"USER".equals(line.trim())) {
-                LOG.error("Expecting first line to contain USER, found \"{}\"", line);
-                return false;
-            }
-            while ((line = in.readLine()) != null) {
-                lineNo++;
-                LOG.debug("CMD-LINE#{}: {}", lineNo, line);
-                line = line.trim();
-                if (user.equals(line)) {
-                    return true;
-                }
-                LOG.info("Found {} running as {}, but expected it to be {}", pid, line, user);
-            }
-        } catch (IOException ex) {
-            String err = String.format("Cannot read output of command \"ps -o user -p %d\"", pid);
-            throw new IOException(err, ex);
-        }
-        return false;
-    }
-
-    /**
-     * Are any of the processes alive and running for the specified user. If collection is empty or null
-     * then the return value is trivially false.
-     *
-     * @param pids the PIDs of the running processes
-     * @param user the user that is expected to own that process
-     * @return true if any one of the processes is owned by user and alive, else false
-     * @throws IOException on I/O exception
-     */
-    public static boolean isAnyProcessAlive(Collection<Long> pids, String user) throws IOException {
-        if (pids == null || pids.isEmpty()) {
-            return false;
-        }
-
-        if (ServerUtils.IS_ON_WINDOWS) {
-            return isAnyWindowsProcessAlive(pids, user);
-        }
-        return isAnyPosixProcessAlive(pids, user);
-    }
-
-    /**
-     * Are any of the processes alive and running for the specified userId. If collection is empty or null
-     * then the return value is trivially false.
-     *
-     * @param pids the PIDs of the running processes
-     * @param uid the user that is expected to own that process
-     * @return true if any one of the processes is owned by user and alive, else false
-     * @throws IOException on I/O exception
-     */
-    public static boolean isAnyProcessAlive(Collection<Long> pids, int uid) throws IOException {
-        if (pids == null || pids.isEmpty()) {
-            return false;
-        }
-        if (ServerUtils.IS_ON_WINDOWS) {
-            return isAnyWindowsProcessAlive(pids, uid);
-        }
-        return isAnyPosixProcessAlive(pids, uid);
-    }
-
-    /**
-     * Find if any of the Windows processes are alive and owned by the specified user.
-     * Command reference https://docs.microsoft.com/en-us/windows-server/administration/windows-commands/tasklist.
-     *
-     * @param pids the PIDs of the running processes
-     * @param user the user that is expected to own that process
-     * @return true if any one of the processes is owned by user and alive, else false
-     * @throws IOException on I/O exception
-     */
-    private static boolean isAnyWindowsProcessAlive(Collection<Long> pids, String user) throws IOException {
-        List<String> cmdArgs = new ArrayList<>();
-        cmdArgs.add("tasklist");
-        cmdArgs.add("/fo");
-        cmdArgs.add("list");
-        pids.forEach(pid -> {
-            cmdArgs.add("/fi");
-            cmdArgs.add("pid eq " + pid);
-        });
-        cmdArgs.add("/v");
-        LOG.debug("CMD: {}", String.join(" ", cmdArgs));
-        ProcessBuilder pb = new ProcessBuilder(cmdArgs);
-        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
-        List<String> unexpectedUsers = new ArrayList<>();
-        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
-            int lineNo = 0;
-            String line;
-            while ((line = in.readLine()) != null) {
-                lineNo++;
-                LOG.debug("CMD-LINE#{}: {}", lineNo, line);
-                if (line.contains("User Name:")) { //Check for : in case someone called their user "User Name"
-                    //This line contains the user name for the pid we're looking up
-                    //Example line: "User Name:    exampleDomain\exampleUser"
-                    List<String> userNameLineSplitOnWhitespace = Arrays.asList(line.split(":"));
-                    if (userNameLineSplitOnWhitespace.size() == 2) {
-                        List<String> userAndMaybeDomain = Arrays.asList(userNameLineSplitOnWhitespace.get(1).trim().split("\\\\"));
-                        String processUser = userAndMaybeDomain.size() == 2 ? userAndMaybeDomain.get(1) : userAndMaybeDomain.get(0);
-                        processUser = processUser.trim();
-                        if (user.equals(processUser)) {
-                            return true;
-                        }
-                        unexpectedUsers.add(processUser);
-                    } else {
-                        LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. Line was {}",
-                                line);
-                    }
-                }
-            }
-        } catch (IOException ex) {
-            String err = String.format("Cannot read output of command \"%s\"", String.join(" ", cmdArgs));
-            throw new IOException(err, ex);
-        }
-        String pidsAsStr = StringUtils.join(pids, ",");
-        if (unexpectedUsers.isEmpty()) {
-            LOG.info("None of the processes {} are alive", pidsAsStr);
-        } else {
-            LOG.info("{} of the Processes {} are running as user(s) {}: but expected user is {}",
-                    unexpectedUsers.size(), pidsAsStr, String.join(",", new TreeSet<>(unexpectedUsers)), user);
-        }
-        return false;
-    }
-
-    /**
-     * Find if any of the Windows processes are alive and owned by the specified userId.
-     * This overridden method is provided for symmetry, but is not implemented.
-     *
-     * @param pids the PIDs of the running processes
-     * @param uid the user that is expected to own that process
-     * @return true if any one of the processes is owned by user and alive, else false
-     * @throws IOException on I/O exception
-     */
-    private static boolean isAnyWindowsProcessAlive(Collection<Long> pids, int uid) throws IOException {
-        throw new IllegalArgumentException("UID is not supported on Windows");
-    }
-
-    /**
-     * Are any of the processes alive and running for the specified user.
-     *
-     * @param pids the PIDs of the running processes
-     * @param user the user that is expected to own that process
-     * @return true if any one of the processes is owned by user and alive, else false
-     * @throws IOException on I/O exception
-     */
-    private static boolean isAnyPosixProcessAlive(Collection<Long> pids, String user) throws IOException {
-        String pidParams = StringUtils.join(pids, ",");
-        LOG.debug("CMD: ps -o user -p {}", pidParams);
-        ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", pidParams);
-        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
-        List<String> unexpectedUsers = new ArrayList<>();
-        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
-            int lineNo = 1;
-            String line = in.readLine();
-            LOG.debug("CMD-LINE#{}: {}", lineNo, line);
-            if (!"USER".equals(line.trim())) {
-                LOG.error("Expecting first line to contain USER, found \"{}\"", line);
-                return false;
-            }
-            while ((line = in.readLine()) != null) {
-                lineNo++;
-                LOG.debug("CMD-LINE#{}: {}", lineNo, line);
-                line = line.trim();
-                if (user.equals(line)) {
-                    return true;
-                }
-                unexpectedUsers.add(line);
-            }
-        } catch (IOException ex) {
-            String err = String.format("Cannot read output of command \"ps -o user -p %s\"", pidParams);
-            throw new IOException(err, ex);
-        }
-        if (unexpectedUsers.isEmpty()) {
-            LOG.info("None of the processes {} are alive", pidParams);
-        } else {
-            LOG.info("{} of {} Processes {} are running as user(s) {}: but expected user is {}",
-                    unexpectedUsers.size(), pids.size(), pidParams, String.join(",", new TreeSet<>(unexpectedUsers)), user);
-        }
-        return false;
-    }
-
-    /**
-     * Are any of the processes alive and running for the specified UID.
-     *
-     * @param pids the PIDs of the running processes
-     * @param uid the userId that is expected to own that process
-     * @return true if any one of the processes is owned by user and alive, else false
-     * @throws IOException on I/O exception
-     */
-    private static boolean isAnyPosixProcessAlive(Collection<Long> pids, int uid) throws IOException {
-        String pidParams = StringUtils.join(pids, ",");
-        LOG.debug("CMD: ps -o uid -p {}", pidParams);
-        ProcessBuilder pb = new ProcessBuilder("ps", "-o", "uid", "-p", pidParams);
-        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
-        List<String> unexpectedUsers = new ArrayList<>();
-        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
-            int lineNo = 1;
-            String line = in.readLine();
-            LOG.debug("CMD-LINE#{}: {}", lineNo, line);
-            if (!"UID".equals(line.trim())) {
-                LOG.error("Expecting first line to contain UID, found \"{}\"", line);
-                return false;
-            }
-            while ((line = in.readLine()) != null) {
-                lineNo++;
-                LOG.debug("CMD-LINE#{}: {}", lineNo, line);
-                line = line.trim();
-                try {
-                    if (uid == Integer.parseInt(line)) {
-                        return true;
-                    }
-                } catch (Exception ex) {
-                    LOG.warn("Expecting UID integer but got {} in output of ps command", line);
-                }
-                unexpectedUsers.add(line);
-            }
-        } catch (IOException ex) {
-            String err = String.format("Cannot read output of command \"ps -o uid -p %s\"", pidParams);
-            throw new IOException(err, ex);
-        }
-        if (unexpectedUsers.isEmpty()) {
-            LOG.info("None of the processes {} are alive", pidParams);
-        } else {
-            LOG.info("{} of {} Processes {} are running as UIDs {}: but expected userId is {}",
-                    unexpectedUsers.size(), pids.size(), pidParams, String.join(",", new TreeSet<>(unexpectedUsers)), uid);
-        }
-        return false;
-    }
-
-    /**
-     * Get the userId for a user name. This works on Posix systems by using "id -u" command.
-     * Throw IllegalArgumentException on Windows.
-     *
-     * @param user username to be converted to UID. This is optional, in which case current user is returned.
-     * @return UID for the specified user (if supplied), else UID of current user, -1 upon Exception.
-     */
-    public static int getUserId(String user) {
-        if (ServerUtils.IS_ON_WINDOWS) {
-            throw new IllegalArgumentException("Not supported in Windows platform");
-        }
-        List<String> cmdArgs = new ArrayList<>();
-        cmdArgs.add("id");
-        cmdArgs.add("-u");
-        if (user != null && !user.isEmpty()) {
-            cmdArgs.add(user);
-        }
-        LOG.debug("CMD: {}", String.join(" ", cmdArgs));
-        ProcessBuilder pb = new ProcessBuilder(cmdArgs);
-        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
-        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
-            String line = in.readLine();
-            LOG.debug("CMD-LINE#1: {}", line);
-            try {
-                return Integer.parseInt(line.trim());
-            } catch (NumberFormatException ex) {
-                LOG.error("Expecting UID integer but got {} in output of \"id -u {}\" command", line, user);
-                return -1;
-            }
-        } catch (IOException ex) {
-            LOG.error(String.format("Cannot read output of command \"%s\"", String.join(" ", cmdArgs)), ex);
-            return -1;
-        }
-    }
-
-    /**
-     * Get the userId of the onwer of the path by running "ls -dn path" command.
-     * This command works on Posix systems only.
-     *
-     * @param fpath full path to the file or directory.
-     * @return UID for the specified if successful, -1 upon failure.
-     */
-    public static int getPathOwnerUid(String fpath) {
-        if (ServerUtils.IS_ON_WINDOWS) {
-            throw new IllegalArgumentException("Not supported in Windows platform");
-        }
-        File f = new File(fpath);
-        if (!f.exists()) {
-            LOG.error("Cannot determine owner of non-existent file {}", fpath);
-            return -1;
-        }
-        LOG.debug("CMD: ls -dn {}", fpath);
-        ProcessBuilder pb = new ProcessBuilder("ls", "-dn", fpath);
-        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
-        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
-            String line = in.readLine();
-            LOG.debug("CMD-OUTLINE: {}", line);
-            line = line.trim();
-            String[] parts = line.split("\\s+");
-            if (parts.length < 3) {
-                LOG.error("Expecting at least 3 space separated fields in \"ls -dn {}\" output, got {}", fpath, line);
-                return -1;
-            }
-            try {
-                return Integer.parseInt(parts[2]);
-            } catch (NumberFormatException ex) {
-                LOG.error("Expecting at third field {} to be numeric UID \"ls -dn {}\" output, got {}", parts[2], fpath, line);
-                return -1;
-            }
-        } catch (IOException ex) {
-            LOG.error(String.format("Cannot read output of command \"ls -dn %s\"", fpath), ex);
-            return -1;
-        }
-    }
-
-    /**
-     * Get UID of the owner to the workerId Root directory.
-     *
-     * @return User ID (UID) of owner of the workerId root directory, -1 if directory is missing.
-     */
-    private int getWorkerPathOwnerUid(String workerId) {
-        return getPathOwnerUid(ConfigUtils.workerRoot(conf, workerId));
-    }
-
-    /**
-     * Find if all processes for the user on workId are dead.
-     * This method attempts to optimize the calls by:
-     * <p>
-     *     <li>creating a collection of ProcessIds and checking all of them at once</li>
-     *     <li>using userId one Posix systems instead of user</li>
-     * </p>
-     *
-     * @return true if all processes for the user are dead on the worker
-     * @throws IOException if external commands have exception.
-     */
     @Override
     public boolean areAllProcessesDead() throws IOException {
-        Set<Long> pids = getAllPids();
-        String user = getRunWorkerAsUser();
-
         boolean allDead = true;
-        try {
-            if (pids.isEmpty()) {
-                return true;
-            }
-            if (ServerUtils.IS_ON_WINDOWS) {
-                return allDead = !isAnyProcessAlive(pids, user);
-            }
-            // optimized for Posix - try to use uid
-            if (!cachedUserToUidMap.containsKey(user)) {
-                int uid = getWorkerPathOwnerUid(workerId);
-                if (uid < 0) {
-                    uid = getUserId(user);
-                }
-                if (uid >= 0) {
-                    cachedUserToUidMap.put(user, uid);
-                }
-            }
-            if (cachedUserToUidMap.containsKey(user)) {
-                return allDead = !isAnyProcessAlive(pids, cachedUserToUidMap.get(user));
-            } else {
-                return allDead = !isAnyProcessAlive(pids, user);
-            }
-        } finally {
-            if (allDead && shutdownTimer != null) {
-                shutdownTimer.stop();
-                shutdownTimer = null;
-            }
+        if (resourceIsolationManager != null) {
+            allDead = resourceIsolationManager.areAllProcessesDead(getWorkerUser(), workerId);
+        }
+
+        if (allDead && shutdownTimer != null) {
+            shutdownTimer.stop();
+            shutdownTimer = null;
         }
+
+        return allDead;
     }
 
     @Override
@@ -767,7 +344,7 @@ public abstract class Container implements Killable {
             File topoDir = new File(ConfigUtils.workerArtifactsRoot(conf, topologyId, port));
             if (ops.fileExists(workerDir)) {
                 LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to its port artifacts directory", workerId, topologyId);
-                ops.createSymlink(new File(workerDir, "artifacts"), topoDir);
+                ops.createSymlink(new File(ConfigUtils.workerArtifactsSymlink(conf, workerId)), topoDir);
             }
         }
     }
@@ -824,28 +401,8 @@ public abstract class Container implements Killable {
     }
 
     /**
-     * Get all PIDs.
-     * @return all of the pids that are a part of this container.
-     */
-    protected Set<Long> getAllPids() throws IOException {
-        Set<Long> ret = new HashSet<>();
-        for (String listing : ConfigUtils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId))) {
-            ret.add(Long.valueOf(listing));
-        }
-
-        if (resourceIsolationManager != null) {
-            Set<Long> morePids = resourceIsolationManager.getRunningPids(workerId);
-            assert (morePids != null);
-            ret.addAll(morePids);
-        }
-
-        return ret;
-    }
-
-    /**
-     * Get worker user.
+     * Get the user of the worker.
      * @return the user that some operations should be done as.
-     *
      * @throws IOException on any error
      */
     protected String getWorkerUser() throws IOException {
@@ -884,17 +441,6 @@ public abstract class Container implements Killable {
         }
     }
 
-    /**
-     * Returns the user that the worker process is running as.
-     *
-     * <p>The default behavior is to launch the worker as the user supervisor is running as (e.g. 'storm')
-     *
-     * @return the user that the worker process is running as.
-     */
-    protected String getRunWorkerAsUser() {
-        return System.getProperty("user.name");
-    }
-
     protected void saveWorkerUser(String user) throws IOException {
         type.assertFull();
         LOG.info("SET worker-user {} {}", workerId, user);
@@ -914,17 +460,12 @@ public abstract class Container implements Killable {
      */
     public void cleanUpForRestart() throws IOException {
         LOG.info("Cleaning up {}:{}", supervisorId, workerId);
-        Set<Long> pids = getAllPids();
         String user = getWorkerUser();
 
-        for (Long pid : pids) {
-            File path = new File(ConfigUtils.workerPidPath(conf, workerId, pid));
-            ops.deleteIfExists(path, user, workerId);
-        }
-
         //clean up for resource isolation if enabled
         if (resourceIsolationManager != null) {
             resourceIsolationManager.releaseResourcesForWorker(workerId);
+            resourceIsolationManager.cleanup(user, workerId, port);
         }
 
         //Always make sure to clean up everything else before worker directory
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
index d5ae161..1160028 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
  * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
@@ -14,8 +14,8 @@ package org.apache.storm.daemon.supervisor;
 
 import java.io.IOException;
 import java.util.Map;
-import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.container.DefaultResourceIsolationManager;
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.messaging.IContext;
@@ -59,18 +59,19 @@ public abstract class ContainerLauncher {
                 localSupervisor);
         }
 
-        ResourceIsolationInterface resourceIsolationManager = null;
+        ResourceIsolationInterface resourceIsolationManager;
         if (ObjectReader.getBoolean(conf.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
             resourceIsolationManager = ReflectionUtils.newInstance((String) conf.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN));
-            resourceIsolationManager.prepare(conf);
-            LOG.info("Using resource isolation plugin {} {}", conf.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN),
-                     resourceIsolationManager);
+            LOG.info("Using resource isolation plugin {}: {}", conf.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN),
+                resourceIsolationManager);
+        } else {
+            resourceIsolationManager = new DefaultResourceIsolationManager();
+            LOG.info("{} is false. Using default resource isolation plugin: {}", DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE,
+                resourceIsolationManager);
         }
 
-        if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
-            return new RunAsUserContainerLauncher(conf, supervisorId, supervisorPort, resourceIsolationManager, metricsRegistry, 
-                containerMemoryTracker);
-        }
+        resourceIsolationManager.prepare(conf);
+
         return new BasicContainerLauncher(conf, supervisorId, supervisorPort, resourceIsolationManager, metricsRegistry, 
             containerMemoryTracker);
     }
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
deleted file mode 100644
index b2681ea..0000000
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.daemon.supervisor;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.container.ResourceIsolationInterface;
-import org.apache.storm.generated.LocalAssignment;
-import org.apache.storm.metric.StormMetricsRegistry;
-import org.apache.storm.utils.LocalState;
-import org.apache.storm.utils.ServerUtils;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RunAsUserContainer extends BasicContainer {
-    private static final Logger LOG = LoggerFactory.getLogger(RunAsUserContainer.class);
-
-    public RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId,
-                              int supervisorPort, int port, LocalAssignment assignment,
-                              ResourceIsolationInterface resourceIsolationManager, LocalState localState,
-                              String workerId, StormMetricsRegistry metricsRegistry, 
-                              ContainerMemoryTracker containerMemoryTracker) throws IOException {
-        this(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState, workerId, metricsRegistry,
-            containerMemoryTracker, null, null, null);
-    }
-
-    RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort,
-                       int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
-                       LocalState localState, String workerId, StormMetricsRegistry metricsRegistry,
-                       ContainerMemoryTracker containerMemoryTracker, Map<String, Object> topoConf,
-                       AdvancedFSOps ops, String profileCmd) throws IOException {
-        super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState,
-              workerId, metricsRegistry, containerMemoryTracker, topoConf, ops, profileCmd);
-        if (Utils.isOnWindows()) {
-            throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
-        }
-    }
-
-    private void signal(long pid, int signal) throws IOException {
-        List<String> commands = Arrays.asList("signal", String.valueOf(pid), String.valueOf(signal));
-        String user = getWorkerUser();
-        String logPrefix = "kill -" + signal + " " + pid;
-        ClientSupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
-    }
-
-    @Override
-    protected void kill(long pid) throws IOException {
-        signal(pid, 15);
-    }
-
-    @Override
-    protected void forceKill(long pid) throws IOException {
-        signal(pid, 9);
-    }
-
-    @Override
-    protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, File targetDir) throws
-        IOException, InterruptedException {
-        String user = this.getWorkerUser();
-        String td = targetDir.getAbsolutePath();
-        LOG.info("Running as user: {} command: {}", user, command);
-        String containerFile = ServerUtils.containerFilePath(td);
-        if (Utils.checkFileExists(containerFile)) {
-            SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
-        }
-        String scriptFile = ServerUtils.scriptFilePath(td);
-        if (Utils.checkFileExists(scriptFile)) {
-            SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
-        }
-        String script = ServerUtils.writeScript(td, command, env);
-        List<String> args = Arrays.asList("profiler", td, script);
-        int ret = ClientSupervisorUtils.processLauncherAndWait(conf, user, args, env, logPrefix);
-        return ret == 0;
-    }
-
-    @Override
-    protected void launchWorkerProcess(List<String> command, Map<String, String> env,
-                                       String logPrefix, ExitCodeCallback processExitCallback, File targetDir) throws IOException {
-        String workerDir = targetDir.getAbsolutePath();
-        String user = this.getWorkerUser();
-        List<String> args = Arrays.asList("worker", workerDir, ServerUtils.writeScript(workerDir, command, env));
-        List<String> commandPrefix = null;
-        if (resourceIsolationManager != null) {
-            commandPrefix = resourceIsolationManager.getLaunchCommandPrefix(workerId);
-        }
-        ClientSupervisorUtils.processLauncher(conf, user, commandPrefix, args, null, logPrefix, processExitCallback, targetDir);
-    }
-
-    /**
-     * If 'supervisor.run.worker.as.user' is set, worker will be launched as the user that launched the topology.
-     */
-    @Override
-    protected String getRunWorkerAsUser() {
-        try {
-            return getWorkerUser();
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        }
-    }
-}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
deleted file mode 100644
index 7706662..0000000
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.daemon.supervisor;
-
-import java.io.IOException;
-import java.util.Map;
-import org.apache.storm.container.ResourceIsolationInterface;
-import org.apache.storm.daemon.supervisor.Container.ContainerType;
-import org.apache.storm.generated.LocalAssignment;
-import org.apache.storm.metric.StormMetricsRegistry;
-import org.apache.storm.utils.LocalState;
-
-public class RunAsUserContainerLauncher extends ContainerLauncher {
-    protected final ResourceIsolationInterface resourceIsolationManager;
-    private final Map<String, Object> conf;
-    private final String supervisorId;
-    private final int supervisorPort;
-    private final StormMetricsRegistry metricsRegistry;
-    private final ContainerMemoryTracker containerMemoryTracker;
-
-    public RunAsUserContainerLauncher(Map<String, Object> conf, String supervisorId, int supervisorPort,
-                                      ResourceIsolationInterface resourceIsolationManager, StormMetricsRegistry metricsRegistry, 
-                                      ContainerMemoryTracker containerMemoryTracker) throws IOException {
-        this.conf = conf;
-        this.supervisorId = supervisorId;
-        this.supervisorPort = supervisorPort;
-        this.resourceIsolationManager = resourceIsolationManager;
-        this.metricsRegistry = metricsRegistry;
-        this.containerMemoryTracker = containerMemoryTracker;
-    }
-
-    @Override
-    public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
-        Container container = new RunAsUserContainer(ContainerType.LAUNCH, conf, supervisorId, supervisorPort, port,
-            assignment, resourceIsolationManager, state, null, metricsRegistry, containerMemoryTracker, null, null, null);
-        container.setup();
-        container.launch();
-        return container;
-    }
-
-    @Override
-    public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
-        return new RunAsUserContainer(ContainerType.RECOVER_FULL, conf, supervisorId, supervisorPort, port,
-            assignment, resourceIsolationManager, state, null, metricsRegistry, containerMemoryTracker,
-            null, null, null);
-    }
-
-    @Override
-    public Killable recoverContainer(String workerId, LocalState localState) throws IOException {
-        return new RunAsUserContainer(ContainerType.RECOVER_PARTIAL, conf, supervisorId, supervisorPort, -1, null,
-                resourceIsolationManager, localState, workerId, metricsRegistry, containerMemoryTracker,
-            null, null, null);
-    }
-
-}
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
index babed56..773b464 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
@@ -57,6 +57,7 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
     private final String owner;
     private volatile long version = NOT_DOWNLOADED_VERSION;
     private volatile long size = 0;
+    private final Map<String, Object> conf;
 
     /**
      * Create a new LocallyCachedBlob.
@@ -73,6 +74,7 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
         this.isLocalMode = isLocalMode;
         this.fsOps = fsOps;
         this.owner = owner;
+        this.conf = conf;
         topologyBasicBlobsRootDir = Paths.get(ConfigUtils.supervisorStormDistRoot(conf, topologyId));
         readVersion();
         updateSizeOnDisk();
@@ -239,7 +241,7 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
             // any races between multiple versions running at the same time.  Ideally this would be on a per topology
             // basis, but that is a lot harder and the changes run fairly quickly so it should not be a big deal.
             fsOps.setupStormCodeDir(owner, topologyBasicBlobsRootDir.toFile());
-            File sharedMemoryDirFinalLocation = new File(topologyBasicBlobsRootDir.toFile(), "shared_by_topology");
+            File sharedMemoryDirFinalLocation = new File(ConfigUtils.sharedByTopologyDir(conf, topologyId));
             sharedMemoryDirFinalLocation.mkdirs();
             fsOps.setupWorkerArtifactsDir(owner, sharedMemoryDirFinalLocation);
         }
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index b099bc1..fba6c57 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -20,13 +20,16 @@ package org.apache.storm.utils;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.net.URL;
@@ -37,6 +40,7 @@ import java.nio.file.Paths;
 import java.nio.file.attribute.PosixFilePermission;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -44,7 +48,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.jar.JarFile;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
@@ -773,4 +781,445 @@ public class ServerUtils {
             Utils.sleep(attemptsIntervalTime);
         }
     }
+
+    private static final Pattern MEMINFO_PATTERN = Pattern.compile("^([^:\\s]+):\\s*([0-9]+)\\s*kB$");
+
+    /**
+     * Get system free memory in megabytes, computed as MemFree + Buffers + Cached read from /proc/meminfo.
+     * @return (MemFree + Buffers + Cached) / 1024; a value that is missing from the file counts as 0
+     * @throws IOException if /proc/meminfo cannot be opened or read
+     */
+    public static long getMemInfoFreeMb() throws IOException {
+        // Relevant /proc/meminfo lines look like "MemFree:        14367072 kB",
+        // "Buffers:          536512 kB" and "Cached:          1192096 kB".
+        // Lines that do not match MEMINFO_PATTERN are skipped.
+        // The result is MemFree + Buffers + Cached.
+        long memFree = 0;
+        long buffers = 0;
+        long cached = 0;
+        try (BufferedReader in = new BufferedReader(new FileReader("/proc/meminfo"))) {
+            String line = null;
+            while ((line = in.readLine()) != null) {
+                Matcher match = MEMINFO_PATTERN.matcher(line);
+                if (match.matches()) {
+                    String tag = match.group(1); // text before the colon
+                    if (tag.equalsIgnoreCase("MemFree")) {
+                        memFree = Long.parseLong(match.group(2));
+                    } else if (tag.equalsIgnoreCase("Buffers")) {
+                        buffers = Long.parseLong(match.group(2));
+                    } else if (tag.equalsIgnoreCase("Cached")) {
+                        cached = Long.parseLong(match.group(2));
+                    }
+                }
+            }
+        }
+        return (memFree + buffers + cached) / 1024; // kB -> MB (integer division)
+    }
+
+    /**
+     * Check whether a process is alive and owned by the expected user.
+     *
+     * @param pid the PID of the process to look up
+     * @param user the user that is expected to own that process
+     * @return true if the process is found and reported as owned by user, else false
+     *
+     * @throws IOException if the platform command (tasklist or ps) cannot be started or its output cannot be read
+     */
+    public static boolean isProcessAlive(long pid, String user) throws IOException {
+        if (ServerUtils.IS_ON_WINDOWS) {
+            return isWindowsProcessAlive(pid, user);
+        }
+        return isPosixProcessAlive(pid, user);
+    }
+
+    private static boolean isWindowsProcessAlive(long pid, String user) throws IOException {
+        boolean ret = false; // becomes true only if tasklist reports pid as owned by user
+        LOG.debug("CMD: tasklist /fo list /fi \"pid eq {}\" /v", pid);
+        ProcessBuilder pb = new ProcessBuilder("tasklist", "/fo", "list", "/fi", "pid eq " + pid, "/v");
+        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
+            int lineNo = 0;
+            String line;
+            while ((line = in.readLine()) != null) {
+                lineNo++;
+                LOG.debug("CMD=LINE#{}: {}", lineNo, line);
+                if (line.contains("User Name:")) { //Check for : in case someone called their user "User Name"
+                    //This line contains the user name for the pid we're looking up; it is split on ':' below (not on whitespace)
+                    //Example line: "User Name:    exampleDomain\exampleUser"
+                    List<String> userNameLineSplitOnWhitespace = Arrays.asList(line.split(":"));
+                    if (userNameLineSplitOnWhitespace.size() == 2) {
+                        List<String> userAndMaybeDomain = Arrays.asList(userNameLineSplitOnWhitespace.get(1).trim().split("\\\\"));
+                        String processUser = userAndMaybeDomain.size() == 2 ? userAndMaybeDomain.get(1) : userAndMaybeDomain.get(0);
+                        processUser = processUser.trim();
+                        if (user.equals(processUser)) {
+                            ret = true;
+                        } else {
+                            LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
+                        }
+                    } else {
+                        LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. Line was {}",
+                            line);
+                    }
+                    break; // only the first "User Name:" line is examined
+                }
+            }
+        }
+        return ret;
+    }
+
+    private static boolean isPosixProcessAlive(long pid, String user) throws IOException {
+        LOG.debug("CMD: ps -o user -p {}", pid);
+        ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", String.valueOf(pid));
+        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
+            int lineNo = 1;
+            String line = in.readLine(); // expected header row; NOTE(review): null if ps printed nothing, then trim() below NPEs
+            LOG.debug("CMD-LINE#{}: {}", lineNo, line);
+            if (!"USER".equals(line.trim())) {
+                LOG.error("Expecting first line to contain USER, found \"{}\"", line);
+                return false;
+            }
+            while ((line = in.readLine()) != null) { // every remaining row is compared against user
+                lineNo++;
+                LOG.debug("CMD-LINE#{}: {}", lineNo, line);
+                line = line.trim();
+                if (user.equals(line)) {
+                    return true;
+                }
+                LOG.info("Found {} running as {}, but expected it to be {}", pid, line, user);
+            }
+        } catch (IOException ex) {
+            String err = String.format("Cannot read output of command \"ps -o user -p %d\"", pid);
+            throw new IOException(err, ex); // rethrown with the failing command in the message
+        }
+        return false; // no row after the header matched user
+    }
+
+    /**
+     * Check whether any of the given processes is alive and running as the specified user. If the collection is
+     * empty or null then the return value is trivially false.
+     *
+     * @param pids the PIDs of the processes to look up
+     * @param user the user that is expected to own the processes
+     * @return true if any one of the processes is owned by user and alive, else false
+     * @throws IOException on I/O exception
+     */
+    public static boolean isAnyProcessAlive(Collection<Long> pids, String user) throws IOException {
+        if (pids == null || pids.isEmpty()) {
+            return false;
+        }
+
+        if (ServerUtils.IS_ON_WINDOWS) {
+            return isAnyWindowsProcessAlive(pids, user);
+        }
+        return isAnyPosixProcessAlive(pids, user);
+    }
+
+    /**
+     * Check whether any of the given processes is alive and owned by the specified UID. If the collection is empty
+     * or null then the return value is trivially false. On Windows a non-empty collection results in IllegalArgumentException.
+     *
+     * @param pids the PIDs of the processes to look up
+     * @param uid the numeric user ID that is expected to own the processes
+     * @return true if any one of the processes is alive and owned by uid, else false
+     * @throws IOException on I/O exception
+     */
+    public static boolean isAnyProcessAlive(Collection<Long> pids, int uid) throws IOException {
+        if (pids == null || pids.isEmpty()) {
+            return false;
+        }
+        if (ServerUtils.IS_ON_WINDOWS) {
+            return isAnyWindowsProcessAlive(pids, uid);
+        }
+        return isAnyPosixProcessAlive(pids, uid);
+    }
+
+    /**
+     * Find if any of the Windows processes are alive and owned by the specified user, using one tasklist invocation.
+     * Command reference https://docs.microsoft.com/en-us/windows-server/administration/windows-commands/tasklist.
+     *
+     * @param pids the PIDs of the processes to look up
+     * @param user the user that is expected to own the processes
+     * @return true as soon as one "User Name:" line of the tasklist output names user, else false
+     * @throws IOException on I/O exception
+     */
+    private static boolean isAnyWindowsProcessAlive(Collection<Long> pids, String user) throws IOException {
+        List<String> cmdArgs = new ArrayList<>();
+        cmdArgs.add("tasklist");
+        cmdArgs.add("/fo");
+        cmdArgs.add("list");
+        pids.forEach(pid -> { // one "/fi pid eq N" filter per PID; NOTE(review): confirm tasklist ORs repeated filters
+            cmdArgs.add("/fi");
+            cmdArgs.add("pid eq " + pid);
+        });
+        cmdArgs.add("/v");
+        LOG.debug("CMD: {}", String.join(" ", cmdArgs));
+        ProcessBuilder pb = new ProcessBuilder(cmdArgs);
+        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
+        List<String> unexpectedUsers = new ArrayList<>(); // owners seen that are not user; used only for logging
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
+            int lineNo = 0;
+            String line;
+            while ((line = in.readLine()) != null) {
+                lineNo++;
+                LOG.debug("CMD-LINE#{}: {}", lineNo, line);
+                if (line.contains("User Name:")) { //Check for : in case someone called their user "User Name"
+                    //This line contains the user name for one of the pids we're looking up; it is split on ':' below (not on whitespace)
+                    //Example line: "User Name:    exampleDomain\exampleUser"
+                    List<String> userNameLineSplitOnWhitespace = Arrays.asList(line.split(":"));
+                    if (userNameLineSplitOnWhitespace.size() == 2) {
+                        List<String> userAndMaybeDomain = Arrays.asList(userNameLineSplitOnWhitespace.get(1).trim().split("\\\\"));
+                        String processUser = userAndMaybeDomain.size() == 2 ? userAndMaybeDomain.get(1) : userAndMaybeDomain.get(0);
+                        processUser = processUser.trim();
+                        if (user.equals(processUser)) {
+                            return true;
+                        }
+                        unexpectedUsers.add(processUser);
+                    } else {
+                        LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. Line was {}",
+                            line);
+                    }
+                }
+            }
+        } catch (IOException ex) {
+            String err = String.format("Cannot read output of command \"%s\"", String.join(" ", cmdArgs));
+            throw new IOException(err, ex);
+        }
+        String pidsAsStr = StringUtils.join(pids, ",");
+        if (unexpectedUsers.isEmpty()) {
+            LOG.info("None of the processes {} are alive", pidsAsStr);
+        } else { // log the distinct unexpected owners in sorted order
+            LOG.info("{} of the Processes {} are running as user(s) {}: but expected user is {}",
+                unexpectedUsers.size(), pidsAsStr, String.join(",", new TreeSet<>(unexpectedUsers)), user);
+        }
+        return false;
+    }
+
+    /**
+     * Find if any of the Windows processes are alive and owned by the specified userId.
+     * This overload is provided for symmetry, but is not implemented: it always throws IllegalArgumentException.
+     *
+     * @param pids the PIDs of the running processes (unused)
+     * @param uid the user ID that is expected to own the processes (unused)
+     * @return never returns normally
+     * @throws IOException never thrown by the current body
+     */
+    private static boolean isAnyWindowsProcessAlive(Collection<Long> pids, int uid) throws IOException {
+        throw new IllegalArgumentException("UID is not supported on Windows");
+    }
+
+    /**
+     * Check, with a single ps invocation, whether any of the given processes is alive and running as the specified user.
+     *
+     * @param pids the PIDs of the processes to look up
+     * @param user the user that is expected to own the processes
+     * @return true if any one of the processes is owned by user and alive, else false
+     * @throws IOException on I/O exception
+     */
+    private static boolean isAnyPosixProcessAlive(Collection<Long> pids, String user) throws IOException {
+        String pidParams = StringUtils.join(pids, ",");
+        LOG.debug("CMD: ps -o user -p {}", pidParams);
+        ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", pidParams);
+        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
+        List<String> unexpectedUsers = new ArrayList<>(); // owners seen that are not user; used only for logging
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
+            int lineNo = 1;
+            String line = in.readLine(); // expected header row; NOTE(review): null if ps printed nothing, then trim() below NPEs
+            LOG.debug("CMD-LINE#{}: {}", lineNo, line);
+            if (!"USER".equals(line.trim())) {
+                LOG.error("Expecting first line to contain USER, found \"{}\"", line);
+                return false;
+            }
+            while ((line = in.readLine()) != null) { // every remaining row is compared against user
+                lineNo++;
+                LOG.debug("CMD-LINE#{}: {}", lineNo, line);
+                line = line.trim();
+                if (user.equals(line)) {
+                    return true;
+                }
+                unexpectedUsers.add(line);
+            }
+        } catch (IOException ex) {
+            String err = String.format("Cannot read output of command \"ps -o user -p %s\"", pidParams);
+            throw new IOException(err, ex);
+        }
+        if (unexpectedUsers.isEmpty()) {
+            LOG.info("None of the processes {} are alive", pidParams);
+        } else { // log the distinct unexpected owners in sorted order
+            LOG.info("{} of {} Processes {} are running as user(s) {}: but expected user is {}",
+                unexpectedUsers.size(), pids.size(), pidParams, String.join(",", new TreeSet<>(unexpectedUsers)), user);
+        }
+        return false;
+    }
+
+    /**
+     * Check via {@code ps -o uid -p} whether at least one of the given processes is alive and owned by the UID.
+     *
+     * @param pids the PIDs of the processes to look up
+     * @param uid the numeric user id that is expected to own the processes
+     * @return true if ps lists any of the processes with this UID; false otherwise, including when the ps header is unusable
+     * @throws IOException if the ps command cannot be started or its output cannot be read
+     */
+    private static boolean isAnyPosixProcessAlive(Collection<Long> pids, int uid) throws IOException {
+        String pidParams = StringUtils.join(pids, ",");
+        LOG.debug("CMD: ps -o uid -p {}", pidParams);
+        ProcessBuilder pb = new ProcessBuilder("ps", "-o", "uid", "-p", pidParams);
+        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
+        List<String> unexpectedUsers = new ArrayList<>();
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
+            int lineNo = 1;
+            String line = in.readLine();
+            LOG.debug("CMD-LINE#{}: {}", lineNo, line);
+            if (line == null || !"UID".equals(line.trim())) { // null: ps wrote nothing to stdout
+                LOG.error("Expecting first line to contain UID, found \"{}\"", line);
+                return false;
+            }
+            while ((line = in.readLine()) != null) {
+                lineNo++;
+                LOG.debug("CMD-LINE#{}: {}", lineNo, line);
+                line = line.trim();
+                try {
+                    if (uid == Integer.parseInt(line)) {
+                        return true;
+                    }
+                } catch (NumberFormatException ex) {
+                    LOG.warn("Expecting UID integer but got {} in output of ps command", line);
+                }
+                unexpectedUsers.add(line); // listed by ps, but not with the expected UID
+            }
+        } catch (IOException ex) {
+            String err = String.format("Cannot read output of command \"ps -o uid -p %s\"", pidParams);
+            throw new IOException(err, ex);
+        }
+        if (unexpectedUsers.isEmpty()) {
+            LOG.info("None of the processes {} are alive", pidParams);
+        } else {
+            LOG.info("{} of {} Processes {} are running as UIDs {}: but expected userId is {}",
+                unexpectedUsers.size(), pids.size(), pidParams, String.join(",", new TreeSet<>(unexpectedUsers)), uid);
+        }
+        return false;
+    }
+
+    /**
+     * Get the numeric user id for a user name by running the Posix {@code id -u} command.
+     *
+     * @param user name to convert to a UID; when null or empty the UID of the current user is returned.
+     * @return UID of the specified user (or of the current user), -1 if the command fails or prints no numeric UID.
+     * @throws IllegalArgumentException on Windows, where this lookup is not supported.
+     */
+    public static int getUserId(String user) {
+        if (ServerUtils.IS_ON_WINDOWS) {
+            throw new IllegalArgumentException("Not supported in Windows platform");
+        }
+        List<String> cmdArgs = new ArrayList<>();
+        cmdArgs.add("id");
+        cmdArgs.add("-u");
+        if (user != null && !user.isEmpty()) {
+            cmdArgs.add(user);
+        }
+        LOG.debug("CMD: {}", String.join(" ", cmdArgs));
+        ProcessBuilder pb = new ProcessBuilder(cmdArgs);
+        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
+            String line = in.readLine();
+            LOG.debug("CMD-LINE#1: {}", line);
+            try {
+                return Integer.parseInt(line == null ? "" : line.trim()); // null: "id" printed nothing, e.g. unknown user
+            } catch (NumberFormatException ex) {
+                LOG.error("Expecting UID integer but got {} in output of \"id -u {}\" command", line, user);
+                return -1;
+            }
+        } catch (IOException ex) {
+            LOG.error(String.format("Cannot read output of command \"%s\"", String.join(" ", cmdArgs)), ex);
+            return -1;
+        }
+    }
+
+    /**
+     * Get the numeric user id of the owner of a path by parsing the third field of {@code ls -dn path}.
+     * This command works on Posix systems only; on Windows an IllegalArgumentException is thrown.
+     *
+     * @param fpath full path to the file or directory.
+     * @return UID of the owner of the path, -1 if the path does not exist or the ls output cannot be used.
+     */
+    public static int getPathOwnerUid(String fpath) {
+        if (ServerUtils.IS_ON_WINDOWS) {
+            throw new IllegalArgumentException("Not supported in Windows platform");
+        }
+        File f = new File(fpath);
+        if (!f.exists()) {
+            LOG.error("Cannot determine owner of non-existent file {}", fpath);
+            return -1;
+        }
+        LOG.debug("CMD: ls -dn {}", fpath);
+        ProcessBuilder pb = new ProcessBuilder("ls", "-dn", fpath);
+        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
+            String line = in.readLine();
+            LOG.debug("CMD-OUTLINE: {}", line);
+            line = (line == null) ? "" : line.trim(); // null: ls wrote nothing to stdout; "" fails the field check below
+            String[] parts = line.split("\\s+");
+            if (parts.length < 3) {
+                LOG.error("Expecting at least 3 space separated fields in \"ls -dn {}\" output, got {}", fpath, line);
+                return -1;
+            }
+            try {
+                return Integer.parseInt(parts[2]);
+            } catch (NumberFormatException ex) {
+                LOG.error("Expecting at third field {} to be numeric UID \"ls -dn {}\" output, got {}", parts[2], fpath, line);
+                return -1;
+            }
+        } catch (IOException ex) {
+            LOG.error(String.format("Cannot read output of command \"ls -dn %s\"", fpath), ex);
+            return -1;
+        }
+    }
+
+    /**
+     * Get the UID of the owner of the worker's root directory, as located by {@code ConfigUtils.workerRoot}.
+     *
+     * @return UID of the owner of the worker root directory, -1 if it is missing or its owner cannot be determined.
+     */
+    private static int getWorkerPathOwnerUid(Map<String, Object> conf, String workerId) {
+        return getPathOwnerUid(ConfigUtils.workerRoot(conf, workerId));
+    }
+
+    private static final Map<String, Integer> cachedUserToUidMap = new ConcurrentHashMap<>();
+
+    /**
+     * Find if all of the given processes of the user on the worker are dead. To keep the check cheap, all
+     * pids are checked at once and, on Posix systems, the (cached) numeric UID is used instead of the user name.
+     *
+     * @param conf configuration used to locate the worker root directory
+     * @param user name of the user expected to own the processes
+     * @param workerId id of the worker; the owner of its root directory is the preferred source of the UID
+     * @param pids process ids to check; null or empty is treated as "all dead"
+     * @return true if all processes for the user are dead on the worker
+     * @throws IOException if external commands have exception.
+     */
+    public static boolean areAllProcessesDead(Map<String, Object> conf, String user, String workerId, Set<Long> pids) throws IOException {
+        if (pids == null || pids.isEmpty()) {
+            return true;
+        }
+
+        // no numeric UID lookup on Windows (getUserId is unsupported there), so match on the user name
+        if (ServerUtils.IS_ON_WINDOWS) {
+            return !isAnyProcessAlive(pids, user);
+        }
+        // optimized for Posix - try to use uid
+        Integer cachedUid = cachedUserToUidMap.get(user);
+        if (cachedUid == null) {
+            int uid = ServerUtils.getWorkerPathOwnerUid(conf, workerId);
+            if (uid < 0) {
+                uid = ServerUtils.getUserId(user);
+            }
+            if (uid < 0) {
+                // UID cannot be resolved (and is not cached); fall back to matching on the user name
+                return !ServerUtils.isAnyProcessAlive(pids, user);
+            }
+            cachedUserToUidMap.put(user, uid);
+            cachedUid = uid;
+        }
+        return !ServerUtils.isAnyProcessAlive(pids, cachedUid.intValue());
+    }
 }
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
index cb95229..51cd6a7 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -24,6 +24,7 @@ import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.daemon.supervisor.Container.ContainerType;
+import org.apache.storm.daemon.supervisor.ContainerTest.MockResourceIsolationManager;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileAction;
 import org.apache.storm.generated.ProfileRequest;
@@ -102,8 +103,10 @@ public class BasicContainerTest {
 
         LocalState ls = mock(LocalState.class);
 
+        ResourceIsolationInterface iso = mock(ResourceIsolationInterface.class);
+
         MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-            "SUPERVISOR", supervisorPort, port, la, null, ls, null, new StormMetricsRegistry(),
+            "SUPERVISOR", supervisorPort, port, la, iso, ls, null, new StormMetricsRegistry(),
             new HashMap<>(), ops, "profile");
         //null worker id means generate one...
 
@@ -133,8 +136,10 @@ public class BasicContainerTest {
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
 
+        ResourceIsolationInterface iso = mock(ResourceIsolationInterface.class);
+
         MockBasicContainer mc = new MockBasicContainer(ContainerType.RECOVER_FULL, superConf,
-            "SUPERVISOR", supervisorPort, port, la, null, ls, null, new StormMetricsRegistry(),
+            "SUPERVISOR", supervisorPort, port, la, iso, ls, null, new StormMetricsRegistry(),
             new HashMap<>(), ops, "profile");
 
         assertEquals(workerId, mc.workerId);
@@ -183,8 +188,10 @@ public class BasicContainerTest {
         LocalState ls = mock(LocalState.class);
         when(ls.getApprovedWorkers()).thenReturn(new HashMap<>(workerState));
 
+        ResourceIsolationInterface iso = mock(ResourceIsolationInterface.class);
+
         MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-            "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(), new HashMap<>(), ops,
+            "SUPERVISOR", supervisorPort, port, la, iso, ls, workerId, new StormMetricsRegistry(), new HashMap<>(), ops,
                                                        "profile");
 
         mc.cleanUp();
@@ -219,8 +226,10 @@ public class BasicContainerTest {
 
         LocalState ls = mock(LocalState.class);
 
+        MockResourceIsolationManager iso = new MockResourceIsolationManager();
+
         MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-            "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+            "SUPERVISOR", supervisorPort, port, la, iso, ls, workerId, new StormMetricsRegistry(),
             new HashMap<>(), ops, "profile");
 
         //HEAP DUMP
@@ -229,9 +238,9 @@ public class BasicContainerTest {
 
         mc.runProfiling(req, false);
 
-        assertEquals(1, mc.profileCmds.size());
-        CommandRun cmd = mc.profileCmds.get(0);
-        mc.profileCmds.clear();
+        assertEquals(1, iso.profileCmds.size());
+        CommandRun cmd = iso.profileCmds.get(0);
+        iso.profileCmds.clear();
         assertEquals(Arrays.asList("profile", String.valueOf(pid), "jmap", topoRoot), cmd.cmd);
         assertEquals(new File(topoRoot), cmd.pwd);
 
@@ -240,9 +249,9 @@ public class BasicContainerTest {
 
         mc.runProfiling(req, false);
 
-        assertEquals(1, mc.profileCmds.size());
-        cmd = mc.profileCmds.get(0);
-        mc.profileCmds.clear();
+        assertEquals(1, iso.profileCmds.size());
+        cmd = iso.profileCmds.get(0);
+        iso.profileCmds.clear();
         assertEquals(Arrays.asList("profile", String.valueOf(pid), "jstack", topoRoot), cmd.cmd);
         assertEquals(new File(topoRoot), cmd.pwd);
 
@@ -251,9 +260,9 @@ public class BasicContainerTest {
 
         mc.runProfiling(req, false);
 
-        assertEquals(1, mc.profileCmds.size());
-        cmd = mc.profileCmds.get(0);
-        mc.profileCmds.clear();
+        assertEquals(1, iso.profileCmds.size());
+        cmd = iso.profileCmds.get(0);
+        iso.profileCmds.clear();
         assertEquals(Arrays.asList("profile", String.valueOf(pid), "kill"), cmd.cmd);
         assertEquals(new File(topoRoot), cmd.pwd);
 
@@ -262,9 +271,9 @@ public class BasicContainerTest {
 
         mc.runProfiling(req, false);
 
-        assertEquals(1, mc.profileCmds.size());
-        cmd = mc.profileCmds.get(0);
-        mc.profileCmds.clear();
+        assertEquals(1, iso.profileCmds.size());
+        cmd = iso.profileCmds.get(0);
+        iso.profileCmds.clear();
         assertEquals(Arrays.asList("profile", String.valueOf(pid), "dump", topoRoot), cmd.cmd);
         assertEquals(new File(topoRoot), cmd.pwd);
 
@@ -273,9 +282,9 @@ public class BasicContainerTest {
 
         mc.runProfiling(req, false);
 
-        assertEquals(1, mc.profileCmds.size());
-        cmd = mc.profileCmds.get(0);
-        mc.profileCmds.clear();
+        assertEquals(1, iso.profileCmds.size());
+        cmd = iso.profileCmds.get(0);
+        iso.profileCmds.clear();
         assertEquals(Arrays.asList("profile", String.valueOf(pid), "start"), cmd.cmd);
         assertEquals(new File(topoRoot), cmd.pwd);
 
@@ -284,9 +293,9 @@ public class BasicContainerTest {
 
         mc.runProfiling(req, true);
 
-        assertEquals(1, mc.profileCmds.size());
-        cmd = mc.profileCmds.get(0);
-        mc.profileCmds.clear();
+        assertEquals(1, iso.profileCmds.size());
+        cmd = iso.profileCmds.get(0);
+        iso.profileCmds.clear();
         assertEquals(Arrays.asList("profile", String.valueOf(pid), "stop", topoRoot), cmd.cmd);
         assertEquals(new File(topoRoot), cmd.pwd);
     }
@@ -329,16 +338,18 @@ public class BasicContainerTest {
 
         LocalState ls = mock(LocalState.class);
 
+        MockResourceIsolationManager iso = new MockResourceIsolationManager();
+
         checkpoint(() -> {
                        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-                "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+                "SUPERVISOR", supervisorPort, port, la, iso, ls, workerId, new StormMetricsRegistry(),
                 new HashMap<>(), ops, "profile");
 
                        mc.launch();
 
-                       assertEquals(1, mc.workerCmds.size());
-                       CommandRun cmd = mc.workerCmds.get(0);
-                       mc.workerCmds.clear();
+                       assertEquals(1, iso.workerCmds.size());
+                       CommandRun cmd = iso.workerCmds.get(0);
+                       iso.workerCmds.clear();
                        assertListEquals(Arrays.asList(
                            "java",
                            "-cp",
@@ -431,16 +442,18 @@ public class BasicContainerTest {
 
         LocalState ls = mock(LocalState.class);
 
+        MockResourceIsolationManager iso = new MockResourceIsolationManager();
+
         checkpoint(() -> {
                        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-                "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+                "SUPERVISOR", supervisorPort, port, la, iso, ls, workerId, new StormMetricsRegistry(),
                 new HashMap<>(), ops, "profile");
 
                        mc.launch();
 
-                       assertEquals(1, mc.workerCmds.size());
-                       CommandRun cmd = mc.workerCmds.get(0);
-                       mc.workerCmds.clear();
+                       assertEquals(1, iso.workerCmds.size());
+                       CommandRun cmd = iso.workerCmds.get(0);
+                       iso.workerCmds.clear();
                        assertListEquals(Arrays.asList(
                            "java",
                            "-cp",
@@ -532,16 +545,18 @@ public class BasicContainerTest {
 
         LocalState ls = mock(LocalState.class);
 
+        MockResourceIsolationManager iso = new MockResourceIsolationManager();
+
         checkpoint(() -> {
                        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-                "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+                "SUPERVISOR", supervisorPort, port, la, iso, ls, workerId, new StormMetricsRegistry(),
                 new HashMap<>(), ops, "profile");
 
                        mc.launch();
 
-                       assertEquals(1, mc.workerCmds.size());
-                       CommandRun cmd = mc.workerCmds.get(0);
-                       mc.workerCmds.clear();
+                       assertEquals(1, iso.workerCmds.size());
+                       CommandRun cmd = iso.workerCmds.get(0);
+                       iso.workerCmds.clear();
                        assertListEquals(Arrays.asList(
                            "java",
                            "-cp",
@@ -611,8 +626,10 @@ public class BasicContainerTest {
 
         LocalState ls = mock(LocalState.class);
 
+        ResourceIsolationInterface iso = mock(ResourceIsolationInterface.class);
+
         MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-            "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+            "SUPERVISOR", supervisorPort, port, la, iso, ls, workerId, new StormMetricsRegistry(),
             new HashMap<>(), ops, "profile");
 
         assertListEquals(Arrays.asList(
@@ -641,8 +658,8 @@ public class BasicContainerTest {
                          mc.substituteChildopts(null));
     }
 
-    private static interface Run {
-        public void run() throws Exception;
+    private interface Run {
+        void run() throws Exception;
     }
 
     public static class CommandRun {
@@ -658,8 +675,6 @@ public class BasicContainerTest {
     }
 
     public static class MockBasicContainer extends BasicContainer {
-        public final List<CommandRun> profileCmds = new ArrayList<>();
-        public final List<CommandRun> workerCmds = new ArrayList<>();
         public MockBasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort,
                                   int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
                                   LocalState localState, String workerId, StormMetricsRegistry metricsRegistry, 
@@ -684,19 +699,6 @@ public class BasicContainerTest {
         }
 
         @Override
-        protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix,
-                                              File targetDir) throws IOException, InterruptedException {
-            profileCmds.add(new CommandRun(command, env, targetDir));
-            return true;
-        }
-
-        @Override
-        protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix,
-                                           ExitCodeCallback processExitCallback, File targetDir) throws IOException {
-            workerCmds.add(new CommandRun(command, env, targetDir));
-        }
-
-        @Override
         protected String javaCmd(String cmd) {
             //avoid system dependent things
             return cmd;
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
index 5c4483d..8390090 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
@@ -14,38 +14,29 @@ package org.apache.storm.daemon.supervisor;
 
 import com.google.common.base.Joiner;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.StringWriter;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import org.apache.commons.lang.RandomStringUtils;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.daemon.supervisor.BasicContainerTest.CommandRun;
 import org.apache.storm.daemon.supervisor.Container.ContainerType;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileRequest;
 import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.ServerUtils;
 import org.junit.Test;
 import org.yaml.snakeyaml.Yaml;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.eq;
@@ -84,26 +75,23 @@ public class ContainerTest {
         when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
+        MockResourceIsolationManager iso = new MockResourceIsolationManager();
+        String workerId = "worker-id";
         MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
-                                             "SUPERVISOR", 6628, 8080, la, null, "worker", new HashMap<>(), ops, new StormMetricsRegistry());
-        mc.kill();
-        assertEquals(Collections.EMPTY_LIST, mc.killedPids);
-        assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
-        mc.forceKill();
-        assertEquals(Collections.EMPTY_LIST, mc.killedPids);
-        assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
+            "SUPERVISOR", 6628, 8080, la, iso, workerId, new HashMap<>(), ops, new StormMetricsRegistry());
+        iso.allWorkerIds.add(workerId);
 
-        long pid = 987654321;
-        mc.allPids.add(pid);
+        assertEquals(Collections.EMPTY_LIST, iso.killedWorkerIds);
+        assertEquals(Collections.EMPTY_LIST, iso.forceKilledWorkerIds);
 
         mc.kill();
-        assertEquals(mc.allPids, new HashSet<>(mc.killedPids));
-        assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
-        mc.killedPids.clear();
+        assertEquals(iso.allWorkerIds, iso.killedWorkerIds);
+        assertEquals(Collections.EMPTY_LIST, iso.forceKilledWorkerIds);
+        iso.killedWorkerIds.clear();
 
         mc.forceKill();
-        assertEquals(Collections.EMPTY_LIST, mc.killedPids);
-        assertEquals(mc.allPids, new HashSet<>(mc.forceKilledPids));
+        assertEquals(Collections.EMPTY_LIST, iso.killedWorkerIds);
+        assertEquals(iso.allWorkerIds, iso.forceKilledWorkerIds);
     }
 
     @SuppressWarnings("unchecked")
@@ -147,8 +135,9 @@ public class ContainerTest {
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
         la.set_owner(user);
+        ResourceIsolationInterface iso = mock(ResourceIsolationInterface.class);
         MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
-                                             "SUPERVISOR", 6628, 8080, la, null, workerId, topoConf, ops, new StormMetricsRegistry());
+                                             "SUPERVISOR", 6628, 8080, la, iso, workerId, topoConf, ops, new StormMetricsRegistry());
 
         mc.setup();
 
@@ -163,7 +152,7 @@ public class ContainerTest {
 
         String yamlResult = yamlDump.toString();
         Yaml yaml = new Yaml();
-        Map<String, Object> result = (Map<String, Object>) yaml.load(yamlResult);
+        Map<String, Object> result = yaml.load(yamlResult);
         assertEquals(workerId, result.get("worker-id"));
         assertEquals(user, result.get(Config.TOPOLOGY_SUBMITTER_USER));
         HashSet<String> allowedUsers = new HashSet<>(topoUsers);
@@ -188,7 +177,6 @@ public class ContainerTest {
     public void testCleanup() throws Exception {
         final int supervisorPort = 6628;
         final int port = 8080;
-        final long pid = 100;
         final String topoId = "test_topology";
         final String workerId = "worker_id";
         final String user = "me";
@@ -197,7 +185,6 @@ public class ContainerTest {
         final File logMetadataFile = new File(workerArtifacts, "worker.yaml");
         final File workerUserFile = asAbsFile(stormLocal, "workers-users", workerId);
         final File workerRoot = asAbsFile(stormLocal, "workers", workerId);
-        final File workerPidsRoot = new File(workerRoot, "pids");
 
         final Map<String, Object> topoConf = new HashMap<>();
 
@@ -214,16 +201,15 @@ public class ContainerTest {
         when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump);
 
         ResourceIsolationInterface iso = mock(ResourceIsolationInterface.class);
+        when(iso.isResourceManaged()).thenReturn(true);
 
         LocalAssignment la = new LocalAssignment();
         la.set_owner(user);
         la.set_topology_id(topoId);
         MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
                                              "SUPERVISOR", supervisorPort, port, la, iso, workerId, topoConf, ops, new StormMetricsRegistry());
-        mc.allPids.add(pid);
 
         mc.cleanUp();
-        verify(ops).deleteIfExists(eq(new File(workerPidsRoot, String.valueOf(pid))), eq(user), any(String.class));
         verify(iso).releaseResourcesForWorker(workerId);
 
         verify(ops).deleteIfExists(eq(new File(workerRoot, "pids")), eq(user), any(String.class));
@@ -235,9 +221,6 @@ public class ContainerTest {
 
     public static class MockContainer extends Container {
 
-        public final List<Long> killedPids = new ArrayList<>();
-        public final List<Long> forceKilledPids = new ArrayList<>();
-        public final Set<Long> allPids = new HashSet<>();
         protected MockContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort,
                                 int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
                                 String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, StormMetricsRegistry metricsRegistry) throws IOException {
@@ -246,124 +229,100 @@ public class ContainerTest {
         }
 
         @Override
-        protected void kill(long pid) {
-            killedPids.add(pid);
+        public void launch() throws IOException {
+            fail("THIS IS NOT UNDER TEST");
         }
 
         @Override
-        protected void forceKill(long pid) {
-            forceKilledPids.add(pid);
+        public void relaunch() throws IOException {
+            fail("THIS IS NOT UNDER TEST");
         }
 
         @Override
-        protected Set<Long> getAllPids() throws IOException {
-            return allPids;
+        public boolean didMainProcessExit() {
+            fail("THIS IS NOT UNDER TEST");
+            return false;
         }
 
         @Override
-        public void launch() throws IOException {
+        public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException {
             fail("THIS IS NOT UNDER TEST");
+            return false;
         }
+    }
+
+    public static class MockResourceIsolationManager implements ResourceIsolationInterface {
+        public final List<String> killedWorkerIds = new ArrayList<>();
+        public final List<String> forceKilledWorkerIds = new ArrayList<>();
+        public final List<String> allWorkerIds = new ArrayList<>();
+
+        public final List<CommandRun> profileCmds = new ArrayList<>();
+        public final List<CommandRun> workerCmds = new ArrayList<>();
 
         @Override
-        public void relaunch() throws IOException {
+        public void prepare(Map<String, Object> conf) throws IOException {
             fail("THIS IS NOT UNDER TEST");
         }
 
         @Override
-        public boolean didMainProcessExit() {
+        public void reserveResourcesForWorker(String workerId, Integer workerMemory, Integer workerCpu, String numaId) {
             fail("THIS IS NOT UNDER TEST");
-            return false;
         }
 
         @Override
-        public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException {
+        public void releaseResourcesForWorker(String workerId) {
+        }
+
+        @Override
+        public void launchWorkerProcess(String user, String topologyId, Map<String, Object> topoConf,
+                                        int port, String workerId, List<String> command,
+                                        Map<String, String> env, String logPrefix,
+                                        ExitCodeCallback processExitCallback, File targetDir) throws IOException {
+            workerCmds.add(new CommandRun(command, env, targetDir));
+        }
+
+        @Override
+        public long getMemoryUsage(String user, String workerId, int port) throws IOException {
             fail("THIS IS NOT UNDER TEST");
-            return false;
+            return 0;
         }
-    }
 
-    private Collection<Long> getRunningProcessIds() throws IOException {
-        // get list of few running processes
-        Collection<Long> pids = new ArrayList<>();
-        Process p = Runtime.getRuntime().exec(ServerUtils.IS_ON_WINDOWS ? "tasklist" : "ps -e");
-        try (BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
-            String line;
-            while ((line = input.readLine()) != null) {
-                line = line.trim();
-                if (line.isEmpty()) {
-                    continue;
-                }
-                try {
-                    pids.add(Long.parseLong(line.split("\\s")[0]));
-                } catch (Exception ex) {
-                    ex.printStackTrace();
-                }
-            }
+        @Override
+        public long getSystemFreeMemoryMb() throws IOException {
+            fail("THIS IS NOT UNDER TEST");
+            return 0;
         }
-        return pids;
-    }
 
-    @Test
-    public void testIsProcessAlive() throws Exception {
-        // specific selected process should not be alive for a randomly generated user
-        String randomUser = RandomStringUtils.randomAlphanumeric(12);
-
-        // get list of few running processes
-        Collection<Long> pids = getRunningProcessIds();
-        assertFalse(pids.isEmpty());
-        for (long pid: pids) {
-            boolean status = Container.isProcessAlive(pid, randomUser);
-            assertFalse("Random user " + randomUser + " is not expected to own any process", status);
+        @Override
+        public void kill(String user, String workerId) throws IOException {
+            killedWorkerIds.add(workerId);
         }
 
-        boolean status = false;
-        String currentUser = System.getProperty("user.name");
-        for (long pid: pids) {
-            // at least one pid will be owned by the current user (doing the testing)
-            if (Container.isProcessAlive(pid, currentUser)) {
-                status = true;
-                break;
-            }
+        @Override
+        public void forceKill(String user, String workerId) throws IOException {
+            forceKilledWorkerIds.add(workerId);
         }
-        assertTrue("Expecting user " + currentUser + " to own at least one process", status);
-    }
 
-    @Test
-    public void testIsAnyProcessAlive() throws Exception {
-        // no process should be alive for a randomly generated user
-        String randomUser = RandomStringUtils.randomAlphanumeric(12);
-        Collection<Long> pids = getRunningProcessIds();
-
-        assertFalse(pids.isEmpty());
-        boolean status = Container.isAnyProcessAlive(pids, randomUser);
-        assertFalse("Random user " + randomUser + " is not expected to own any process", status);
-
-        // at least one pid will be owned by the current user (doing the testing)
-        String currentUser = System.getProperty("user.name");
-        status = Container.isAnyProcessAlive(pids, currentUser);
-        assertTrue("Expecting user " + currentUser + " to own at least one process", status);
-
-        if (!ServerUtils.IS_ON_WINDOWS) {
-            // userid test is valid only on Posix platforms
-            int inValidUserId = -1;
-            status = Container.isAnyProcessAlive(pids, inValidUserId);
-            assertFalse("Invalid userId " + randomUser + " is not expected to own any process", status);
-
-            int currentUid = Container.getUserId(null);
-            status = Container.isAnyProcessAlive(pids, currentUid);
-            assertTrue("Expecting uid " + currentUid + " to own at least one process", status);
+        @Override
+        public boolean areAllProcessesDead(String user, String workerId) throws IOException {
+            fail("THIS IS NOT UNDER TEST");
+            return false;
         }
-    }
 
-    @Test
-    public void testGetUserId() throws Exception {
-        if (ServerUtils.IS_ON_WINDOWS) {
-            return; // trivially succeed on Windows, since this test is not for Windows platform
+        @Override
+        public boolean runProfilingCommand(String user, String workerId, List<String> command, Map<String, String> env, String logPrefix, File targetDir) throws IOException, InterruptedException {
+            profileCmds.add(new CommandRun(command, env, targetDir));
+            return true;
+        }
+
+        @Override
+        public void cleanup(String user, String workerId, int port) {
+            //NO OP
+        }
+
+        @Override
+        public boolean isResourceManaged() {
+            return false;
         }
-        int uid1 = Container.getUserId(null);
-        Path p = Files.createTempFile("testGetUser", ".txt");
-        int uid2 = Container.getPathOwnerUid(p.toString());
-        assertEquals("User UID " + uid1 + " is not same as file " + p.toString() + " owner UID of " + uid2, uid1, uid2);
     }
 }
diff --git a/storm-server/src/test/java/org/apache/storm/utils/ServerUtilsTest.java b/storm-server/src/test/java/org/apache/storm/utils/ServerUtilsTest.java
index 2dc63e0..c4dc5ed 100644
--- a/storm-server/src/test/java/org/apache/storm/utils/ServerUtilsTest.java
+++ b/storm-server/src/test/java/org/apache/storm/utils/ServerUtilsTest.java
@@ -20,12 +20,21 @@
 package org.apache.storm.utils;
 
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.zip.ZipFile;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.storm.testing.TmpPath;
 import org.junit.jupiter.api.Test;
 
@@ -71,4 +80,88 @@ public class ServerUtilsTest {
             assertThat(Files.exists(destParent.resolve("evil.txt")), is(false));
         }
     }
+
+    private Collection<Long> getRunningProcessIds() throws IOException {
+        // Collect the PIDs found in the first column of the platform's process listing command.
+        Collection<Long> pids = new ArrayList<>();
+        Process p = Runtime.getRuntime().exec(ServerUtils.IS_ON_WINDOWS ? "tasklist" : "ps -e");
+        try (BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+            String line;
+            while ((line = input.readLine()) != null) {
+                line = line.trim();
+                if (line.isEmpty()) {
+                    continue;
+                }
+                try {
+                    pids.add(Long.parseLong(line.split("\\s")[0]));
+                } catch (NumberFormatException ignored) {
+                    // Expected for the column-header row and any line not starting with a PID; skip it.
+                }
+            }
+        }
+        return pids;
+    }
+
+    @Test
+    public void testIsProcessAlive() throws Exception {
+        // no running process should be reported alive for a randomly generated user name
+        String randomUser = RandomStringUtils.randomAlphanumeric(12);
+
+        // get a list of a few running processes
+        Collection<Long> pids = getRunningProcessIds();
+        assertFalse(pids.isEmpty());
+        for (long pid: pids) {
+            boolean status = ServerUtils.isProcessAlive(pid, randomUser);
+            assertFalse("Random user " + randomUser + " is not expected to own any process", status);
+        }
+
+        boolean status = false;
+        String currentUser = System.getProperty("user.name");
+        for (long pid: pids) {
+            // at least one pid is expected to be owned by the current user (the one running the test)
+            if (ServerUtils.isProcessAlive(pid, currentUser)) {
+                status = true;
+                break;
+            }
+        }
+        assertTrue("Expecting user " + currentUser + " to own at least one process", status);
+    }
+
+    @Test
+    public void testIsAnyProcessAlive() throws Exception {
+        // no process should be reported alive for a randomly generated user name
+        String randomUser = RandomStringUtils.randomAlphanumeric(12);
+        Collection<Long> pids = getRunningProcessIds();
+
+        assertFalse(pids.isEmpty());
+        boolean status = ServerUtils.isAnyProcessAlive(pids, randomUser);
+        assertFalse("Random user " + randomUser + " is not expected to own any process", status);
+
+        // at least one pid is expected to be owned by the current user (the one running the test)
+        String currentUser = System.getProperty("user.name");
+        status = ServerUtils.isAnyProcessAlive(pids, currentUser);
+        assertTrue("Expecting user " + currentUser + " to own at least one process", status);
+
+        if (!ServerUtils.IS_ON_WINDOWS) {
+            // the numeric user-id variant is only checked on POSIX platforms
+            int inValidUserId = -1;
+            status = ServerUtils.isAnyProcessAlive(pids, inValidUserId);
+            assertFalse("Invalid userId " + inValidUserId + " is not expected to own any process", status);
+
+            int currentUid = ServerUtils.getUserId(null);
+            status = ServerUtils.isAnyProcessAlive(pids, currentUid);
+            assertTrue("Expecting uid " + currentUid + " to own at least one process", status);
+        }
+    }
+
+    @Test
+    public void testGetUserId() throws Exception {
+        if (ServerUtils.IS_ON_WINDOWS) {
+            return; // trivially succeed on Windows: this uid comparison is not meant for the Windows platform
+        }
+        int uid1 = ServerUtils.getUserId(null);
+        Path p = Files.createTempFile("testGetUser", ".txt");
+        int uid2 = ServerUtils.getPathOwnerUid(p.toString());
+        assertEquals("User UID " + uid1 + " is not same as file " + p.toString() + " owner UID of " + uid2, uid1, uid2);
+    }
 }