You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/09/19 21:01:35 UTC

[6/8] storm git commit: STORM-2018: Supervisor V2

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
new file mode 100644
index 0000000..d76b950
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
@@ -0,0 +1,549 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.lang.ProcessBuilder.Redirect;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Represents a container that a worker will run in.
+ */
+public abstract class Container implements Killable {
+    private static final Logger LOG = LoggerFactory.getLogger(Container.class);
+    /**
+     * How a Container came to exist.  The type decides whether the container
+     * counts as a recovery and whether only the Killable operations may be used.
+     */
+    public static enum ContainerType {
+        /** Not a recovery, and not restricted to the Killable operations. */
+        LAUNCH(false, false),
+        /** A recovery that is not restricted to the Killable operations. */
+        RECOVER_FULL(true, false),
+        /** A recovery on which only the Killable operations may be used. */
+        RECOVER_PARTIAL(true, true);
+
+        private final boolean _recovery;
+        private final boolean _onlyKillable;
+
+        ContainerType(boolean recovery, boolean onlyKillable) {
+            this._recovery = recovery;
+            this._onlyKillable = onlyKillable;
+        }
+
+        /**
+         * @return true if this type describes a recovered container.
+         */
+        public boolean isRecovery() {
+            return _recovery;
+        }
+
+        /**
+         * @return true if only the Killable operations are supported.
+         */
+        public boolean isOnlyKillable() {
+            return _onlyKillable;
+        }
+
+        /**
+         * Guard for operations that need more than the Killable operations.
+         * @throws IllegalStateException if this type is only Killable
+         */
+        public void assertFull() {
+            if (isOnlyKillable()) {
+                throw new IllegalStateException("Container is only Killable.");
+            }
+        }
+    }
+    
+    protected final Map<String, Object> _conf;
+    protected final Map<String, Object> _topoConf; //Not set if RECOVER_PARTIAL
+    protected String _workerId; 
+    protected final String _topologyId; //Not set if RECOVER_PARTIAL
+    protected final String _supervisorId;
+    protected final int _port; //Not set if RECOVER_PARTIAL
+    protected final LocalAssignment _assignment; //Not set if RECOVER_PARTIAL
+    protected final AdvancedFSOps _ops;
+    protected final ResourceIsolationInterface _resourceIsolationManager;
+    protected ContainerType _type;
+    
+    /**
+     * Create a new Container.
+     * @param type the type of container being made.
+     * @param conf the supervisor config
+     * @param supervisorId the ID of the supervisor this is a part of.
+     * @param port the port the container is on.  Should be {@code <= 0} if only a partial recovery
+     * @param assignment the assignment for this container. Should be null if only a partial recovery.
+     * @param resourceIsolationManager used to isolate resources for a container can be null if no isolation is used.
+     * @param workerId the id of the worker to use.  Must not be null if doing a partial recovery.
+     * @param topoConf the config of the topology (mostly for testing) if null
+     * and not a partial recovery the real conf is read.
+     * @param ops file system operations (mostly for testing) if null a new one is made
+     * @throws IOException on any error.
+     * @throws ContainerRecoveryException if this is not a partial recovery and the
+     * required topology files do not exist.
+     */
+    protected Container(ContainerType type, Map<String, Object> conf, String supervisorId,
+            int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
+            String workerId, Map<String, Object> topoConf,  AdvancedFSOps ops) throws IOException {
+        assert(type != null);
+        assert(conf != null);
+        assert(supervisorId != null);
+
+        //Fall back to newly made file system operations when none were passed in
+        _ops = (ops != null) ? ops : AdvancedFSOps.make(conf);
+        _type = type;
+        _conf = conf;
+        _supervisorId = supervisorId;
+        _port = port;
+        _assignment = assignment;
+        _workerId = workerId;
+        _resourceIsolationManager = resourceIsolationManager;
+
+        if (!_type.isOnlyKillable()) {
+            assert(assignment != null);
+            assert(port > 0);
+            _topologyId = assignment.get_topology_id();
+            if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) {
+                LOG.info("Missing topology storm code, so can't launch  worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
+                        _supervisorId, _port, _workerId);
+                throw new ContainerRecoveryException("Missing required topology files...");
+            }
+            //A non-null topoConf is for testing...
+            _topoConf = (topoConf != null) ? topoConf : readTopoConf();
+        } else {
+            //Only Killable: there is no assignment, port or topology to track
+            assert(_assignment == null);
+            assert(_port <= 0);
+            assert(_workerId != null);
+            _topologyId = null;
+            _topoConf = null;
+        }
+    }
+    
+    /**
+     * @return a short description holding the topology id and the worker id.
+     */
+    @Override
+    public String toString() {
+        return String.format("topo:%s worker:%s", _topologyId, _workerId);
+    }
+    
+    protected Map<String, Object> readTopoConf() throws IOException {
+        assert(_topologyId != null);
+        return ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+    }
+    
+    /**
+     * Kill a given process
+     * @param pid the id of the process to kill
+     * @throws IOException
+     */
+    protected void kill(long pid) throws IOException {
+        Utils.killProcessWithSigTerm(String.valueOf(pid));
+    }
+    
+    /**
+     * Kill a given process
+     * @param pid the id of the process to kill
+     * @throws IOException
+     */
+    protected void forceKill(long pid) throws IOException {
+        Utils.forceKillProcess(String.valueOf(pid));
+    }
+    
+    /**
+     * Kill every process that getAllPids reports for this container.
+     * @throws IOException on any error
+     */
+    @Override
+    public void kill() throws IOException {
+        LOG.info("Killing {}:{}", _supervisorId, _workerId);
+        for (long pid : getAllPids()) {
+            kill(pid);
+        }
+    }
+    
+    /**
+     * Forcibly kill every process that getAllPids reports for this container.
+     * @throws IOException on any error
+     */
+    @Override
+    public void forceKill() throws IOException {
+        LOG.info("Force Killing {}:{}", _supervisorId, _workerId);
+        for (long pid : getAllPids()) {
+            forceKill(pid);
+        }
+    }
+    
+    /**
+     * Read the heartbeat stored in the local state of the current worker.
+     * @return the Heartbeat
+     * @throws IOException on any error
+     */
+    public LSWorkerHeartbeat readHeartbeat() throws IOException {
+        final LSWorkerHeartbeat heartbeat = ConfigUtils.workerState(_conf, _workerId).getWorkerHeartBeat();
+        LOG.trace("{}: Reading heartbeat {}", _workerId, heartbeat);
+        return heartbeat;
+    }
+
+    /**
+     * Check if a process is running and owned by the expected user.
+     * Delegates to the Windows or the POSIX check depending on the platform.
+     * @param pid the PID of the running process
+     * @param user the user that is expected to own that process
+     * @return true if it is, else false
+     * @throws IOException on any error
+     */
+    protected boolean isProcessAlive(long pid, String user) throws IOException {
+        return Utils.IS_ON_WINDOWS ? isWindowsProcessAlive(pid, user) : isPosixProcessAlive(pid, user);
+    }
+    
+    /**
+     * Windows check: run tasklist for the pid and compare the reported user
+     * (with any domain prefix removed) against the expected one.
+     * @param pid the PID to look up
+     * @param user the user that is expected to own that process
+     * @return true if tasklist reports the pid as owned by user, else false
+     * @throws IOException on any error
+     */
+    private boolean isWindowsProcessAlive(long pid, String user) throws IOException {
+        ProcessBuilder builder = new ProcessBuilder("tasklist", "/fo", "list", "/fi", "pid eq " + pid, "/v");
+        builder.redirectError(Redirect.INHERIT);
+        Process tasklist = builder.start();
+        try (BufferedReader out = new BufferedReader(new InputStreamReader(tasklist.getInputStream()))) {
+            for (String line = out.readLine(); line != null; line = out.readLine()) {
+                //Check for : in case someone called their user "User Name"
+                if (!line.contains("User Name:")) {
+                    continue;
+                }
+                //This line contains the user name for the pid we're looking up, nothing after it is read
+                //Example line: "User Name:    exampleDomain\exampleUser"
+                String[] labelAndValue = line.split(":");
+                if (labelAndValue.length != 2) {
+                    LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. Line was {}", line);
+                    return false;
+                }
+                String[] userAndMaybeDomain = labelAndValue[1].trim().split("\\\\");
+                String processUser = userAndMaybeDomain.length == 2 ? userAndMaybeDomain[1] : userAndMaybeDomain[0];
+                if (user.equals(processUser)) {
+                    return true;
+                }
+                LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
+                return false;
+            }
+        }
+        return false;
+    }
+    
+    /**
+     * POSIX check: run ps for the pid and compare each reported user against
+     * the expected one.
+     * @param pid the PID to look up
+     * @param user the user that is expected to own that process
+     * @return true if ps reports the pid as owned by user, else false
+     * @throws IOException on any error
+     */
+    private boolean isPosixProcessAlive(long pid, String user) throws IOException {
+        ProcessBuilder builder = new ProcessBuilder("ps", "-o", "user", "-p", String.valueOf(pid));
+        builder.redirectError(Redirect.INHERIT);
+        Process ps = builder.start();
+        try (BufferedReader out = new BufferedReader(new InputStreamReader(ps.getInputStream()))) {
+            //The first line is expected to be the column header
+            String header = out.readLine();
+            assert("USER".equals(header));
+            for (String processUser = out.readLine(); processUser != null; processUser = out.readLine()) {
+                if (user.equals(processUser)) {
+                    return true;
+                }
+                LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
+            }
+        }
+        return false;
+    }
+    
+    /**
+     * Check every pid of this container, stopping at the first one that is
+     * still alive and owned by the worker user.
+     * @return true if none of the processes are alive, else false
+     * @throws IOException on any error
+     */
+    @Override
+    public boolean areAllProcessesDead() throws IOException {
+        Set<Long> pids = getAllPids();
+        String user = getWorkerUser();
+
+        for (long pid : pids) {
+            if (isProcessAlive(pid, user)) {
+                return false;
+            }
+            LOG.debug("{}: PID {} is dead", _workerId, pid);
+        }
+        return true;
+    }
+
+    @Override
+    public void cleanUp() throws IOException {
+        cleanUpForRestart(); //By default a final clean up does the same work as cleaning up for a restart
+    }
+    
+    /**
+     * Setup the container to run.  By default this creates the needed directories/links in the
+     * local file system
+     * PREREQUISITE: All needed blobs and topology, jars/configs have been downloaded and
+     * placed in the appropriate locations
+     * @throws IOException on any error
+     */
+    protected void setup() throws IOException {
+        _type.assertFull();
+        if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) {
+            LOG.info("Missing topology storm code, so can't launch  worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
+                    _supervisorId, _port, _workerId);
+            throw new IllegalStateException("Not all needed files are here!!!!");
+        } 
+        LOG.info("Setting up {}:{}", _supervisorId, _workerId);
+
+        _ops.forceMkdir(new File(ConfigUtils.workerPidsRoot(_conf, _workerId)));
+        _ops.forceMkdir(new File(ConfigUtils.workerTmpRoot(_conf, _workerId)));
+        _ops.forceMkdir(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId)));
+        
+        File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port));
+        if (!_ops.fileExists(workerArtifacts)) {
+            _ops.forceMkdir(workerArtifacts);
+            _ops.setupStormCodeDir(_topoConf, workerArtifacts);
+        }
+    
+        String user = getWorkerUser();
+        writeLogMetadata(user);
+        saveWorkerUser(user);
+        createArtifactsLink();
+        createBlobstoreLinks();
+    }
+    
+    /**
+     * Write out the file used by the log viewer to allow/reject log access.
+     * The YAML file holds the submitter user, the worker id, the union of the
+     * LOGS_GROUPS and TOPOLOGY_GROUPS entries and the union of the LOGS_USERS
+     * and TOPOLOGY_USERS entries of the topology conf.
+     * @param user the user this is going to run as
+     * @throws IOException on any error
+     */
+    @SuppressWarnings("unchecked")
+    protected void writeLogMetadata(String user) throws IOException {
+        _type.assertFull();
+        Map<String, Object> metadata = new HashMap<>();
+        metadata.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+        metadata.put("worker-id", _workerId);
+
+        Set<String> allowedGroups = new HashSet<>();
+        List<String> confLogsGroups = (List<String>) _topoConf.get(Config.LOGS_GROUPS);
+        if (confLogsGroups != null) {
+            for (String group : confLogsGroups) {
+                allowedGroups.add(group);
+            }
+        }
+        List<String> confTopoGroups = (List<String>) _topoConf.get(Config.TOPOLOGY_GROUPS);
+        if (confTopoGroups != null) {
+            allowedGroups.addAll(confTopoGroups);
+        }
+        metadata.put(Config.LOGS_GROUPS, allowedGroups.toArray());
+
+        Set<String> allowedUsers = new HashSet<>();
+        List<String> confLogsUsers = (List<String>) _topoConf.get(Config.LOGS_USERS);
+        if (confLogsUsers != null) {
+            for (String logUser : confLogsUsers) {
+                allowedUsers.add(logUser);
+            }
+        }
+        List<String> confTopoUsers = (List<String>) _topoConf.get(Config.TOPOLOGY_USERS);
+        if (confTopoUsers != null) {
+            for (String topoUser : confTopoUsers) {
+                allowedUsers.add(topoUser);
+            }
+        }
+        metadata.put(Config.LOGS_USERS, allowedUsers.toArray());
+
+        File metadataFile = ConfigUtils.getLogMetaDataFile(_conf, _topologyId, _port);
+        try (Writer out = _ops.getWriter(metadataFile)) {
+            new Yaml().dump(metadata, out);
+        }
+    }
+    
+    /**
+     * Create the "artifacts" symlink inside the worker's directory pointing at
+     * the worker artifacts directory.  Nothing is done when the worker's
+     * directory does not exist.
+     * @throws IOException on any error
+     */
+    protected void createArtifactsLink() throws IOException {
+        _type.assertFull();
+        File workerDir = new File(ConfigUtils.workerRoot(_conf, _workerId));
+        File artifactsDir = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port));
+        if (!_ops.fileExists(workerDir)) {
+            return;
+        }
+        LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to its port artifacts directory", _workerId, _topologyId);
+        File link = new File(workerDir, "artifacts");
+        _ops.createSymlink(link, artifactsDir);
+    }
+    
+    /**
+     * Create symlinks from the container's directory to the resources
+     * directory and to each of the blobs in the storm dist directory.
+     * A blob is linked under its "localname" when one is configured,
+     * otherwise under its key.
+     * @throws IOException on any error.
+     */
+    protected void createBlobstoreLinks() throws IOException {
+        _type.assertFull();
+        String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
+        String workerRoot = ConfigUtils.workerRoot(_conf, _workerId);
+
+        @SuppressWarnings("unchecked")
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) _topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        List<String> blobFileNames = new ArrayList<>();
+        if (blobstoreMap != null) {
+            for (Map.Entry<String, Map<String, Object>> blob : blobstoreMap.entrySet()) {
+                String key = blob.getKey();
+                Map<String, Object> blobInfo = blob.getValue();
+                boolean hasLocalName = blobInfo != null && blobInfo.containsKey("localname");
+                String fileName = hasLocalName ? (String) blobInfo.get("localname") : key;
+                blobFileNames.add(fileName);
+            }
+        }
+
+        //Only used for the log message below
+        List<String> resourceFileNames = new ArrayList<>();
+        resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR);
+        resourceFileNames.addAll(blobFileNames);
+        LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", _workerId, _topologyId, resourceFileNames.size(), resourceFileNames);
+
+        File resourcesLink = new File(workerRoot, ConfigUtils.RESOURCES_SUBDIR);
+        File resourcesTarget = new File(stormRoot, ConfigUtils.RESOURCES_SUBDIR);
+        _ops.createSymlink(resourcesLink, resourcesTarget);
+        for (String blobFileName : blobFileNames) {
+            File blobLink = new File(workerRoot, blobFileName);
+            File blobTarget = new File(stormRoot, blobFileName);
+            _ops.createSymlink(blobLink, blobTarget);
+        }
+    }
+    
+    /**
+     * Collect the pids listed in the worker's pids directory plus, when a
+     * resource isolation manager is set, the pids it reports as running.
+     * @return all of the pids that are a part of this container.
+     * @throws IOException on any error
+     */
+    protected Set<Long> getAllPids() throws IOException {
+        Set<Long> pids = new HashSet<>();
+        String pidsRoot = ConfigUtils.workerPidsRoot(_conf, _workerId);
+        for (String pidFileName : Utils.readDirContents(pidsRoot)) {
+            pids.add(Long.valueOf(pidFileName));
+        }
+
+        if (_resourceIsolationManager == null) {
+            return pids;
+        }
+        Set<Long> isolatedPids = _resourceIsolationManager.getRunningPIDs(_workerId);
+        assert(isolatedPids != null);
+        pids.addAll(isolatedPids);
+        return pids;
+    }
+    
+    /**
+     * Work out the user for this worker.  Sources are tried in order: the saved
+     * worker user file, the submitter user in the topology conf, the current
+     * system user when in local mode, and finally the owner of the worker
+     * artifacts root.
+     * @return the user that some operations should be done as.
+     * @throws IOException on any error
+     * @throws IllegalStateException if none of the sources could provide a user
+     */
+    protected String getWorkerUser() throws IOException {
+        LOG.info("GET worker-user for {}", _workerId);
+        File userFile = new File(ConfigUtils.workerUserFile(_conf, _workerId));
+        if (_ops.fileExists(userFile)) {
+            return _ops.slurpString(userFile).trim();
+        }
+        if (_topoConf != null) {
+            return (String) _topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        }
+        if (ConfigUtils.isLocalMode(_conf)) {
+            return System.getProperty("user.name");
+        }
+        File artifactsRoot = new File(ConfigUtils.workerArtifactsRoot(_conf));
+        if (!artifactsRoot.exists()) {
+            throw new IllegalStateException("Could not recover the user for " + _workerId);
+        }
+        return Files.getOwner(artifactsRoot.toPath()).getName();
+    }
+    
+    /**
+     * Store the user in the worker user file of this worker.
+     * @param user the user to save
+     * @throws IOException on any error
+     */
+    protected void saveWorkerUser(String user) throws IOException {
+        _type.assertFull();
+        LOG.info("SET worker-user {} {}", _workerId, user);
+        File userFile = new File(ConfigUtils.workerUserFile(_conf, _workerId));
+        _ops.dump(userFile, user);
+    }
+    
+    /**
+     * Delete the worker user file of this worker if it exists.
+     * @throws IOException on any error
+     */
+    protected void deleteSavedWorkerUser() throws IOException {
+        LOG.info("REMOVE worker-user {}", _workerId);
+        File userFile = new File(ConfigUtils.workerUserFile(_conf, _workerId));
+        _ops.deleteIfExists(userFile);
+    }
+    
+    /**
+     * Clean up the container, partly preparing it for a restart.
+     * By default this deletes the pid files, releases any isolated resources
+     * and deletes all of the worker's temp directories, since a restart is
+     * going to get a new worker_id anyways.
+     * POST CONDITION: the workerId will be set to null
+     * @throws IOException on any error
+     */
+    public void cleanUpForRestart() throws IOException {
+        LOG.info("Cleaning up {}:{}", _supervisorId, _workerId);
+        Set<Long> pids = getAllPids();
+        String user = getWorkerUser(); //read before deleteSavedWorkerUser() below removes the saved user file
+        
+        for (Long pid : pids) {
+            File path = new File(ConfigUtils.workerPidPath(_conf, _workerId, pid));
+            _ops.deleteIfExists(path, user, _workerId);
+        }
+        
+        //clean up for resource isolation if enabled
+        if (_resourceIsolationManager != null) {
+            _resourceIsolationManager.releaseResourcesForWorker(_workerId);
+        }
+        
+        //Always make sure to clean up everything else before worker directory
+        //is removed since that is what is going to trigger the retry for cleanup
+        _ops.deleteIfExists(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId)), user, _workerId);
+        _ops.deleteIfExists(new File(ConfigUtils.workerPidsRoot(_conf, _workerId)), user, _workerId);
+        _ops.deleteIfExists(new File(ConfigUtils.workerTmpRoot(_conf, _workerId)), user, _workerId);
+        _ops.deleteIfExists(new File(ConfigUtils.workerRoot(_conf, _workerId)), user, _workerId);
+        deleteSavedWorkerUser();
+        _workerId = null; //the post condition promised above
+    }
+    
+    /**
+     * Launch the process for the first time
+     * PREREQUISITE: setup has run and passed
+     * @throws IOException on any error
+     */
+    public abstract void launch() throws IOException;
+    
+    /**
+     * Restart the processes in this container
+     * PREREQUISITE: cleanUpForRestart has run and passed
+     * @throws IOException on any error
+     */
+    public abstract void relaunch() throws IOException;
+
+    /**
+     * @return true if the main process exited, else false. This is best effort only: false is returned if unknown.
+     */
+    public abstract boolean didMainProcessExit();
+
+    /**
+     * Run a profiling request
+     * @param request the request to run
+     * @param stop is this a stop request?
+     * @return true if it succeeded, else false
+     * @throws IOException on any error
+     * @throws InterruptedException if running the command is interrupted.
+     */
+    public abstract boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException;
+
+    /**
+     * Get the worker id currently associated with this container.
+     * @return the id of the container or null if there is no worker id right now
+     * (cleanUpForRestart sets it to null).
+     */
+    public String getWorkerId() {
+        return _workerId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerLauncher.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerLauncher.java
new file mode 100644
index 0000000..55f167c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerLauncher.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Launches containers
+ */
+public abstract class ContainerLauncher {
+    private static final Logger LOG = LoggerFactory.getLogger(ContainerLauncher.class);
+    
+    /**
+     * Factory to create the right container launcher 
+     * for the config and the environment.
+     * @param conf the config
+     * @param supervisorId the ID of the supervisor
+     * @param sharedContext Used in local mode to let workers talk together without netty
+     * @return the proper container launcher
+     * @throws IOException on any error
+     */
+    public static ContainerLauncher make(Map<String, Object> conf, String supervisorId, IContext sharedContext) throws IOException {
+        if (ConfigUtils.isLocalMode(conf)) {
+            return new LocalContainerLauncher(conf, supervisorId, sharedContext);
+        }
+        
+        ResourceIsolationInterface resourceIsolationManager = null;
+        if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
+            resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
+            resourceIsolationManager.prepare(conf);
+            LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
+        }
+
+        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            return new RunAsUserContainerLauncher(conf, supervisorId, resourceIsolationManager);
+        }
+        return new BasicContainerLauncher(conf, supervisorId, resourceIsolationManager);
+    }
+    
+    protected ContainerLauncher() {
+        //Empty
+    }
+
+    /**
+     * Launch a container in a given slot
+     * @param port the port to run this on
+     * @param assignment what to launch
+     * @param state the current state of the supervisor
+     * @return The container that can be used to manager the processes.
+     * @throws IOException on any error 
+     */
+    public abstract Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException;
+    
+    /**
+     * Recover a container for a running process
+     * @param port the port the assignment is running on
+     * @param assignment the assignment that was launched
+     * @param state the current state of the supervisor
+     * @return The container that can be used to manage the processes.
+     * @throws IOException on any error
+     * @throws ContainerRecoveryException if the Container could not be recovered
+     */
+    public abstract Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException, ContainerRecoveryException;
+    
+    /**
+     * Try to recover a container using just the worker ID.  
+     * The result is really only useful for killing the container
+     * and so is returning a Killable.  Even if a Container is returned
+     * do not case the result to Container because only the Killable APIs
+     * are guaranteed to work. 
+     * @param workerId the id of the worker to use
+     * @param localState the state of the running supervisor
+     * @return a Killable that can be used to kill the underlying container.
+     * @throws IOException on any error
+     * @throws ContainerRecoveryException if the Container could not be recovered
+     */
+    public abstract Killable recoverContainer(String workerId, LocalState localState) throws IOException, ContainerRecoveryException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java
new file mode 100644
index 0000000..7ab6e67
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+/**
+ * Could not recover the container.
+ * This is unchecked, so callers only handle it where they choose to.
+ */
+public class ContainerRecoveryException extends RuntimeException {
+    //Exceptions are Serializable; pin the version so it does not depend on compiler generated details
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message a description of why the container could not be recovered
+     */
+    public ContainerRecoveryException(String message) {
+        super(message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java
new file mode 100644
index 0000000..082f205
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+/**
+ * A callback that accepts the integer exit code of a finished process.
+ */
+public interface ExitCodeCallback {
+
+    /**
+     * Called once the process has finished.
+     * @param exitCode the exit code of the finished process.
+     */
+    void call(int exitCode);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Killable.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Killable.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Killable.java
new file mode 100644
index 0000000..8d6d8e0
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Killable.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+
+public interface Killable {
+    
+    /**
+     * Kill the processes in this container nicely
+     * (the equivalent of kill -15).
+     * @throws IOException on any error
+     */
+    public void kill() throws IOException;
+    
+    /**
+     * Kill the processes in this container violently
+     * (the equivalent of kill -9).
+     * @throws IOException on any error
+     */
+    public void forceKill() throws IOException;
+    
+    /**
+     * @return true if all of the processes are dead, else false
+     * @throws IOException on any error
+     */
+    public boolean areAllProcessesDead() throws IOException;
+    
+    /**
+     * Clean up the container; it is not coming back.
+     * By default this does the same thing as when restarting.
+     * @throws IOException on any error
+     */
+    public void cleanUp() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
new file mode 100644
index 0000000..9b196d4
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import clojure.java.api.Clojure;
+import clojure.lang.IFn;
+
+public class LocalContainer extends Container {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalContainer.class);
+    private volatile boolean _isAlive = false;
+    private final IContext _sharedContext;
+    /** Creates a LAUNCH-type container for local mode; the worker id is taken from Utils.uuid(). */
+    public LocalContainer(Map<String, Object> conf, String supervisorId, int port, LocalAssignment assignment, IContext sharedContext) throws IOException {
+        super(ContainerType.LAUNCH, conf, supervisorId, port, assignment, null, null, null, null);
+        _sharedContext = sharedContext;
+        _workerId = Utils.uuid();
+    }
+    /** Builds the worker through the Clojure mk-worker var and registers it with ProcessSimulator under the worker id. */
+    @Override
+    public void launch() throws IOException {
+        //TODO when worker goes to java, just call it directly (not through clojure)
+        IFn mkWorker = Clojure.var("org.apache.storm.daemon.worker", "mk-worker");
+
+        Shutdownable worker = (Shutdownable) mkWorker.invoke(_conf, _sharedContext, _topologyId, _supervisorId, _port, _workerId);
+        saveWorkerUser(System.getProperty("user.name"));
+        ProcessSimulator.registerProcess(_workerId, worker);
+        _isAlive = true;
+    }
+    /** Kills the simulated worker process first, then delegates to the parent implementation. */
+    @Override
+    public void kill() throws IOException {
+        ProcessSimulator.killProcess(_workerId);
+        _isAlive = false;
+        //Make sure the worker is down before we try to shoot any child processes
+        super.kill();
+    }
+    /** Returns true only when this worker is not marked alive and the parent also reports all processes dead. */
+    @Override
+    public boolean areAllProcessesDead() throws IOException {
+        return !_isAlive && super.areAllProcessesDead();
+    }
+    /** Does nothing in local mode except log a warning. */
+    @Override
+    public void relaunch() throws IOException {
+        LOG.warn("NOOP relaunch in local mode...");
+    }
+    /** Always returns false in local mode. */
+    @Override
+    public boolean didMainProcessExit() {
+        //In local mode the main process should never exit on its own
+        return false;
+    }
+    /** Always throws a RuntimeException; profiling is not supported in local mode. */
+    @Override
+    public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException {
+        throw new RuntimeException("Profiling requests are not supported in local mode");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
new file mode 100644
index 0000000..c25bc49
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.utils.LocalState;
+
+/**
+ * Launches {@link LocalContainer}s in local mode; recovery of existing containers is not supported.
+ */
+public class LocalContainerLauncher extends ContainerLauncher {
+    private final Map<String, Object> _conf;
+    private final String _supervisorId;
+    private final IContext _sharedContext;
+
+    public LocalContainerLauncher(Map<String, Object> conf, String supervisorId, IContext sharedContext) {
+        _conf = conf;
+        _supervisorId = supervisorId;
+        _sharedContext = sharedContext;
+    }
+    /** Creates a LocalContainer for the port, then calls setup() and launch() on it before returning it. */
+    @Override
+    public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        LocalContainer ret = new LocalContainer(_conf, _supervisorId, port, assignment, _sharedContext);
+        ret.setup();
+        ret.launch();
+        return ret;
+    }
+    /** Always throws ContainerRecoveryException. */
+    @Override
+    public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        //We are in the same process, so we cannot recover anything
+        throw new ContainerRecoveryException("Local Mode Recovery is not supported");
+    }
+    /** Always throws ContainerRecoveryException. */
+    @Override
+    public Killable recoverContainer(String workerId, LocalState localState) throws IOException {
+        //We are in the same process, so we cannot recover anything
+        throw new ContainerRecoveryException("Local Mode Recovery is not supported");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
new file mode 100644
index 0000000..27f18b6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadClusterState implements Runnable, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(ReadClusterState.class);
+    
+    private final Map<String, Object> superConf;
+    private final IStormClusterState stormClusterState;
+    private final EventManager syncSupEventManager;
+    private final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
+    private final Map<Integer, Slot> slots = new HashMap<>();
+    private final AtomicInteger readRetry = new AtomicInteger(0);
+    private final String assignmentId;
+    private final ISupervisor iSuper;
+    private final ILocalizer localizer;
+    private final ContainerLauncher launcher;
+    private final String host;
+    private final LocalState localState;
+    private final IStormClusterState clusterState;
+    private final AtomicReference<Map<Long, LocalAssignment>> cachedAssignments;
+    /** Builds a Slot per configured port, kills workers no slot claims, cleans unused topologies, then starts every slot. */
+    public ReadClusterState(Supervisor supervisor) throws Exception {
+        this.superConf = supervisor.getConf();
+        this.stormClusterState = supervisor.getStormClusterState();
+        this.syncSupEventManager = supervisor.getEventManger();
+        this.assignmentVersions = new AtomicReference<>(new HashMap<>());
+        this.assignmentId = supervisor.getAssignmentId();
+        this.iSuper = supervisor.getiSupervisor();
+        this.localizer = supervisor.getAsyncLocalizer();
+        this.host = supervisor.getHostName();
+        this.localState = supervisor.getLocalState();
+        this.clusterState = supervisor.getStormClusterState();
+        this.cachedAssignments = supervisor.getCurrAssignment();
+        
+        this.launcher = ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext());
+        
+        @SuppressWarnings("unchecked")
+        List<Number> ports = (List<Number>)superConf.get(Config.SUPERVISOR_SLOTS_PORTS);
+        for (Number port: ports) {
+            slots.put(port.intValue(), mkSlot(port.intValue()));
+        }
+        //Any worker id that no slot reports as its own is a leftover and gets killed (best effort)
+        try {
+            Collection<String> workers = SupervisorUtils.supervisorWorkerIds(superConf);
+            for (Slot slot: slots.values()) {
+                String workerId = slot.getWorkerId();
+                if (workerId != null) {
+                    workers.remove(workerId);
+                }
+            }
+            if (!workers.isEmpty()) {
+                supervisor.killWorkers(workers, launcher);
+            }
+        } catch (Exception e) {
+            LOG.warn("Error trying to clean up old workers", e);
+        }
+
+        //All the slots/assignments should be recovered now, so we can clean up anything that we don't expect to be here
+        try {
+            localizer.cleanupUnusedTopologies();
+        } catch (Exception e) {
+            LOG.warn("Error trying to clean up old topologies", e);
+        }
+        
+        for (Slot slot: slots.values()) {
+            slot.start();
+        }
+    }
+    /** Creates (but does not start) the Slot for the given port. */
+    private Slot mkSlot(int port) throws Exception {
+        return new Slot(localizer, superConf, launcher, host, port,
+                localState, clusterState, iSuper, cachedAssignments);
+    }
+    /** Reads assignments and profile requests from cluster state and hands them to the Slot of each port. */
+    @Override
+    public synchronized void run() {
+        try {
+            Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager);
+            List<String> stormIds = stormClusterState.assignments(syncCallback);
+            Map<String, VersionedData<Assignment>> assignmentsSnapshot =
+                    getAssignmentsSnapshot(stormClusterState, stormIds, assignmentVersions.get(), syncCallback);
+            //NOTE(review): nothing in this class calls assignmentVersions.set(...), so the map passed above stays empty
+            Map<Integer, LocalAssignment> allAssignments =
+                    readAssignments(assignmentsSnapshot);
+            if (allAssignments == null) {
+                //Something odd happened try again later
+                return;
+            }
+            Map<String, List<ProfileRequest>> topoIdToProfilerActions = getProfileActions(stormClusterState, stormIds);
+            
+            HashSet<Integer> assignedPorts = new HashSet<>();
+            LOG.debug("Synchronizing supervisor");
+            LOG.debug("All assignment: {}", allAssignments);
+            LOG.debug("Topology Ids -> Profiler Actions {}", topoIdToProfilerActions);
+            for (Integer port: allAssignments.keySet()) {
+                if (iSuper.confirmAssigned(port)) {
+                    assignedPorts.add(port);
+                }
+            }
+            HashSet<Integer> allPorts = new HashSet<>(assignedPorts);
+            allPorts.addAll(slots.keySet());
+            //Group the profile requests aimed at this host by port
+            Map<Integer, Set<TopoProfileAction>> filtered = new HashMap<>();
+            for (Entry<String, List<ProfileRequest>> entry: topoIdToProfilerActions.entrySet()) {
+                String topoId = entry.getKey();
+                if (entry.getValue() != null) {
+                    for (ProfileRequest req: entry.getValue()) {
+                        NodeInfo ni = req.get_nodeInfo();
+                        if (host.equals(ni.get_node())) {
+                            //The map is keyed by Integer; a Long key never matches and would drop earlier actions for the port
+                            Integer port = ni.get_port().iterator().next().intValue();
+                            Set<TopoProfileAction> actions = filtered.get(port);
+                            if (actions == null) {
+                                actions = new HashSet<>();
+                                filtered.put(port, actions);
+                            }
+                            actions.add(new TopoProfileAction(topoId, req));
+                        }
+                    }
+                }
+            }
+            //Every known port gets its (possibly null) assignment and profiler actions; missing slots are created on demand
+            for (Integer port: allPorts) {
+                Slot slot = slots.get(port);
+                if (slot == null) {
+                    slot = mkSlot(port);
+                    slots.put(port, slot);
+                    slot.start();
+                }
+                slot.setNewAssignment(allAssignments.get(port));
+                slot.addProfilerActions(filtered.get(port));
+            }
+            
+        } catch (Exception e) {
+            LOG.error("Failed to Sync Supervisor", e);
+            throw new RuntimeException(e);
+        }
+    }
+    /** Returns, per topology id, the cached assignment when its version is unchanged, else a freshly read one. */
+    protected Map<String, VersionedData<Assignment>> getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> topoIds,
+            Map<String, VersionedData<Assignment>> localAssignmentVersion, Runnable callback) throws Exception {
+        Map<String, VersionedData<Assignment>> updateAssignmentVersion = new HashMap<>();
+        for (String topoId : topoIds) {
+            Integer recordedVersion = -1;
+            Integer version = stormClusterState.assignmentVersion(topoId, callback);
+            VersionedData<Assignment> locAssignment = localAssignmentVersion.get(topoId);
+            if (locAssignment != null) {
+                recordedVersion = locAssignment.getVersion();
+            }
+            if (version == null) {
+                // no version reported for this topology, leave it out of the snapshot
+            } else if (version.equals(recordedVersion)) {
+                updateAssignmentVersion.put(topoId, locAssignment);
+            } else {
+                VersionedData<Assignment> assignmentVersion = stormClusterState.assignmentInfoWithVersion(topoId, callback);
+                updateAssignmentVersion.put(topoId, assignmentVersion);
+            }
+        }
+        return updateAssignmentVersion;
+    }
+    /** Returns a map from each given topology id to the profile requests cluster state reports for it. */
+    protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception {
+        Map<String, List<ProfileRequest>> ret = new HashMap<String, List<ProfileRequest>>();
+        for (String stormId : stormIds) {
+            List<ProfileRequest> profileRequests = stormClusterState.getTopologyProfileRequests(stormId);
+            ret.put(stormId, profileRequests);
+        }
+        return ret;
+    }
+    /** Merges all topologies into one port -> assignment map; returns null to ask for a retry, rethrows after repeated failures. */
+    protected Map<Integer, LocalAssignment> readAssignments(Map<String, VersionedData<Assignment>> assignmentsSnapshot) {
+        try {
+            Map<Integer, LocalAssignment> portLA = new HashMap<>();
+            for (Map.Entry<String, VersionedData<Assignment>> assignEntry : assignmentsSnapshot.entrySet()) {
+                String topoId = assignEntry.getKey();
+                Assignment assignment = assignEntry.getValue().getData();
+
+                Map<Integer, LocalAssignment> portTasks = readMyExecutors(topoId, assignmentId, assignment);
+
+                for (Map.Entry<Integer, LocalAssignment> entry : portTasks.entrySet()) {
+
+                    Integer port = entry.getKey();
+
+                    LocalAssignment la = entry.getValue();
+
+                    if (!portLA.containsKey(port)) {
+                        portLA.put(port, la);
+                    } else {
+                        throw new RuntimeException("Should not have multiple topologies assigned to one port "
+                          + port + " " + la + " " + portLA);
+                    }
+                }
+            }
+            readRetry.set(0);
+            return portLA;
+        } catch (RuntimeException e) {
+            if (readRetry.get() > 2) {
+                throw e;
+            } else {
+                readRetry.addAndGet(1);
+            }
+            LOG.warn("{} : retrying {} of 3", e.getMessage(), readRetry.get());
+            return null;
+        }
+    }
+    /** Extracts, for one topology, the executors and worker resources assigned to assignmentId, keyed by port. */
+    protected Map<Integer, LocalAssignment> readMyExecutors(String stormId, String assignmentId, Assignment assignment) {
+        Map<Integer, LocalAssignment> portTasks = new HashMap<>();
+        Map<Long, WorkerResources> slotsResources = new HashMap<>();
+        Map<NodeInfo, WorkerResources> nodeInfoWorkerResourcesMap = assignment.get_worker_resources();
+        if (nodeInfoWorkerResourcesMap != null) {
+            for (Map.Entry<NodeInfo, WorkerResources> entry : nodeInfoWorkerResourcesMap.entrySet()) {
+                if (entry.getKey().get_node().equals(assignmentId)) {
+                    Set<Long> ports = entry.getKey().get_port();
+                    for (Long port : ports) {
+                        slotsResources.put(port, entry.getValue());
+                    }
+                }
+            }
+        }
+        Map<List<Long>, NodeInfo> executorNodePort = assignment.get_executor_node_port();
+        if (executorNodePort != null) {
+            for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) {
+                if (entry.getValue().get_node().equals(assignmentId)) {
+                    for (Long port : entry.getValue().get_port()) {
+                        LocalAssignment localAssignment = portTasks.get(port.intValue());
+                        if (localAssignment == null) {
+                            List<ExecutorInfo> executors = new ArrayList<>();
+                            localAssignment = new LocalAssignment(stormId, executors);
+                            if (slotsResources.containsKey(port)) {
+                                localAssignment.set_resources(slotsResources.get(port));
+                            }
+                            portTasks.put(port.intValue(), localAssignment);
+                        }
+                        List<ExecutorInfo> executorInfoList = localAssignment.get_executors();
+                        executorInfoList.add(new ExecutorInfo(entry.getKey().get(0).intValue(), entry.getKey().get(entry.getKey().size() - 1).intValue()));
+                    }
+                }
+            }
+        }
+        return portTasks;
+    }
+    /** Clears the assignment of every slot, then polls each slot a bounded number of times for it to become EMPTY. */
+    public synchronized void shutdownAllWorkers() {
+        for (Slot slot: slots.values()) {
+            slot.setNewAssignment(null);
+        }
+        //Stop waiting on a slot once it has been polled more than 10 times without reaching EMPTY
+        for (Slot slot: slots.values()) {
+            try {
+                int count = 0;
+                while (slot.getMachineState() != MachineState.EMPTY) {
+                    if (count > 10) {
+                        LOG.warn("DONE waiting for {} to finish {}", slot, slot.getMachineState());
+                        break;
+                    }
+                    if (Time.isSimulating()) {
+                        Time.advanceTime(1000);
+                        Thread.sleep(100);
+                    } else {
+                        Time.sleep(100);
+                    }
+                    count++;
+                }
+            } catch (Exception e) {
+                LOG.error("Error trying to shutdown workers in {}", slot, e);
+            }
+        }
+    }
+    /** Closes every slot, logging (not propagating) any failure so the remaining slots are still closed. */
+    @Override
+    public void close() {
+        for (Slot slot: slots.values()) {
+            try {
+                slot.close();
+            } catch (Exception e) {
+                LOG.error("Error trying to shutdown {}", slot, e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
new file mode 100644
index 0000000..a2d8991
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RunAsUserContainer extends BasicContainer {
+    private static final Logger LOG = LoggerFactory.getLogger(RunAsUserContainer.class);
+
+    public RunAsUserContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
+            LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState,
+            String workerId) throws IOException {
+        this(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, null, null, null);
+    }
+    /** Full constructor; throws UnsupportedOperationException when running on Windows. */
+    RunAsUserContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
+            LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState,
+            String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, String profileCmd) throws IOException {
+        super(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, topoConf, ops,
+                profileCmd);
+        if (Utils.isOnWindows()) {
+            throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
+        }
+    }
+    /** Sends the given signal to pid by running the launcher's "signal" command as the worker user and waiting for it. */
+    private void signal(long pid, int signal) throws IOException {
+        List<String> commands = Arrays.asList("signal", String.valueOf(pid), String.valueOf(signal));
+        String user = getWorkerUser();
+        String logPrefix = "kill -"+signal+" " + pid;
+        SupervisorUtils.processLauncherAndWait(_conf, user, commands, null, logPrefix);
+    }
+    /** Sends signal 15 to the process. */
+    @Override
+    protected void kill(long pid) throws IOException {
+        signal(pid, 15);
+    }
+    /** Sends signal 9 to the process. */
+    @Override
+    protected void forceKill(long pid) throws IOException {
+        signal(pid, 9);
+    }
+    /** Writes the command to a script and runs it via the launcher's "profiler" command; returns true when it exits with 0. */
+    @Override
+    protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, File targetDir) throws IOException, InterruptedException {
+        String user = this.getWorkerUser();
+        String td = targetDir.getAbsolutePath();
+        LOG.info("Running as user: {} command: {}", user, command);
+        String containerFile = Utils.containerFilePath(td);
+        if (Utils.checkFileExists(containerFile)) {
+            SupervisorUtils.rmrAsUser(_conf, containerFile, containerFile);
+        }
+        String scriptFile = Utils.scriptFilePath(td);
+        if (Utils.checkFileExists(scriptFile)) {
+            SupervisorUtils.rmrAsUser(_conf, scriptFile, scriptFile);
+        }
+        String script = Utils.writeScript(td, command, env);
+        List<String> args = Arrays.asList("profiler", td, script);
+        int ret = SupervisorUtils.processLauncherAndWait(_conf, user, args, env, logPrefix);
+        return ret == 0;
+    }
+    /** Writes the command to a script in the target dir and starts it via the launcher's "worker" command as the worker user. */
+    @Override
+    protected void launchWorkerProcess(List<String> command, Map<String, String> env, 
+            String logPrefix, ExitCodeCallback processExitCallback, File targetDir) throws IOException {
+        String workerDir = targetDir.getAbsolutePath();
+        String user = this.getWorkerUser();
+        List<String> args = Arrays.asList("worker", workerDir, Utils.writeScript(workerDir, command, env));
+        List<String> commandPrefix = null;
+        if (_resourceIsolationManager != null) {
+            commandPrefix = _resourceIsolationManager.getLaunchCommandPrefix(_workerId);
+        }
+        SupervisorUtils.processLauncher(_conf, user, commandPrefix, args, null, logPrefix, processExitCallback, targetDir);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
new file mode 100644
index 0000000..c8bee27
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.daemon.supervisor.Container.ContainerType;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+
+/**
+ * {@link ContainerLauncher} whose containers are all {@link RunAsUserContainer} instances.
+ *
+ * <p>Every container is constructed with the configuration, supervisor id and resource
+ * isolation manager that were supplied to this launcher; the last three constructor arguments
+ * are always {@code null}.
+ */
+public class RunAsUserContainerLauncher extends ContainerLauncher {
+    private final Map<String, Object> _conf;
+    private final String _supervisorId;
+    protected final ResourceIsolationInterface _resourceIsolationManager;
+
+    /**
+     * Remembers the values that are forwarded to each container this launcher constructs.
+     *
+     * @param conf configuration map passed to every container
+     * @param supervisorId supervisor id passed to every container
+     * @param resourceIsolationManager isolation manager passed to every container
+     * @throws IOException declared in the signature; no statement in this constructor throws it
+     */
+    public RunAsUserContainerLauncher(Map<String, Object> conf, String supervisorId,
+            ResourceIsolationInterface resourceIsolationManager) throws IOException {
+        this._conf = conf;
+        this._supervisorId = supervisorId;
+        this._resourceIsolationManager = resourceIsolationManager;
+    }
+
+    /**
+     * Constructs a {@code ContainerType.LAUNCH} container for the given port and assignment, then
+     * calls {@code setup()} followed by {@code launch()} on it.
+     *
+     * @param port port passed to the container
+     * @param assignment assignment passed to the container
+     * @param state local state passed to the container
+     * @return the container, after {@code setup()} and {@code launch()} have returned
+     * @throws IOException if thrown while constructing, setting up or launching the container
+     */
+    @Override
+    public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        final Container launched = new RunAsUserContainer(
+                ContainerType.LAUNCH, _conf, _supervisorId, port, assignment,
+                _resourceIsolationManager, state, null, null, null, null);
+        launched.setup();
+        launched.launch();
+        return launched;
+    }
+
+    /**
+     * Constructs a {@code ContainerType.RECOVER_FULL} container for the given port and assignment.
+     * Unlike {@link #launchContainer}, neither {@code setup()} nor {@code launch()} is called.
+     *
+     * @param port port passed to the container
+     * @param assignment assignment passed to the container
+     * @param state local state passed to the container
+     * @return the newly constructed container
+     * @throws IOException if thrown while constructing the container
+     */
+    @Override
+    public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        return new RunAsUserContainer(
+                ContainerType.RECOVER_FULL, _conf, _supervisorId, port, assignment,
+                _resourceIsolationManager, state, null, null, null, null);
+    }
+
+    /**
+     * Constructs a {@code ContainerType.RECOVER_PARTIAL} container from a worker id alone: the
+     * port argument is {@code -1} and the assignment argument is {@code null}.
+     *
+     * @param workerId worker id passed to the container
+     * @param localState local state passed to the container
+     * @return the newly constructed container, exposed as a {@link Killable}
+     * @throws IOException if thrown while constructing the container
+     */
+    @Override
+    public Killable recoverContainer(String workerId, LocalState localState) throws IOException {
+        return new RunAsUserContainer(
+                ContainerType.RECOVER_PARTIAL, _conf, _supervisorId, -1, null,
+                _resourceIsolationManager, localState, workerId, null, null, null);
+    }
+}