You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:41 UTC

[18/35] storm git commit: add the plugin to use for manager worker

add the plugin to use for manager worker


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1e47352
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1e47352
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1e47352

Branch: refs/heads/master
Commit: a1e473526b5d9074ae1f9ff98162ddc78e426a73
Parents: cc95d4f
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Mon Mar 14 16:54:36 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Mon Mar 14 18:55:57 2016 +0800

----------------------------------------------------------------------
 conf/defaults.yaml                              |   4 +
 .../org/apache/storm/command/kill_workers.clj   |  11 +-
 .../apache/storm/daemon/local_supervisor.clj    |  16 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  16 +-
 storm-core/src/jvm/org/apache/storm/Config.java |   7 +
 .../storm/daemon/supervisor/DaemonCommon.java   |  22 -
 .../daemon/supervisor/StandaloneSupervisor.java |   1 -
 .../storm/daemon/supervisor/Supervisor.java     |  14 +-
 .../storm/daemon/supervisor/SupervisorData.java |  24 +-
 .../daemon/supervisor/SupervisorManager.java    | 103 +++++
 .../daemon/supervisor/SupervisorManger.java     |  97 -----
 .../daemon/supervisor/SupervisorUtils.java      | 105 +----
 .../daemon/supervisor/SyncProcessEvent.java     | 274 +------------
 .../daemon/supervisor/SyncSupervisorEvent.java  |  16 +-
 .../supervisor/timer/RunProfilerActions.java    |   2 +-
 .../supervisor/timer/SupervisorHealthCheck.java |   8 +-
 .../workermanager/DefaultWorkerManager.java     | 397 +++++++++++++++++++
 .../workermanager/IWorkerManager.java           |  38 ++
 .../supervisor/workermanager/IWorkerResult.java |  21 +
 .../clj/org/apache/storm/supervisor_test.clj    |  84 ++--
 20 files changed, 706 insertions(+), 554 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 9817161..da25ef8 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -289,6 +289,10 @@ storm.daemon.metrics.reporter.plugins:
 storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManager"
 storm.resource.isolation.plugin.enable: false
 
+
+# Default plugin to use for manager worker
+storm.supervisor.worker.manager.plugin: org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager
+
 # Configs for CGroup support
 storm.cgroup.hierarchy.dir: "/cgroup/storm_resources"
 storm.cgroup.resources:

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
index aadc9fd..08de3ed 100644
--- a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
+++ b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
@@ -28,6 +28,13 @@
         conf (assoc conf STORM-LOCAL-DIR (. (File. (conf STORM-LOCAL-DIR)) getCanonicalPath))
         isupervisor (StandaloneSupervisor.)
         supervisor-data (SupervisorData. conf nil isupervisor)
-        ids (SupervisorUtils/supervisorWorkerIds conf)]
+        worker-manager  (.getWorkerManager supervisor-data)
+        ids (SupervisorUtils/supervisorWorkerIds conf)
+        supervisor-id (.getSupervisorId supervisor-data)
+        worker-pids (.getWorkerThreadPids supervisor-data)
+        dead-workers (.getDeadWorkers supervisor-data)]
     (doseq [id ids]
-      (SupervisorUtils/shutWorker supervisor-data id))))
+      (.shutdownWorker worker-manager supervisor-id id worker-pids)
+      (if (.cleanupWorker worker-manager id)
+        (.remove dead-workers id))
+      )))

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
index c8ae2d6..b28ae08 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
@@ -36,17 +36,21 @@
     (ProcessSimulator/registerProcess pid worker)
     (.put (.getWorkerThreadPids supervisorData) workerId pid)
     ))
-
-(defn shutdown-local-worker [supervisorData workerId]
-    (log-message "shutdown-local-worker")
-    (SupervisorUtils/shutWorker supervisorData workerId))
+(defn shutdown-local-worker [supervisorData worker-manager workerId]
+  (log-message "shutdown-local-worker")
+  (let [supervisor-id (.getSupervisorId supervisorData)
+        worker-pids (.getWorkerThreadPids supervisorData)
+        dead-workers (.getDeadWorkers supervisorData)]
+    (.shutdownWorker worker-manager supervisor-id workerId worker-pids)
+    (if (.cleanupWorker worker-manager workerId)
+      (.remove dead-workers workerId))))
 
 (defn local-process []
   "Create a local process event"
   (proxy [SyncProcessEvent] []
-    (launchWorker [supervisorData stormId port workerId resources]
+    (launchLocalWorker [supervisorData stormId port workerId resources]
       (launch-local-worker supervisorData stormId port workerId resources))
-    (shutWorker [supervisorData workerId] (shutdown-local-worker supervisorData workerId))))
+    (shutWorker [supervisorData worker-manager workerId] (shutdown-local-worker supervisorData worker-manager workerId))))
 
 
 (defserverfn mk-local-supervisor [conf shared-context isupervisor]

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 7804747..5000fd3 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -25,7 +25,7 @@
            [org.apache.storm.utils]
            [org.apache.storm.zookeeper Zookeeper]
            [org.apache.storm ProcessSimulator]
-           [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManger SupervisorUtils])
+           [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager])
   (:import [java.io File])
   (:import [java.util HashMap ArrayList])
   (:import [java.util.concurrent.atomic AtomicInteger])
@@ -137,7 +137,8 @@
         supervisor-conf (merge (:daemon-conf cluster-map)
                                conf
                                {STORM-LOCAL-DIR tmp-dir
-                                SUPERVISOR-SLOTS-PORTS port-ids})
+                                SUPERVISOR-SLOTS-PORTS port-ids
+                                STORM-SUPERVISOR-WORKER-MANAGER-PLUGIN "org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager"})
         id-fn (if id id (Utils/uuid))
         isupervisor (proxy [StandaloneSupervisor] []
                         (generateSupervisorId [] id-fn))
@@ -282,7 +283,7 @@
   ([timeout-ms apredicate]
     (while-timeout timeout-ms (not (apredicate))
       (Time/sleep 100))))
-(defn is-supervisor-waiting [^SupervisorManger supervisor]
+(defn is-supervisor-waiting [^SupervisorManager supervisor]
   (.isWaiting supervisor))
 
 (defn wait-until-cluster-waiting
@@ -415,15 +416,18 @@
 
 (defn mk-capture-shutdown-fn
   [capture-atom]
-    (fn [supervisorData workerId]
+    (fn [supervisorData worker-manager workerId]
       (let [conf (.getConf supervisorData)
             supervisor-id (.getSupervisorId supervisorData)
             port (find-worker-port conf workerId)
+            worker-pids (.getWorkerThreadPids supervisorData)
+            dead-workers (.getDeadWorkers supervisorData)
             existing (get @capture-atom [supervisor-id port] 0)]
         (log-message "mk-capture-shutdown-fn")
         (swap! capture-atom assoc [supervisor-id port] (inc existing))
-        (SupervisorUtils/shutWorker supervisorData workerId))))
-
+        (.shutdownWorker worker-manager supervisor-id workerId worker-pids)
+        (if (.cleanupWorker worker-manager workerId)
+          (.remove dead-workers workerId)))))
 (defmacro capture-changed-workers
   [& body]
   `(let [launch-captured# (atom {})

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 6ea8b0f..103e585 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -18,6 +18,7 @@
 package org.apache.storm;
 
 import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
 import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
 import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
@@ -2212,6 +2213,12 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin";
 
     /**
+     * The plugin to be used for manager worker
+     */
+    @isImplementationOfClass(implementsClass = IWorkerManager.class)
+    public static final Object STORM_SUPERVISOR_WORKER_MANAGER_PLUGIN = "storm.supervisor.worker.manager.plugin";
+
+    /**
      * CGroup Setting below
      */
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java
deleted file mode 100644
index 3b7a18e..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-public interface DaemonCommon {
-    boolean isWaiting();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
index 4947c6f..a1fa798 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
@@ -57,7 +57,6 @@ public class StandaloneSupervisor implements ISupervisor {
     }
 
     @Override
-    // @return is vector which need be converted to be int
     public Object getMetadata() {
         Object ports = conf.get(Config.SUPERVISOR_SLOTS_PORTS);
         return ports;

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index 6124aef..1dd44a9 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -61,8 +61,8 @@ public class Supervisor {
      * @return
      * @throws Exception
      */
-    public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
-        SupervisorManger supervisorManger = null;
+    public SupervisorManager mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
+        SupervisorManager supervisorManager = null;
         try {
             LOG.info("Starting Supervisor with conf {}", conf);
             iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
@@ -78,8 +78,8 @@ public class Supervisor {
             Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
             supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);
 
-            Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf);
-            for (String stormId : downdedStormId) {
+            Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
+            for (String stormId : downloadedStormIds) {
                 SupervisorUtils.addBlobReferences(localizer, stormId, conf);
             }
             // do this after adding the references so we don't try to clean things being used
@@ -119,7 +119,7 @@ public class Supervisor {
                 eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
             }
             LOG.info("Starting supervisor with id {} at host {}.", supervisorData.getSupervisorId(), supervisorData.getHostName());
-            supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager);
+            supervisorManager = new SupervisorManager(supervisorData, syncSupEventManager, syncProcessManager);
         } catch (Throwable t) {
             if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
                 throw t;
@@ -130,7 +130,7 @@ public class Supervisor {
                 Utils.exitProcess(13, "Error on initialization");
             }
         }
-        return supervisorManger;
+        return supervisorManager;
     }
 
     /**
@@ -138,7 +138,7 @@ public class Supervisor {
      */
     private void launch(ISupervisor iSupervisor) {
         LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
-        SupervisorManger supervisorManager;
+        SupervisorManager supervisorManager;
         try {
             Map<Object, Object> conf = Utils.readStormConfig();
             if (ConfigUtils.isLocalMode(conf)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
index 8c17edc..213457d 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
@@ -23,7 +23,7 @@ import org.apache.storm.cluster.ClusterStateContext;
 import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileRequest;
 import org.apache.storm.localizer.Localizer;
@@ -73,8 +73,8 @@ public class SupervisorData {
     private AtomicInteger syncRetry;
     private final Object downloadLock = new Object();
     private AtomicReference<Map<String, List<ProfileRequest>>> stormIdToProfileActions;
-    private CgroupManager resourceIsolationManager;
     private ConcurrentHashSet<String> deadWorkers;
+    private final IWorkerManager workerManager;
 
     public SupervisorData(Map conf, IContext sharedContext, ISupervisor iSupervisor) {
         this.conf = conf;
@@ -124,17 +124,8 @@ public class SupervisorData {
         this.assignmentVersions = new AtomicReference<Map<String, Map<String, Object>>>(new HashMap<String, Map<String, Object>>());
         this.syncRetry = new AtomicInteger(0);
         this.stormIdToProfileActions = new AtomicReference<Map<String, List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>());
-        if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
-            try {
-                this.resourceIsolationManager = (CgroupManager) Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
-                this.resourceIsolationManager.prepare(conf);
-                LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
-            } catch (IOException e) {
-                throw Utils.wrapInRuntime(e);
-            }
-        } else {
-            this.resourceIsolationManager = null;
-        }
+        this.workerManager =  Utils.newInstance((String) conf.get(Config.STORM_SUPERVISOR_WORKER_MANAGER_PLUGIN));
+        this.workerManager.prepareWorker(conf, localizer);
     }
 
     public AtomicReference<Map<String, List<ProfileRequest>>> getStormIdToProfileActions() {
@@ -233,12 +224,11 @@ public class SupervisorData {
         this.assignmentVersions.set(assignmentVersions);
     }
 
-    public CgroupManager getResourceIsolationManager() {
-        return resourceIsolationManager;
-    }
-
     public ConcurrentHashSet getDeadWorkers() {
         return deadWorkers;
     }
 
+    public IWorkerManager getWorkerManager() {
+        return workerManager;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
new file mode 100644
index 0000000..d593d3c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.daemon.DaemonCommon;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class SupervisorManager implements SupervisorDaemon, DaemonCommon, Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorManager.class);
+    private final EventManager eventManager;
+    private final EventManager processesEventManager;
+    private SupervisorData supervisorData;
+
+    public SupervisorManager(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) {
+        this.eventManager = eventManager;
+        this.supervisorData = supervisorData;
+        this.processesEventManager = processesEventManager;
+    }
+
+    public void shutdown() {
+        LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId());
+        supervisorData.setActive(false);
+        try {
+            supervisorData.getHeartbeatTimer().close();
+            supervisorData.getEventTimer().close();
+            supervisorData.getBlobUpdateTimer().close();
+            eventManager.close();
+            processesEventManager.close();
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+        supervisorData.getStormClusterState().disconnect();
+    }
+
+    @Override
+    public void shutdownAllWorkers() {
+        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
+        IWorkerManager workerManager = supervisorData.getWorkerManager();
+        try {
+            for (String workerId : workerIds) {
+                workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids());
+                boolean success = workerManager.cleanupWorker(workerId);
+                if (success){
+                    supervisorData.getDeadWorkers().remove(workerId);
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("shutWorker failed");
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    @Override
+    public Map getConf() {
+        return supervisorData.getConf();
+    }
+
+    @Override
+    public String getId() {
+        return supervisorData.getSupervisorId();
+    }
+
+    @Override
+    public boolean isWaiting() {
+        if (!supervisorData.isActive()) {
+            return true;
+        }
+
+        if (supervisorData.getHeartbeatTimer().isTimerWaiting() && supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting()
+                && processesEventManager.waiting()) {
+            return true;
+        }
+        return false;
+    }
+
+    public void run() {
+        shutdown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
deleted file mode 100644
index 26f0aae..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.storm.event.EventManager;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Map;
-
-public class SupervisorManger implements SupervisorDaemon, DaemonCommon, Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class);
-    private final EventManager eventManager;
-    private final EventManager processesEventManager;
-    private SupervisorData supervisorData;
-
-    public SupervisorManger(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) {
-        this.eventManager = eventManager;
-        this.supervisorData = supervisorData;
-        this.processesEventManager = processesEventManager;
-    }
-
-    public void shutdown() {
-        LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId());
-        supervisorData.setActive(false);
-        try {
-            supervisorData.getHeartbeatTimer().close();
-            supervisorData.getEventTimer().close();
-            supervisorData.getBlobUpdateTimer().close();
-            eventManager.close();
-            processesEventManager.close();
-        } catch (Exception e) {
-            throw Utils.wrapInRuntime(e);
-        }
-        supervisorData.getStormClusterState().disconnect();
-    }
-
-    @Override
-    public void shutdownAllWorkers() {
-
-        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
-        try {
-            for (String workerId : workerIds) {
-                SupervisorUtils.shutWorker(supervisorData, workerId);
-            }
-        } catch (Exception e) {
-            LOG.error("shutWorker failed");
-            throw Utils.wrapInRuntime(e);
-        }
-    }
-
-    @Override
-    public Map getConf() {
-        return supervisorData.getConf();
-    }
-
-    @Override
-    public String getId() {
-        return supervisorData.getSupervisorId();
-    }
-
-    @Override
-    public boolean isWaiting() {
-        if (!supervisorData.isActive()) {
-            return true;
-        }
-
-        if (supervisorData.getHeartbeatTimer().isTimerWaiting() && supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting()
-                && processesEventManager.waiting()) {
-            return true;
-        }
-        return false;
-    }
-
-    public void run() {
-        shutdown();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index ae3422e..bb2525a 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -50,10 +50,10 @@ public class SupervisorUtils {
         _instance = INSTANCE;
     }
 
-    public static Process workerLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix,
-            final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
+    public static Process processLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix,
+                                          final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
         if (StringUtils.isBlank(user)) {
-            throw new IllegalArgumentException("User cannot be blank when calling workerLauncher.");
+            throw new IllegalArgumentException("User cannot be blank when calling processLauncher.");
         }
         String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
         String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
@@ -71,10 +71,10 @@ public class SupervisorUtils {
         return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
     }
 
-    public static int workerLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
+    public static int processLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
             throws IOException {
         int ret = 0;
-        Process process = workerLauncher(conf, user, args, environment, logPreFix, null, null);
+        Process process = processLauncher(conf, user, args, environment, logPreFix, null, null);
         if (StringUtils.isNotBlank(logPreFix))
             Utils.readAndLogStream(logPreFix, process.getInputStream());
         try {
@@ -92,7 +92,7 @@ public class SupervisorUtils {
             List<String> commands = new ArrayList<>();
             commands.add("code-dir");
             commands.add(dir);
-            workerLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+            processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
         }
     }
 
@@ -102,7 +102,7 @@ public class SupervisorUtils {
         List<String> commands = new ArrayList<>();
         commands.add("rmr");
         commands.add(path);
-        SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPreFix);
+        SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPreFix);
         if (Utils.checkFileExists(path)) {
             throw new RuntimeException(path + " was not deleted.");
         }
@@ -116,11 +116,11 @@ public class SupervisorUtils {
      * @return
      */
     public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) {
-        return new Boolean((String) blobInfo.get("uncompress"));
+        return Utils.getBoolean(blobInfo.get("uncompress"), false);
     }
 
     /**
-     * Remove a reference to a blob when its no longer needed
+     * Returns a list of LocalResources based on the blobstore-map passed in
      * 
      * @param blobstoreMap
      * @return
@@ -186,7 +186,7 @@ public class SupervisorUtils {
     }
 
     /**
-     * Returns map from worr id to heartbeat
+     * map from worker id to heartbeat
      *
      * @param conf
      * @return
@@ -265,89 +265,4 @@ public class SupervisorUtils {
         acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
         return acls;
     }
-
-    public static void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException {
-        LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), workerId);
-        Map conf = supervisorData.getConf();
-        Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
-        Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
-        Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
-        String user = ConfigUtils.getWorkerUser(conf, workerId);
-        String threadPid = supervisorData.getWorkerThreadPids().get(workerId);
-        if (StringUtils.isNotBlank(threadPid)) {
-            ProcessSimulator.killProcess(threadPid);
-        }
-
-        for (String pid : pids) {
-            if (asUser) {
-                List<String> commands = new ArrayList<>();
-                commands.add("signal");
-                commands.add(pid);
-                commands.add("15");
-                String logPrefix = "kill -15 " + pid;
-                SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
-            } else {
-                Utils.killProcessWithSigTerm(pid);
-            }
-        }
-
-        if (pids.size() > 0) {
-            LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs);
-            Time.sleepSecs(shutdownSleepSecs);
-        }
-
-        for (String pid : pids) {
-            if (asUser) {
-                List<String> commands = new ArrayList<>();
-                commands.add("signal");
-                commands.add(pid);
-                commands.add("9");
-                String logPrefix = "kill -9 " + pid;
-                SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
-            } else {
-                Utils.forceKillProcess(pid);
-            }
-            String path = ConfigUtils.workerPidPath(conf, workerId, pid);
-            if (asUser) {
-                SupervisorUtils.rmrAsUser(conf, workerId, path);
-            } else {
-                try {
-                    LOG.debug("Removing path {}", path);
-                    new File(path).delete();
-                } catch (Exception e) {
-                    // on windows, the supervisor may still holds the lock on the worker directory
-                    // ignore
-                }
-            }
-        }
-        tryCleanupWorker(conf, supervisorData, workerId);
-        LOG.info("Shut down {}:{}", supervisorData.getSupervisorId(), workerId);
-
-    }
-
-    public static void tryCleanupWorker(Map conf, SupervisorData supervisorData, String workerId) {
-        try {
-            String workerRoot = ConfigUtils.workerRoot(conf, workerId);
-            if (Utils.checkFileExists(workerRoot)) {
-                if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
-                    SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
-                } else {
-                    Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
-                    Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
-                    Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId));
-                    Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
-                }
-                ConfigUtils.removeWorkerUserWSE(conf, workerId);
-                supervisorData.getDeadWorkers().remove(workerId);
-            }
-            if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)){
-                supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId);
-            }
-        } catch (IOException e) {
-            LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
-        } catch (RuntimeException e) {
-            LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
index 068c442..41fa01d 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.storm.Config;
 import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
 import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LocalAssignment;
@@ -88,13 +89,6 @@ public class SyncProcessEvent implements Runnable {
         this.localState = supervisorData.getLocalState();
     }
 
-
-    /**
-     * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file -
-     * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new
-     * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait
-     * for workers launch
-     */
     @Override
     public void run() {
         LOG.debug("Syncing processes");
@@ -132,7 +126,7 @@ public class SyncProcessEvent implements Runnable {
                 if (stateHeartbeat.getState() != State.VALID) {
                     LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now,
                             stateHeartbeat.getState(), stateHeartbeat.getHeartbeat());
-                    shutWorker(supervisorData, entry.getKey());
+                    shutWorker(supervisorData, supervisorData.getWorkerManager(), entry.getKey());
                 }
             }
             // start new workers
@@ -244,261 +238,24 @@ public class SyncProcessEvent implements Runnable {
     /**
      * launch a worker in local mode.
      */
-    protected void launchWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException {
+    protected void launchLocalWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException {
         // port this function after porting worker to java
     }
 
-    protected String getWorkerClassPath(String stormJar, Map stormConf) {
-        List<String> topoClasspath = new ArrayList<>();
-        Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
-
-        if (object instanceof List) {
-            topoClasspath.addAll((List<String>) object);
-        } else if (object instanceof String){
-            topoClasspath.add((String)object);
-        }else {
-            //ignore
-        }
-        String classPath = Utils.workerClasspath();
-        String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
-        return Utils.addToClasspath(classAddPath, topoClasspath);
-    }
-
-    /**
-     * "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap"
-     * 
-     * @param value
-     * @param workerId
-     * @param stormId
-     * @param port
-     * @param memOnheap
-     */
-    public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
-        List<String> rets = new ArrayList<>();
-        if (value instanceof String) {
-            String string = (String) value;
-            string = string.replace("%ID%", String.valueOf(port));
-            string = string.replace("%WORKER-ID%", workerId);
-            string = string.replace("%TOPOLOGY-ID%", stormId);
-            string = string.replace("%WORKER-PORT%", String.valueOf(port));
-            string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
-            String[] strings = string.split("\\s+");
-            rets.addAll(Arrays.asList(strings));
-        } else if (value instanceof List) {
-            List<Object> objects = (List<Object>) value;
-            for (Object object : objects) {
-                String str = (String)object;
-                str = str.replace("%ID%", String.valueOf(port));
-                str = str.replace("%WORKER-ID%", workerId);
-                str = str.replace("%TOPOLOGY-ID%", stormId);
-                str = str.replace("%WORKER-PORT%", String.valueOf(port));
-                str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap));
-                rets.add(str);
-            }
-        }
-        return rets;
-    }
-
-
-
-    /**
-     * launch a worker in distributed mode
-     * supervisorId for testing
-     * @throws IOException
-     */
-    protected void launchWorker(Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId,
-            WorkerResources resources, CgroupManager cgroupManager, ConcurrentHashSet deadWorkers) throws IOException {
-
-        Boolean runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
-        String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
-        String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
-        String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
-        String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId);
-
-        String stormLogDir = ConfigUtils.getLogDir();
-        String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
-
-        String stormLog4j2ConfDir;
-        if (StringUtils.isNotBlank(stormLogConfDir)) {
-            if (Utils.isAbsolutePath(stormLogConfDir)) {
-                stormLog4j2ConfDir = stormLogConfDir;
-            } else {
-                stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir;
-            }
-        } else {
-            stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
-        }
-
-        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
-
-        String jlp = jlp(stormRoot, conf);
-
-        String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
-
+    protected void launchDistributedWorker(IWorkerManager workerManager, Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId,
+                                           WorkerResources resources, ConcurrentHashSet deadWorkers) throws IOException {
         Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-
-        String workerClassPath = getWorkerClassPath(stormJar, stormConf);
-
-        Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
-        List<String> topGcOpts = new ArrayList<>();
-        if (topGcOptsObject instanceof String) {
-            topGcOpts.add((String) topGcOptsObject);
-        } else if (topGcOptsObject instanceof List) {
-            topGcOpts.addAll((List<String>) topGcOptsObject);
-        }
-
-        int memOnheap = 0;
-        if (resources.get_mem_on_heap() > 0) {
-            memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
-        } else {
-            //set the default heap memory size for supervisor-test
-            memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
-        }
-
-        int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
-
-        int cpu = (int) Math.ceil(resources.get_cpu());
-
-        List<String> gcOpts = null;
-
-        if (topGcOpts != null) {
-            gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap);
-        } else {
-            gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap);
-        }
-
-        Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
-        List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
-        if (topoWorkerLogwriterObject instanceof String) {
-            topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject);
-        } else if (topoWorkerLogwriterObject instanceof List) {
-            topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject);
-        }
-
         String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-
-        String logfileName = "worker.log";
-
-        String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
-
-        String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
-        if (loggingSensitivity == null) {
-            loggingSensitivity = "S3";
-        }
-
-        List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
-
-        List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
-
-        List<String> workerProfilerChildopts = null;
-        if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
-            workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
-        }else {
-            workerProfilerChildopts = new ArrayList<>();
-        }
-
-        Map<String, String> topEnvironment = new HashMap<String, String>();
-        Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
-        if (environment != null) {
-            topEnvironment.putAll(environment);
-        }
-        topEnvironment.put("LD_LIBRARY_PATH", jlp);
-
-        String log4jConfigurationFile = null;
-        if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
-            log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
-        } else {
-            log4jConfigurationFile = stormLog4j2ConfDir;
-        }
-        log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
-
-        List<String> commandList = new ArrayList<>();
-        commandList.add(SupervisorUtils.javaCmd("java"));
-        commandList.add("-cp");
-        commandList.add(workerClassPath);
-        commandList.addAll(topoWorkerLogwriterChildopts);
-        commandList.add("-Dlogfile.name=" + logfileName);
-        commandList.add("-Dstorm.home=" + stormHome);
-        commandList.add("-Dworkers.artifacts=" + workersArtifacets);
-        commandList.add("-Dstorm.id=" + stormId);
-        commandList.add("-Dworker.id=" + workerId);
-        commandList.add("-Dworker.port=" + port);
-        commandList.add("-Dstorm.log.dir=" + stormLogDir);
-        commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
-        commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
-        commandList.add("org.apache.storm.LogWriter");
-
-        commandList.add(SupervisorUtils.javaCmd("java"));
-        commandList.add("-server");
-        commandList.addAll(workerChildopts);
-        commandList.addAll(topWorkerChildopts);
-        commandList.addAll(gcOpts);
-        commandList.addAll(workerProfilerChildopts);
-        commandList.add("-Djava.library.path=" + jlp);
-        commandList.add("-Dlogfile.name=" + logfileName);
-        commandList.add("-Dstorm.home=" + stormHome);
-        commandList.add("-Dworkers.artifacts=" + workersArtifacets);
-        commandList.add("-Dstorm.conf.file=" + stormConfFile);
-        commandList.add("-Dstorm.options=" + stormOptions);
-        commandList.add("-Dstorm.log.dir=" + stormLogDir);
-        commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
-        commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
-        commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
-        commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
-        commandList.add("-Dstorm.id=" + stormId);
-        commandList.add("-Dworker.id=" + workerId);
-        commandList.add("-Dworker.port=" + port);
-        commandList.add("-cp");
-        commandList.add(workerClassPath);
-        commandList.add("org.apache.storm.daemon.worker");
-        commandList.add(stormId);
-        commandList.add(assignmentId);
-        commandList.add(String.valueOf(port));
-        commandList.add(workerId);
-
-        // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
-        if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
-            int cgRoupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)));
-            int memoryValue = memoffheap + memOnheap + cgRoupMem;
-            int cpuValue = cpu;
-            Map<String, Number> map = new HashMap<>();
-            map.put("cpu", cpuValue);
-            map.put("memory", memoryValue);
-            cgroupManager.reserveResourcesForWorker(workerId, map);
-            commandList = cgroupManager.getLaunchCommand(workerId, commandList);
-        }
-
-        LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
         writeLogMetadata(stormConf, user, workerId, stormId, port, conf);
         ConfigUtils.setWorkerUserWSE(conf, workerId, user);
         createArtifactsLink(conf, stormId, port, workerId);
 
         String logPrefix = "Worker Process " + workerId;
-        String workerDir = ConfigUtils.workerRoot(conf, workerId);
-
         if (deadWorkers != null)
             deadWorkers.remove(workerId);
         createBlobstoreLinks(conf, stormId, workerId);
-
         ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix, workerId);
-        if (runWorkerAsUser) {
-            List<String> args = new ArrayList<>();
-            args.add("worker");
-            args.add(workerDir);
-            args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
-            SupervisorUtils.workerLauncher(conf, user, args, null, logPrefix, processExitCallback, new File(workerDir));
-        } else {
-            Utils.launchProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
-        }
-    }
-
-    protected String jlp(String stormRoot, Map conf) {
-        String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
-        String os = System.getProperty("os.name").replaceAll("\\s+", "_");
-        String arch = System.getProperty("os.arch");
-        String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
-        String ret = archResourceRoot + Utils.FILE_PATH_SEPARATOR + resourceRoot + Utils.FILE_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH);
-        return ret;
+        workerManager.launchWorker(supervisorId, assignmentId, stormId, port, workerId, resources, processExitCallback);
     }
 
     protected Map<String, Integer> startNewWorkers(Map<Integer, String> newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws IOException {
@@ -528,10 +285,9 @@ public class SyncProcessEvent implements Runnable {
                 FileUtils.forceMkdir(new File(hbPath));
 
                 if (clusterMode.endsWith("distributed")) {
-                    launchWorker(conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources,
-                            supervisorData.getResourceIsolationManager(), supervisorData.getDeadWorkers());
+                    launchDistributedWorker(supervisorData.getWorkerManager(), conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources, supervisorData.getDeadWorkers());
                 } else if (clusterMode.endsWith("local")) {
-                    launchWorker(supervisorData, stormId, port.longValue(), workerId, resources);
+                    launchLocalWorker(supervisorData, stormId, port.longValue(), workerId, resources);
                 }
                 newValidWorkerIds.put(workerId, port);
 
@@ -559,9 +315,7 @@ public class SyncProcessEvent implements Runnable {
         }
         if (stormconf.get(Config.TOPOLOGY_GROUPS) != null) {
             List<String> topGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS);
-            for (String group : topGroups){
-                logsGroups.add(group);
-            }
+            logsGroups.addAll(topGroups);
         }
         data.put(Config.LOGS_GROUPS, logsGroups.toArray());
 
@@ -609,7 +363,6 @@ public class SyncProcessEvent implements Runnable {
         }finally {
             writer.close();
         }
-
     }
 
     /**
@@ -665,8 +418,11 @@ public class SyncProcessEvent implements Runnable {
         }
     }
 
-    //for supervisor-test
-    public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException{
-        SupervisorUtils.shutWorker(supervisorData, workerId);
+    public void shutWorker(SupervisorData supervisorData, IWorkerManager workerManager, String workerId) throws IOException, InterruptedException{
+        workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids());
+        boolean success = workerManager.cleanupWorker(workerId);
+        if (success){
+            supervisorData.getDeadWorkers().remove(workerId);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
index 4c08014..47cf440 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
@@ -109,6 +109,7 @@ public class SyncSupervisorEvent implements Runnable {
             LOG.debug("Checked Downloaded Ids {}", srashStormIds);
             LOG.debug("Downloaded Ids {}", downloadedStormIds);
             LOG.debug("Storm Ids Profiler Actions {}", stormIdToProfilerActions);
+
             // download code first
             // This might take awhile
             // - should this be done separately from usual monitoring?
@@ -204,12 +205,12 @@ public class SyncSupervisorEvent implements Runnable {
             List<ExecutorInfo> existExecutors = existingAssignment.get(port).get_executors();
             List<ExecutorInfo> newExecutors = newAssignment.get(port).get_executors();
             if (newExecutors.size() != existExecutors.size()) {
-                syncProcesses.shutWorker(supervisorData, vaildPortToWorkerIds.get(port));
+                syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), vaildPortToWorkerIds.get(port));
                 continue;
             }
             for (ExecutorInfo executorInfo : newExecutors) {
                 if (!existExecutors.contains(executorInfo)) {
-                    syncProcesses.shutWorker(supervisorData, vaildPortToWorkerIds.get(port));
+                    syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), vaildPortToWorkerIds.get(port));
                     break;
                 }
             }
@@ -353,7 +354,12 @@ public class SyncSupervisorEvent implements Runnable {
         } finally {
             blobStore.shutdown();
         }
-        FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+        try {
+            FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+        }catch (Exception e){
+            //igonre
+        }
+
         SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
         ClassLoader classloader = Thread.currentThread().getContextClassLoader();
 
@@ -503,7 +509,7 @@ public class SyncSupervisorEvent implements Runnable {
     protected void setupBlobPermission(Map conf, String user, String path) throws IOException {
         if (Utils.getBoolean(Config.SUPERVISOR_RUN_WORKER_AS_USER, false)) {
             String logPrefix = "setup blob permissions for " + path;
-            SupervisorUtils.workerLauncherAndWait(conf, user, Arrays.asList("blob", path), null, logPrefix);
+            SupervisorUtils.processLauncherAndWait(conf, user, Arrays.asList("blob", path), null, logPrefix);
         }
 
     }
@@ -623,7 +629,7 @@ public class SyncSupervisorEvent implements Runnable {
             String workerId = entry.getKey();
             StateHeartbeat stateHeartbeat = entry.getValue();
             if (stateHeartbeat.getState() == State.DISALLOWED) {
-                syncProcesses.shutWorker(supervisorData, workerId);
+                syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), workerId);
                 LOG.debug("{}'s state disallowed, so shutdown this worker");
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
index d39a679..ec29855 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@ -171,7 +171,7 @@ public class RunProfilerActions implements Runnable {
             newCommands.add("profiler");
             newCommands.add(targetDir);
             newCommands.add(script);
-            SupervisorUtils.workerLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile);
+            SupervisorUtils.processLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile);
         } else {
             Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index f6b3ed6..5e7b6d3 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -21,6 +21,7 @@ package org.apache.storm.daemon.supervisor.timer;
 import org.apache.storm.command.HealthCheck;
 import org.apache.storm.daemon.supervisor.SupervisorData;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +42,17 @@ public class SupervisorHealthCheck implements Runnable {
     @Override
     public void run() {
         Map conf = supervisorData.getConf();
+        IWorkerManager workerManager = supervisorData.getWorkerManager();
         int healthCode = HealthCheck.healthCheck(conf);
         Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
         if (healthCode != 0) {
             for (String workerId : workerIds) {
                 try {
-                    SupervisorUtils.shutWorker(supervisorData, workerId);
+                    workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids());
+                    boolean success = workerManager.cleanupWorker(workerId);
+                    if (success){
+                        supervisorData.getDeadWorkers().remove(workerId);
+                    }
                 } catch (Exception e) {
                     throw Utils.wrapInRuntime(e);
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
new file mode 100644
index 0000000..b19fd89
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.workermanager;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+public class DefaultWorkerManager implements IWorkerManager {
+
+    private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class);
+
+    private Map conf;
+    private CgroupManager resourceIsolationManager;
+    private boolean runWorkerAsUser;
+
+    @Override
+    public void prepareWorker(Map conf, Localizer localizer) {
+        this.conf = conf;
+        if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
+            try {
+                this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
+                this.resourceIsolationManager.prepare(conf);
+                LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
+            } catch (IOException e) {
+                throw Utils.wrapInRuntime(e);
+            }
+        } else {
+            this.resourceIsolationManager = null;
+        }
+        this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+    }
+
+    @Override
+    public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
+            Utils.ExitCodeCallable workerExitCallback) {
+        try {
+
+            String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+            String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
+            String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
+            String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId);
+
+            String stormLogDir = ConfigUtils.getLogDir();
+            String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
+
+            String stormLog4j2ConfDir;
+            if (StringUtils.isNotBlank(stormLogConfDir)) {
+                if (Utils.isAbsolutePath(stormLogConfDir)) {
+                    stormLog4j2ConfDir = stormLogConfDir;
+                } else {
+                    stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir;
+                }
+            } else {
+                stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
+            }
+
+            String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+
+            String jlp = jlp(stormRoot, conf);
+
+            String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
+
+            Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+            String workerClassPath = getWorkerClassPath(stormJar, stormConf);
+
+            Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
+            List<String> topGcOpts = new ArrayList<>();
+            if (topGcOptsObject instanceof String) {
+                topGcOpts.add((String) topGcOptsObject);
+            } else if (topGcOptsObject instanceof List) {
+                topGcOpts.addAll((List<String>) topGcOptsObject);
+            }
+
+            int memOnheap = 0;
+            if (resources.get_mem_on_heap() > 0) {
+                memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
+            } else {
+                // set the default heap memory size for supervisor-test
+                memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
+            }
+
+            int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
+
+            int cpu = (int) Math.ceil(resources.get_cpu());
+
+            List<String> gcOpts = null;
+
+            if (topGcOpts.size() > 0) {
+                gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap);
+            } else {
+                gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap);
+            }
+
+            Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
+            List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
+            if (topoWorkerLogwriterObject instanceof String) {
+                topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject);
+            } else if (topoWorkerLogwriterObject instanceof List) {
+                topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject);
+            }
+
+            String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+
+            String logfileName = "worker.log";
+
+            String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
+
+            String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
+            if (loggingSensitivity == null) {
+                loggingSensitivity = "S3";
+            }
+
+            List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
+
+            List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
+
+            List<String> workerProfilerChildopts = null;
+            if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
+                workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
+            } else {
+                workerProfilerChildopts = new ArrayList<>();
+            }
+
+            Map<String, String> topEnvironment = new HashMap<String, String>();
+            Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+            if (environment != null) {
+                topEnvironment.putAll(environment);
+            }
+            topEnvironment.put("LD_LIBRARY_PATH", jlp);
+
+            String log4jConfigurationFile = null;
+            if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
+                log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
+            } else {
+                log4jConfigurationFile = stormLog4j2ConfDir;
+            }
+            log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
+
+            List<String> commandList = new ArrayList<>();
+            commandList.add(SupervisorUtils.javaCmd("java"));
+            commandList.add("-cp");
+            commandList.add(workerClassPath);
+            commandList.addAll(topoWorkerLogwriterChildopts);
+            commandList.add("-Dlogfile.name=" + logfileName);
+            commandList.add("-Dstorm.home=" + stormHome);
+            commandList.add("-Dworkers.artifacts=" + workersArtifacets);
+            commandList.add("-Dstorm.id=" + stormId);
+            commandList.add("-Dworker.id=" + workerId);
+            commandList.add("-Dworker.port=" + port);
+            commandList.add("-Dstorm.log.dir=" + stormLogDir);
+            commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
+            commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+            commandList.add("org.apache.storm.LogWriter");
+
+            commandList.add(SupervisorUtils.javaCmd("java"));
+            commandList.add("-server");
+            commandList.addAll(workerChildopts);
+            commandList.addAll(topWorkerChildopts);
+            commandList.addAll(gcOpts);
+            commandList.addAll(workerProfilerChildopts);
+            commandList.add("-Djava.library.path=" + jlp);
+            commandList.add("-Dlogfile.name=" + logfileName);
+            commandList.add("-Dstorm.home=" + stormHome);
+            commandList.add("-Dworkers.artifacts=" + workersArtifacets);
+            commandList.add("-Dstorm.conf.file=" + stormConfFile);
+            commandList.add("-Dstorm.options=" + stormOptions);
+            commandList.add("-Dstorm.log.dir=" + stormLogDir);
+            commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
+            commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
+            commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
+            commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+            commandList.add("-Dstorm.id=" + stormId);
+            commandList.add("-Dworker.id=" + workerId);
+            commandList.add("-Dworker.port=" + port);
+            commandList.add("-cp");
+            commandList.add(workerClassPath);
+            commandList.add("org.apache.storm.daemon.worker");
+            commandList.add(stormId);
+            commandList.add(assignmentId);
+            commandList.add(String.valueOf(port));
+            commandList.add(workerId);
+
+            // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
+            if (resourceIsolationManager != null) {
+                int cGroupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)));
+                int memoryValue = memoffheap + memOnheap + cGroupMem;
+                int cpuValue = cpu;
+                Map<String, Number> map = new HashMap<>();
+                map.put("cpu", cpuValue);
+                map.put("memory", memoryValue);
+                resourceIsolationManager.reserveResourcesForWorker(workerId, map);
+                commandList = resourceIsolationManager.getLaunchCommand(workerId, commandList);
+            }
+
+            LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
+
+            String logPrefix = "Worker Process " + workerId;
+            String workerDir = ConfigUtils.workerRoot(conf, workerId);
+
+            if (runWorkerAsUser) {
+                List<String> args = new ArrayList<>();
+                args.add("worker");
+                args.add(workerDir);
+                args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
+                SupervisorUtils.processLauncher(conf, user, args, null, logPrefix, workerExitCallback, new File(workerDir));
+            } else {
+                Utils.launchProcess(commandList, topEnvironment, logPrefix, workerExitCallback, new File(workerDir));
+            }
+        } catch (IOException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+        return null;
+    }
+
+    @Override
+    public IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) {
+        try {
+            LOG.info("Shutting down {}:{}", supervisorId, workerId);
+            Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
+            Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
+            String user = ConfigUtils.getWorkerUser(conf, workerId);
+            String threadPid = workerThreadPids.get(workerId);
+            if (StringUtils.isNotBlank(threadPid)) {
+                ProcessSimulator.killProcess(threadPid);
+            }
+
+            for (String pid : pids) {
+                if (runWorkerAsUser) {
+                    List<String> commands = new ArrayList<>();
+                    commands.add("signal");
+                    commands.add(pid);
+                    commands.add("15");
+                    String logPrefix = "kill -15 " + pid;
+                    SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
+                } else {
+                    Utils.killProcessWithSigTerm(pid);
+                }
+            }
+
+            if (pids.size() > 0) {
+                LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs);
+                Time.sleepSecs(shutdownSleepSecs);
+            }
+
+            for (String pid : pids) {
+                if (runWorkerAsUser) {
+                    List<String> commands = new ArrayList<>();
+                    commands.add("signal");
+                    commands.add(pid);
+                    commands.add("9");
+                    String logPrefix = "kill -9 " + pid;
+                    SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
+                } else {
+                    Utils.forceKillProcess(pid);
+                }
+                String path = ConfigUtils.workerPidPath(conf, workerId, pid);
+                if (runWorkerAsUser) {
+                    SupervisorUtils.rmrAsUser(conf, workerId, path);
+                } else {
+                    try {
+                        LOG.debug("Removing path {}", path);
+                        new File(path).delete();
+                    } catch (Exception e) {
+                        // on windows, the supervisor may still holds the lock on the worker directory
+                        // ignore
+                    }
+                }
+            }
+            LOG.info("Shut down {}:{}", supervisorId, workerId);
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+        return null;
+    }
+
+    @Override
+    public boolean cleanupWorker(String workerId) {
+        try {
+            String workerRoot = ConfigUtils.workerRoot(conf, workerId);
+            if (Utils.checkFileExists(workerRoot)) {
+                if (runWorkerAsUser) {
+                    SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
+                } else {
+                    Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
+                    Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
+                    Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId));
+                    Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
+                }
+                ConfigUtils.removeWorkerUserWSE(conf, workerId);
+            }
+            if (resourceIsolationManager != null) {
+                resourceIsolationManager.releaseResourcesForWorker(workerId);
+            }
+            return true;
+        } catch (IOException e) {
+            LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+        } catch (RuntimeException e) {
+            LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+        }
+        return false;
+    }
+
+    @Override
+    public IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources) {
+        return null;
+    }
+
+    protected String jlp(String stormRoot, Map conf) {
+        String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
+        String os = System.getProperty("os.name").replaceAll("\\s+", "_");
+        String arch = System.getProperty("os.arch");
+        String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
+        String ret = archResourceRoot + Utils.FILE_PATH_SEPARATOR + resourceRoot + Utils.FILE_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH);
+        return ret;
+    }
+
+    protected String getWorkerClassPath(String stormJar, Map stormConf) {
+        List<String> topoClasspath = new ArrayList<>();
+        Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
+
+        if (object instanceof List) {
+            topoClasspath.addAll((List<String>) object);
+        } else if (object instanceof String) {
+            topoClasspath.add((String) object);
+        } else {
+            LOG.error("topology specific classpath is invaild");
+        }
+        String classPath = Utils.workerClasspath();
+        String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
+        return Utils.addToClasspath(classAddPath, topoClasspath);
+    }
+
+    /**
+     * "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap"
+     *
+     * @param value
+     * @param workerId
+     * @param stormId
+     * @param port
+     * @param memOnheap
+     */
+    public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
+        List<String> rets = new ArrayList<>();
+        if (value instanceof String) {
+            String string = (String) value;
+            string = string.replace("%ID%", String.valueOf(port));
+            string = string.replace("%WORKER-ID%", workerId);
+            string = string.replace("%TOPOLOGY-ID%", stormId);
+            string = string.replace("%WORKER-PORT%", String.valueOf(port));
+            string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
+            String[] strings = string.split("\\s+");
+            rets.addAll(Arrays.asList(strings));
+        } else if (value instanceof List) {
+            List<Object> objects = (List<Object>) value;
+            for (Object object : objects) {
+                String str = (String) object;
+                str = str.replace("%ID%", String.valueOf(port));
+                str = str.replace("%WORKER-ID%", workerId);
+                str = str.replace("%TOPOLOGY-ID%", stormId);
+                str = str.replace("%WORKER-PORT%", String.valueOf(port));
+                str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap));
+                rets.add(str);
+            }
+        }
+        return rets;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
new file mode 100644
index 0000000..3b0912a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.workermanager;
+
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.Utils;
+
+import java.util.List;
+import java.util.Map;
+
+public interface IWorkerManager {
+    public void prepareWorker(Map conf, Localizer localizer);
+
+    IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
+                               Utils.ExitCodeCallable workerExitCallback);
+
+    IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids);
+
+    IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources);
+
+    public boolean cleanupWorker(String workerId);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerResult.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerResult.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerResult.java
new file mode 100644
index 0000000..8bf5b14
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerResult.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.workermanager;
+
+public interface IWorkerResult {
+}