You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/09/19 21:01:36 UTC

[7/8] storm git commit: STORM-2018: Supervisor V2

STORM-2018: Supervisor V2


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5a320461
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5a320461
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5a320461

Branch: refs/heads/master
Commit: 5a3204610734871f18ba38a29b271cfb814ff1bc
Parents: 4ce6f04
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Aug 5 16:15:21 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 19 15:57:57 2016 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |   4 -
 log4j2/cluster.xml                              |   2 +-
 .../apache/storm/daemon/local_supervisor.clj    |  48 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |   5 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |   9 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  53 +-
 storm-core/src/jvm/org/apache/storm/Config.java |  17 +-
 .../org/apache/storm/cluster/IStateStorage.java |   9 +-
 .../storm/cluster/IStormClusterState.java       |   2 +-
 .../storm/cluster/PaceMakerStateStorage.java    |   2 +-
 .../storm/cluster/StormClusterStateImpl.java    |  18 +-
 .../org/apache/storm/cluster/VersionedData.java |  36 +
 .../apache/storm/cluster/ZKStateStorage.java    |   2 +-
 .../org/apache/storm/command/KillWorkers.java   |  33 +-
 .../container/ResourceIsolationInterface.java   |  21 +-
 .../storm/container/cgroup/CgroupCommon.java    |   6 +-
 .../container/cgroup/CgroupCommonOperation.java |   2 +-
 .../storm/container/cgroup/CgroupManager.java   |  37 +-
 .../storm/daemon/supervisor/AdvancedFSOps.java  | 335 +++++++
 .../storm/daemon/supervisor/BasicContainer.java | 658 +++++++++++++
 .../supervisor/BasicContainerLauncher.java      |  62 ++
 .../storm/daemon/supervisor/Container.java      | 549 +++++++++++
 .../daemon/supervisor/ContainerLauncher.java    | 104 +++
 .../supervisor/ContainerRecoveryException.java  |  29 +
 .../daemon/supervisor/ExitCodeCallback.java     |  30 +
 .../storm/daemon/supervisor/Killable.java       |  50 +
 .../storm/daemon/supervisor/LocalContainer.java |  85 ++
 .../supervisor/LocalContainerLauncher.java      |  60 ++
 .../daemon/supervisor/ReadClusterState.java     | 327 +++++++
 .../daemon/supervisor/RunAsUserContainer.java   | 100 ++
 .../supervisor/RunAsUserContainerLauncher.java  |  60 ++
 .../apache/storm/daemon/supervisor/Slot.java    | 785 ++++++++++++++++
 .../apache/storm/daemon/supervisor/State.java   |  22 -
 .../storm/daemon/supervisor/StateHeartbeat.java |  45 -
 .../storm/daemon/supervisor/Supervisor.java     | 362 ++++++--
 .../daemon/supervisor/SupervisorDaemon.java     |  28 -
 .../storm/daemon/supervisor/SupervisorData.java | 234 -----
 .../daemon/supervisor/SupervisorManager.java    |  92 --
 .../daemon/supervisor/SupervisorUtils.java      | 170 ++--
 .../daemon/supervisor/SyncProcessEvent.java     | 448 ---------
 .../daemon/supervisor/SyncSupervisorEvent.java  | 612 ------------
 .../supervisor/timer/RunProfilerActions.java    | 211 -----
 .../supervisor/timer/SupervisorHealthCheck.java |  27 +-
 .../supervisor/timer/SupervisorHeartbeat.java   |  30 +-
 .../daemon/supervisor/timer/UpdateBlobs.java    |  16 +-
 .../workermanager/DefaultWorkerManager.java     | 429 ---------
 .../workermanager/IWorkerManager.java           |  35 -
 .../apache/storm/localizer/AsyncLocalizer.java  | 432 +++++++++
 .../org/apache/storm/localizer/ILocalizer.java  |  70 ++
 .../localizer/LocalDownloadedResource.java      | 146 +++
 .../LocalizedResourceRetentionSet.java          |   2 +-
 .../storm/localizer/LocalizedResourceSet.java   |   8 +-
 .../org/apache/storm/localizer/Localizer.java   |  38 +-
 .../jvm/org/apache/storm/task/ShellBolt.java    |   7 +-
 .../org/apache/storm/testing/FeederSpout.java   |  29 +-
 .../apache/storm/trident/util/TridentUtils.java |   2 +-
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  86 +-
 .../org/apache/storm/utils/InprocMessaging.java |  69 +-
 .../src/jvm/org/apache/storm/utils/Time.java    |  26 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 219 +----
 .../org/apache/storm/zookeeper/Zookeeper.java   |  17 +-
 .../org/apache/storm/integration_test.clj       |  67 --
 .../test/clj/org/apache/storm/metrics_test.clj  |   4 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |  10 +-
 .../storm/security/auth/drpc_auth_test.clj      |  36 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 926 -------------------
 .../test/jvm/org/apache/storm/TestCgroups.java  |   3 +-
 .../daemon/supervisor/BasicContainerTest.java   | 484 ++++++++++
 .../storm/daemon/supervisor/ContainerTest.java  | 269 ++++++
 .../storm/daemon/supervisor/SlotTest.java       | 515 +++++++++++
 .../storm/executor/error/ReportErrorTest.java   |  76 ++
 .../storm/localizer/AsyncLocalizerTest.java     | 187 ++++
 .../LocalizedResourceRetentionSetTest.java      |  10 +-
 .../localizer/LocalizedResourceSetTest.java     |  12 +-
 74 files changed, 6163 insertions(+), 3888 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8392936..2a6f18d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -296,10 +296,6 @@ storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManage
 # If storm.resource.isolation.plugin.enable is set to false the unit tests for cgroups will not run
 storm.resource.isolation.plugin.enable: false
 
-
-# Default plugin to use for manager worker
-storm.supervisor.worker.manager.plugin: org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager
-
 # Configs for CGroup support
 storm.cgroup.hierarchy.dir: "/cgroup/storm_resources"
 storm.cgroup.resources:

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/log4j2/cluster.xml
----------------------------------------------------------------------
diff --git a/log4j2/cluster.xml b/log4j2/cluster.xml
index e911823..a0ba3d1 100644
--- a/log4j2/cluster.xml
+++ b/log4j2/cluster.xml
@@ -18,7 +18,7 @@
 
 <configuration monitorInterval="60">
 <properties>
-    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} [%p] %msg%n</property>
+    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %t %c{1.} [%p] %msg%n</property>
 </properties>
 <appenders>
     <RollingFile name="A1" immediateFlush="false"

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
index 560ae3e..a02a8e2 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
@@ -14,51 +14,15 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.daemon.local-supervisor
-  (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils]
-           [org.apache.storm.utils Utils ConfigUtils]
-           [org.apache.storm ProcessSimulator])
-  (:use [org.apache.storm.daemon common]
-        [org.apache.storm log])
-  (:require [org.apache.storm.daemon [worker :as worker] ])
-  (:require [clojure.string :as str])
+  (:import [org.apache.storm.daemon.supervisor Supervisor]
+           [org.apache.storm.utils ConfigUtils])
+  (:use [org.apache.storm.daemon common])
   (:gen-class))
 
-(defn launch-local-worker [supervisorData stormId port workerId resources]
-  (let [conf (.getConf supervisorData)
-         pid (Utils/uuid)
-        worker (worker/mk-worker conf
-                 (.getSharedContext supervisorData)
-                 stormId
-                 (.getAssignmentId supervisorData)
-                 (int port)
-                 workerId)]
-    (ConfigUtils/setWorkerUserWSE conf workerId "")
-    (ProcessSimulator/registerProcess pid worker)
-    (.put (.getWorkerThreadPids supervisorData) workerId pid)))
-
-(defn shutdown-local-worker [supervisorData worker-manager workerId]
-  (log-message "shutdown-local-worker")
-  (let [supervisor-id (.getSupervisorId supervisorData)
-        worker-pids (.getWorkerThreadPids supervisorData)
-        dead-workers (.getDeadWorkers supervisorData)]
-    (.shutdownWorker worker-manager supervisor-id workerId worker-pids)
-    (if (.cleanupWorker worker-manager workerId)
-      (.remove dead-workers workerId))))
-
-(defn local-process []
-  "Create a local process event"
-  (proxy [SyncProcessEvent] []
-    (launchLocalWorker [supervisorData stormId port workerId resources]
-      (launch-local-worker supervisorData stormId port workerId resources))
-    (killWorker [supervisorData worker-manager workerId] (shutdown-local-worker supervisorData worker-manager workerId))))
-
-
 (defserverfn mk-local-supervisor [conf shared-context isupervisor]
-  (log-message "Starting local Supervisor with conf " conf)
   (if (not (ConfigUtils/isLocalMode conf))
     (throw
       (IllegalArgumentException. "Cannot start server in distrubuted mode!")))
-  (let [local-process (local-process)
-        supervisor-server (Supervisor.)]
-    (.setLocalSyncProcess supervisor-server local-process)
-    (.mkSupervisor supervisor-server conf shared-context isupervisor)))
\ No newline at end of file
+  (let [supervisor-server (Supervisor. conf shared-context isupervisor)]
+    (.launch supervisor-server)
+    supervisor-server))

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 76b3917..1a83c0f 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1616,7 +1616,7 @@
                :all-components all-components
                :launch-time-secs launch-time-secs
                :assignment assignment
-               :beats beats
+               :beats (or beats {})
                :topology topology
                :task->component task->component
                :base base}))
@@ -1995,7 +1995,8 @@
               executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
                                    (let [host (-> assignment :node->host (get node))
                                             heartbeat (.get beats (StatsUtil/convertExecutor executor))
-                                            hb (if heartbeat (.get heartbeat "heartbeat"))
+                                            heartbeat (or heartbeat {})
+                                            hb (.get heartbeat "heartbeat")
                                             excutorstats (if hb (.get hb "stats"))
                                             excutorstats (if excutorstats
                                                     (StatsUtil/thriftifyExecutorStats excutorstats))]

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 222de36..6b165b2 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -36,13 +36,14 @@
   (:import [org.apache.storm.messaging TransportFactory])
   (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status DeserializingConnectionCallback])
   (:import [org.apache.storm.daemon Shutdownable StormCommon DaemonCommon])
+  (:import [org.apache.storm.daemon.supervisor AdvancedFSOps])
   (:import [org.apache.storm.serialization KryoTupleSerializer])
   (:import [org.apache.storm.generated StormTopology LSWorkerHeartbeat])
   (:import [org.apache.storm.tuple AddressedTuple Fields])
   (:import [org.apache.storm.task WorkerTopologyContext])
   (:import [org.apache.storm Constants])
   (:import [org.apache.storm.security.auth AuthUtils])
-  (:import [org.apache.storm.cluster ClusterStateContext DaemonType ZKStateStorage StormClusterStateImpl ClusterUtils IStateStorage])
+  (:import [org.apache.storm.cluster ClusterStateContext DaemonType ZKStateStorage StormClusterStateImpl ClusterUtils])
   (:import [javax.security.auth Subject])
   (:import [java.security PrivilegedExceptionAction])
   (:import [org.apache.logging.log4j LogManager])
@@ -272,8 +273,8 @@
         receive-queue-map (->> executor-receive-queue-map
                                (mapcat (fn [[e queue]] (for [t (executor->tasks e)] [t queue])))
                                (into {}))
-
-        topology (ConfigUtils/readSupervisorTopology conf storm-id)
+        ops (AdvancedFSOps/make conf)
+        topology (ConfigUtils/readSupervisorTopology conf storm-id ops)
         mq-context  (if mq-context
                       mq-context
                       (TransportFactory/makeContext storm-conf))]
@@ -404,7 +405,7 @@
                assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
                             (:data (get @(:assignment-versions worker) storm-id))
                             (let [thriftify-assignment-version (.assignmentInfoWithVersion storm-cluster-state storm-id callback)
-                              new-assignment {:data (clojurify-assignment (.get thriftify-assignment-version (IStateStorage/DATA))) :version version}]
+                              new-assignment {:data (clojurify-assignment (.getData thriftify-assignment-version)) :version version}]
                               (swap! (:assignment-versions worker) assoc storm-id new-assignment)
                               (:data new-assignment)))
               my-assignment (-> assignment

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 801f9bd..0b5f9f7 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -25,7 +25,7 @@
            [org.apache.storm.utils]
            [org.apache.storm.zookeeper Zookeeper]
            [org.apache.storm ProcessSimulator]
-           [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager]
+           [org.apache.storm.daemon.supervisor Supervisor StandaloneSupervisor SupervisorUtils]
            [org.apache.storm.executor Executor]
            [java.util.concurrent.atomic AtomicBoolean])
   (:import [java.io File])
@@ -139,8 +139,7 @@
         supervisor-conf (merge (:daemon-conf cluster-map)
                                conf
                                {STORM-LOCAL-DIR tmp-dir
-                                SUPERVISOR-SLOTS-PORTS port-ids
-                                STORM-SUPERVISOR-WORKER-MANAGER-PLUGIN "org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager"})
+                                SUPERVISOR-SLOTS-PORTS port-ids})
         id-fn (if id id (Utils/uuid))
         isupervisor (proxy [StandaloneSupervisor] []
                         (generateSupervisorId [] id-fn))
@@ -178,18 +177,19 @@
   (let [zk-tmp (local-temp-path)
         [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
                               (Zookeeper/mkInprocessZookeeper zk-tmp nil))
+        nimbus-tmp (local-temp-path)
         daemon-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                            {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
                             ZMQ-LINGER-MILLIS 0
                             TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
                             TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
                             STORM-CLUSTER-MODE "local"
-                            BLOBSTORE-SUPERUSER (System/getProperty "user.name")}
+                            BLOBSTORE-SUPERUSER (System/getProperty "user.name")
+                            BLOBSTORE-DIR nimbus-tmp}
                            (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
                              {STORM-ZOOKEEPER-PORT zk-port
                               STORM-ZOOKEEPER-SERVERS ["localhost"]})
                            daemon-conf)
-        nimbus-tmp (local-temp-path)
         port-counter (mk-counter supervisor-slot-port-min)
         nimbus (nimbus/service-handler
                 (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
@@ -233,7 +233,7 @@
                            supervisors)]
     ;; tmp-dir will be taken care of by shutdown
     (reset! (:supervisors cluster-map) (remove-first finder-fn supervisors))
-    (.shutdown sup)))
+    (.close sup)))
 
 (defn kill-local-storm-cluster [cluster-map]
   (.shutdown (:nimbus cluster-map))
@@ -249,7 +249,7 @@
   (doseq [s @(:supervisors cluster-map)]
     (.shutdownAllWorkers s)
     ;; race condition here? will it launch the workers again?
-    (.shutdown s))
+    (.close s))
   (ProcessSimulator/killAllProcesses)
   (if (not-nil? (:zookeeper cluster-map))
     (do
@@ -285,7 +285,7 @@
   ([timeout-ms apredicate]
     (while-timeout timeout-ms (not (apredicate))
       (Time/sleep 100))))
-(defn is-supervisor-waiting [^SupervisorManager supervisor]
+(defn is-supervisor-waiting [^Supervisor supervisor]
   (.isWaiting supervisor))
 
 (defn wait-until-cluster-waiting
@@ -395,15 +395,6 @@
                                                                           executor->node+port)]
         (submit-local-topology nimbus storm-name conf topology)))))
 
-(defn mk-capture-launch-fn [capture-atom]
-  (fn [supervisorData stormId port workerId resources]
-    (let [conf (.getConf supervisorData)
-          supervisorId (.getSupervisorId supervisorData)
-          existing (get @capture-atom [supervisorId port] [])]
-      (log-message "mk-capture-launch-fn")
-      (ConfigUtils/setWorkerUserWSE conf workerId "")
-      (swap! capture-atom assoc [supervisorId port] (conj existing stormId)))))
-
 (defn find-worker-id
   [supervisor-conf port]
   (let [supervisor-state (ConfigUtils/supervisorState supervisor-conf)
@@ -430,23 +421,6 @@
         (.shutdownWorker worker-manager supervisor-id workerId worker-pids)
         (if (.cleanupWorker worker-manager workerId)
           (.remove dead-workers workerId)))))
-(defmacro capture-changed-workers
-  [& body]
-  `(let [launch-captured# (atom {})
-         shutdown-captured# (atom {})]
-     (with-var-roots [local-supervisor/launch-local-worker (mk-capture-launch-fn launch-captured#)
-                      local-supervisor/shutdown-local-worker (mk-capture-shutdown-fn shutdown-captured#)]
-                     ~@body
-                     {:launched @launch-captured#
-                      :shutdown @shutdown-captured#})))
-
-(defmacro capture-launched-workers
-  [& body]
-  `(:launched (capture-changed-workers ~@body)))
-
-(defmacro capture-shutdown-workers
-  [& body]
-  `(:shutdown (capture-changed-workers ~@body)))
 
 (defnk aggregated-stat
   [cluster-map storm-name stat-key :component-ids nil]
@@ -723,13 +697,18 @@
     (let [target (+ amt @(:last-spout-emit tracked-topology))
           track-id (-> tracked-topology :cluster ::track-id)
           waiting? (fn []
-                     (or (not= target (global-amt track-id "spout-emitted"))
-                         (not= (global-amt track-id "transferred")
-                               (global-amt track-id "processed"))))]
+                     (let [spout-emitted (global-amt track-id "spout-emitted")
+                           transferred (global-amt track-id "transferred")
+                           processed (global-amt track-id "processed")]
+                       (log-message "emitted " spout-emitted " target " target " transferred " transferred " processed " processed)
+                       (or (not= target spout-emitted)
+                           (not= transferred
+                                 processed))))]
       (while-timeout timeout-ms (waiting?)
                      ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
                      ;; (println "Processed: " (global-amt track-id "processed"))
                      ;; (println "Transferred: " (global-amt track-id "transferred"))
+                    (advance-time-secs! 1)
                     (Thread/sleep (rand-int 200)))
       (reset! (:last-spout-emit tracked-topology) target))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 43369bd..f38ca0f 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -18,7 +18,6 @@
 package org.apache.storm;
 
 import org.apache.storm.container.ResourceIsolationInterface;
-import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
 import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
 import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
@@ -2264,20 +2263,14 @@ public class Config extends HashMap<String, Object> {
      * future releases.
      */
     @isString
-    public static final Object CLIENT_JAR_TRANSFORMER = "client.jartransformer.class";
+    public static final String CLIENT_JAR_TRANSFORMER = "client.jartransformer.class";
 
 
     /**
      * The plugin to be used for resource isolation
      */
     @isImplementationOfClass(implementsClass = ResourceIsolationInterface.class)
-    public static final Object STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin";
-
-    /**
-     * The plugin to be used for manager worker
-     */
-    @isImplementationOfClass(implementsClass = IWorkerManager.class)
-    public static final Object STORM_SUPERVISOR_WORKER_MANAGER_PLUGIN = "storm.supervisor.worker.manager.plugin";
+    public static final String STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin";
 
     /**
      * CGroup Setting below
@@ -2287,19 +2280,19 @@ public class Config extends HashMap<String, Object> {
      * root directory of the storm cgroup hierarchy
      */
     @isString
-    public static final Object STORM_CGROUP_HIERARCHY_DIR = "storm.cgroup.hierarchy.dir";
+    public static final String STORM_CGROUP_HIERARCHY_DIR = "storm.cgroup.hierarchy.dir";
 
     /**
      * resources to to be controlled by cgroups
      */
     @isStringList
-    public static final Object STORM_CGROUP_RESOURCES = "storm.cgroup.resources";
+    public static final String STORM_CGROUP_RESOURCES = "storm.cgroup.resources";
 
     /**
      * name for the cgroup hierarchy
      */
     @isString
-    public static final Object STORM_CGROUP_HIERARCHY_NAME = "storm.cgroup.hierarchy.name";
+    public static final String STORM_CGROUP_HIERARCHY_NAME = "storm.cgroup.hierarchy.name";
 
     /**
      * flag to determine whether to use a resource isolation plugin

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
index 0b6f043..aa731ff 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
@@ -41,10 +41,7 @@ import org.apache.zookeeper.data.ACL;
  * Never use the same paths with the *_hb* methods as you do with the others.
  */
 public interface IStateStorage {
-
-    public static final String DATA = "data";
-    public static final String VERSION = "version";
-
+    
     /**
      * Registers a callback function that gets called when CuratorEvents happen.
      * @param callback is a clojure IFn that accepts the type - translated to
@@ -157,9 +154,9 @@ public interface IStateStorage {
      * @param watch Whether or not to set a watch on the path. Watched paths
      * emit events which are consumed by functions registered with the
      * register method. Very useful for catching updates to nodes.
-     * @return An Map in the form {:data data :version version}
+     * @return the data with a version
      */
-    Map get_data_with_version(String path, boolean watch);
+    VersionedData<byte[]> get_data_with_version(String path, boolean watch);
 
     /**
      * Write a worker heartbeat at the path.

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index b016997..a6f07ed 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -29,7 +29,7 @@ public interface IStormClusterState {
 
     public Assignment assignmentInfo(String stormId, Runnable callback);
 
-    public Map assignmentInfoWithVersion(String stormId, Runnable callback);
+    public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
 
     public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
index c42bd38..80a398e 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
@@ -104,7 +104,7 @@ public class PaceMakerStateStorage implements IStateStorage {
     }
 
     @Override
-    public Map get_data_with_version(String path, boolean watch) {
+    public VersionedData<byte[]> get_data_with_version(String path, boolean watch) {
         return stateStorage.get_data_with_version(path, watch);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 4f02beb..972d778 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.cluster;
 
-import clojure.lang.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.*;
@@ -164,21 +163,18 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     @Override
-    public Map assignmentInfoWithVersion(String stormId, Runnable callback) {
-        Map map = new HashMap();
+    public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback) {
         if (callback != null) {
             assignmentInfoWithVersionCallback.put(stormId, callback);
         }
         Assignment assignment = null;
         Integer version = 0;
-        Map dataWithVersionMap = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
-        if (dataWithVersionMap != null) {
-            assignment = ClusterUtils.maybeDeserialize((byte[]) dataWithVersionMap.get(IStateStorage.DATA), Assignment.class);
-            version = (Integer) dataWithVersionMap.get(IStateStorage.VERSION);
-        }
-        map.put(IStateStorage.DATA, assignment);
-        map.put(IStateStorage.VERSION, version);
-        return map;
+        VersionedData<byte[]> dataWithVersion = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
+        if (dataWithVersion != null) {
+            assignment = ClusterUtils.maybeDeserialize(dataWithVersion.getData(), Assignment.class);
+            version = dataWithVersion.getVersion();
+        }
+        return new VersionedData<Assignment>(version, assignment);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java b/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java
new file mode 100644
index 0000000..3de2a88
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+public class VersionedData<D> {
+    private final int version;
+    private final D data;
+    
+    public VersionedData(int version, D data) {
+        this.version = version;
+        this.data = data;
+    }
+    
+    public int getVersion() {
+        return version;
+    }
+    
+    public D getData() {
+        return data;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
index 87e7dfc..e337b1f 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -211,7 +211,7 @@ public class ZKStateStorage implements IStateStorage {
     }
 
     @Override
-    public Map get_data_with_version(String path, boolean watch) {
+    public VersionedData<byte[]> get_data_with_version(String path, boolean watch) {
         return Zookeeper.getDataWithVersion(zkReader, path, watch);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
index 87a1459..c59efd7 100644
--- a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
+++ b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
@@ -17,39 +17,20 @@
  */
 package org.apache.storm.command;
 
+import java.io.File;
+import java.util.Map;
+
 import org.apache.storm.Config;
 import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
-import org.apache.storm.daemon.supervisor.SupervisorData;
-import org.apache.storm.daemon.supervisor.SupervisorUtils;
-import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
+import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.utils.ConfigUtils;
 
-import org.eclipse.jetty.util.ConcurrentHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Collection;
-import java.util.Map;
-
 public class KillWorkers {
-    private static final Logger LOG = LoggerFactory.getLogger(KillWorkers.class);
-
     public static void main(String [] args) throws Exception {
-        Map conf = ConfigUtils.readStormConfig();
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
         conf.put(Config.STORM_LOCAL_DIR, new File((String)conf.get(Config.STORM_LOCAL_DIR)).getCanonicalPath());
-        SupervisorData supervisorData = new SupervisorData(conf, null, new StandaloneSupervisor());
-        IWorkerManager workerManager = supervisorData.getWorkerManager();
-        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
-        String supervisorId = supervisorData.getSupervisorId();
-        Map<String, String> workerToThreadPids = supervisorData.getWorkerThreadPids();
-        ConcurrentHashSet deadWorkers = supervisorData.getDeadWorkers();
-        for (String workerId : workerIds) {
-            LOG.info("Killing worker: {} through CLI.", workerId);
-            workerManager.shutdownWorker(supervisorId, workerId, workerToThreadPids);
-            if (workerManager.cleanupWorker(workerId)) {
-                deadWorkers.remove(workerId);
-            }
+        try (Supervisor supervisor = new Supervisor(conf, null, new StandaloneSupervisor())) {
+            supervisor.shutdownAllWorkers();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
index c5cad02..7bf0249 100644
--- a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
+++ b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
@@ -18,20 +18,29 @@
 
 package org.apache.storm.container;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * A plugin to support resource isolation and limitation within Storm
  */
 public interface ResourceIsolationInterface {
+    
+    /**
+     * Called when starting up
+     * @param conf the cluster config
+     * @throws IOException on any error.
+     */
+    void prepare(Map<String, Object> conf) throws IOException;
 
     /**
      * This function should be used prior to starting the worker to reserve resources for the worker
      * @param workerId worker id of the worker to start
      * @param resources set of resources to limit
      */
-    void reserveResourcesForWorker(String workerId, Map resources);
+    void reserveResourcesForWorker(String workerId, Map<String, Number> resources);
 
     /**
      * This function will be called when the worker needs to shutdown.  This function should include logic to clean up after a worker is shutdown
@@ -39,7 +48,6 @@ public interface ResourceIsolationInterface {
      */
     void releaseResourcesForWorker(String workerId);
 
-
     /**
      * After reserving resources for the worker (i.e. calling reserveResourcesForWorker). This function can be used
      * to get the modified command line to launch the worker with resource isolation
@@ -56,4 +64,13 @@ public interface ResourceIsolationInterface {
      */
     List<String> getLaunchCommandPrefix(String workerId);
 
+    /**
+     * Get the list of PIDs currently in an isolated container
+     * @param workerId the id of the worker to get these for
+     * @return the set of PIDs; this will be combined with
+     * other ways of getting PIDs. An empty set if
+     * no PIDs are found.
+     * @throws IOException on any error
+     */
+    Set<Long> getRunningPIDs(String workerId) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
index b12fcc0..c8bb304 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
@@ -89,11 +89,11 @@ public class CgroupCommon implements CgroupCommonOperation {
     }
 
     @Override
-    public Set<Integer> getPids() throws IOException {
+    public Set<Long> getPids() throws IOException {
         List<String> stringPids = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CGROUP_PROCS));
-        Set<Integer> pids = new HashSet<Integer>();
+        Set<Long> pids = new HashSet<>();
         for (String task : stringPids) {
-            pids.add(Integer.valueOf(task));
+            pids.add(Long.valueOf(task));
         }
         return pids;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
index 54368b6..eecba69 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
@@ -42,7 +42,7 @@ public interface CgroupCommonOperation {
     /**
      * get the PIDs of processes running in cgroup
      */
-    public Set<Integer> getPids() throws IOException;
+    public Set<Long> getPids() throws IOException;
 
     /**
      * to set notify_on_release config in cgroup

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
index 80093b3..9856595 100644
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
@@ -18,19 +18,11 @@
 
 package org.apache.storm.container.cgroup;
 
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.storm.Config;
-import org.apache.storm.container.ResourceIsolationInterface;
-import org.apache.storm.container.cgroup.core.CpuCore;
-import org.apache.storm.container.cgroup.core.MemoryCore;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -38,6 +30,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.container.cgroup.core.CpuCore;
+import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Class that implements ResourceIsolationInterface that manages cgroups
  */
@@ -51,15 +50,15 @@ public class CgroupManager implements ResourceIsolationInterface {
 
     private CgroupCommon rootCgroup;
 
-    private static String rootDir;
+    private String rootDir;
 
-    private Map conf;
+    private Map<String, Object> conf;
 
     /**
      * initialize intial data structures
      * @param conf storm confs
      */
-    public void prepare(Map conf) throws IOException {
+    public void prepare(Map<String, Object> conf) throws IOException {
         this.conf = conf;
         this.rootDir = Config.getCgroupRootDir(this.conf);
         if (this.rootDir == null) {
@@ -81,7 +80,7 @@ public class CgroupManager implements ResourceIsolationInterface {
     /**
      * initalize subsystems
      */
-    private void prepareSubSystem(Map conf) throws IOException {
+    private void prepareSubSystem(Map<String, Object> conf) throws IOException {
         List<SubSystemType> subSystemTypes = new LinkedList<>();
         for (String resource : Config.getCgroupStormResources(conf)) {
             subSystemTypes.add(SubSystemType.getSubSystem(resource));
@@ -118,7 +117,7 @@ public class CgroupManager implements ResourceIsolationInterface {
         }
     }
 
-    public void reserveResourcesForWorker(String workerId, Map resourcesMap) throws SecurityException {
+    public void reserveResourcesForWorker(String workerId, Map<String, Number> resourcesMap) throws SecurityException {
         Number cpuNum = null;
         // The manually set STORM_WORKER_CGROUP_CPU_LIMIT config on supervisor will overwrite resources assigned by RAS (Resource Aware Scheduler)
         if (this.conf.get(Config.STORM_WORKER_CGROUP_CPU_LIMIT) != null) {
@@ -211,4 +210,14 @@ public class CgroupManager implements ResourceIsolationInterface {
     public void close() throws IOException {
         this.center.deleteCgroup(this.rootCgroup);
     }
+
+    @Override
+    public Set<Long> getRunningPIDs(String workerId) throws IOException {
+        CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup);
+        if (!this.rootCgroup.getChildren().contains(workerGroup)) {
+            LOG.warn("cgroup {} doesn't exist!", workerGroup);
+            return Collections.emptySet();
+        }
+        return workerGroup.getPids();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
new file mode 100644
index 0000000..361328e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AdvancedFSOps {
+    private static final Logger LOG = LoggerFactory.getLogger(AdvancedFSOps.class);
+    
+    /**
+     * Factory to create a new AdvancedFSOps
+     * @param conf the configuration of the process
+     * @return the appropriate instance of the class for this config and environment.
+     */
+    public static AdvancedFSOps make(Map<String, Object> conf) {
+        if (Utils.isOnWindows()) {
+            return new AdvancedWindowsFSOps(conf);
+        }
+        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            return new AdvancedRunAsUserFSOps(conf);
+        }
+        return new AdvancedFSOps();
+    }
+    
+    private static class AdvancedRunAsUserFSOps extends AdvancedFSOps {
+        private final Map<String, Object> _conf;
+        
+        public AdvancedRunAsUserFSOps(Map<String, Object> conf) {
+            if (Utils.isOnWindows()) {
+                throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
+            }
+            _conf = conf;
+        }
+        
+        @Override
+        public void setupBlobPermissions(File path, String user) throws IOException {
+            String logPrefix = "setup blob permissions for " + path;
+            SupervisorUtils.processLauncherAndWait(_conf, user, Arrays.asList("blob", path.toString()), null, logPrefix);
+        }
+        
+        @Override
+        public void deleteIfExists(File path, String user, String logPrefix) throws IOException {
+            String absolutePath = path.getAbsolutePath();
+            LOG.info("Deleting path {}", absolutePath);
+            if (user == null) {
+                user = Files.getOwner(path.toPath()).getName();
+            }
+            List<String> commands = new ArrayList<>();
+            commands.add("rmr");
+            commands.add(absolutePath);
+            SupervisorUtils.processLauncherAndWait(_conf, user, commands, null, logPrefix);
+            if (Utils.checkFileExists(absolutePath)) {
+                throw new RuntimeException(path + " was not deleted.");
+            }
+        }
+        
+        @Override
+        public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException {
+            SupervisorUtils.setupStormCodeDir(_conf, topologyConf, path.getCanonicalPath());
+        }
+    }
+    
+    /**
+     * Operations that need to override the default ones when running on Windows
+     *
+     */
+    private static class AdvancedWindowsFSOps extends AdvancedFSOps {
+
+        public AdvancedWindowsFSOps(Map<String, Object> conf) {
+            if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+                throw new RuntimeException("ERROR: Windows doesn't support running workers as different users yet");
+            }
+        }
+        
+        @Override
+        public void restrictDirectoryPermissions(File dir) throws IOException {
+            //NOOP, if windows gets support for run as user we will need to find a way to support this
+        }
+        
+        @Override
+        public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException {
+            // Files/move with non-empty directory doesn't work well on Windows
+            // This is not atomic but it does work
+            FileUtils.moveDirectory(fromDir, toDir);
+        }
+        
+        @Override
+        public boolean supportsAtomicDirectoryMove() {
+            // Files/move with non-empty directory doesn't work well on Windows
+            // FileUtils.moveDirectory is not atomic
+            return false;
+        }
+    }
+    
+    
+    protected AdvancedFSOps() {
+        //NOOP, but restricted permissions
+    }
+
+    /**
+     * Set directory permissions to (OWNER)RWX (GROUP)R-X (OTHER)---
+     * On some systems that do not support this, it may become a noop
+     * @param dir the directory to change permissions on
+     * @throws IOException on any error
+     */
+    public void restrictDirectoryPermissions(File dir) throws IOException {
+        Set<PosixFilePermission> perms = new HashSet<>(
+                Arrays.asList(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE,
+                        PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ,
+                        PosixFilePermission.GROUP_EXECUTE));
+        Files.setPosixFilePermissions(dir.toPath(), perms);
+    }
+
+    /**
+     * Move fromDir to toDir, and try to make it an atomic move if possible
+     * @param fromDir what to move
+     * @param toDir where to move it to
+     * @throws IOException on any error
+     */
+    public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException {
+        FileUtils.forceMkdir(toDir);
+        Files.move(fromDir.toPath(), toDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
+    }
+    
+    /**
+     * @return true if an atomic directory move works, else false.
+     */
+    public boolean supportsAtomicDirectoryMove() {
+        return true;
+    }
+    
+    /**
+     * Copy a directory
+     * @param fromDir from where
+     * @param toDir to where
+     * @throws IOException on any error
+     */
+    public void copyDirectory(File fromDir, File toDir) throws IOException {
+        FileUtils.copyDirectory(fromDir, toDir);
+    }
+    
+    /**
+     * Setup permissions properly for an internal blob store path
+     * @param path the path to set the permissions on
+     * @param user the user to change the permissions for
+     * @throws IOException on any error
+     */
+    public void setupBlobPermissions(File path, String user) throws IOException {
+        //Normally this is a NOOP
+    }
+
+    /**
+     * Delete a file or a directory and all of its children, if it exists.
+     * @param path what to delete
+     * @param user the user to delete it as, if deleting as another user is supported
+     * @param logPrefix the prefix to include in the logs if an external process
+     * needs to be launched to delete the object
+     * @throws IOException on any error.
+     */
+    public void deleteIfExists(File path, String user, String logPrefix) throws IOException {
+        //by default no need to do this as a different user
+        deleteIfExists(path);
+    }
+    
+    /**
+     * Delete a file or a directory and all of its children, if it exists.
+     * @param path what to delete
+     * @throws IOException on any error.
+     */
+    public void deleteIfExists(File path) throws IOException {
+        LOG.info("Deleting path {}", path);
+        Path p = path.toPath();
+        if (Files.exists(p)) {
+            try {
+                FileUtils.forceDelete(path);
+            } catch (FileNotFoundException ignored) {}
+        }
+    }
+
+    /**
+     * Setup the permissions for the storm code dir
+     * @param topologyConf the config of the Topology
+     * @param path the directory to set the permissions on
+     * @throws IOException on any error
+     */
+    public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException {
+        //By default this is a NOOP
+    }
+
+    /**
+     * Sanity check if everything the topology needs is there for it to run.
+     * @param conf the config of the supervisor
+     * @param topologyId the ID of the topology
+     * @return true if everything is there, else false
+     * @throws IOException on any error
+     */
+    public boolean doRequiredTopoFilesExist(Map<String, Object> conf, String topologyId) throws IOException {
+        return SupervisorUtils.doRequiredTopoFilesExist(conf, topologyId);
+    }
+    
+    /**
+     * Makes a directory, including any necessary but nonexistent parent
+     * directories. 
+     *
+     * @param path the directory to create
+     * @throws IOException on any error
+     */
+    public void forceMkdir(File path) throws IOException {
+        FileUtils.forceMkdir(path);
+    }
+    
+    /**
+     * Check if a file exists or not
+     * @param path the path to check
+     * @return true if it exists else false
+     * @throws IOException on any error.
+     */
+    public boolean fileExists(File path) throws IOException {
+        return path.exists();
+    }
+
+    /**
+     * Get a writer for the given location 
+     * @param file the file to write to
+     * @return the Writer to use.
+     * @throws IOException on any error
+     */
+    public Writer getWriter(File file) throws IOException {
+        return new FileWriter(file);
+    }
+    
+    /**
+     * Get an output stream to write to a given file
+     * @param file the file to write to
+     * @return an OutputStream for that file
+     * @throws IOException on any error
+     */
+    public OutputStream getOutputStream(File file) throws IOException {
+        return new FileOutputStream(file);
+    }
+    
+    /**
+     * Dump a string to a file
+     * @param location where to write to
+     * @param data the data to write
+     * @throws IOException on any error
+     */
+    public void dump(File location, String data) throws IOException {
+        File parent = location.getParentFile();
+        if (!parent.exists()) {
+            forceMkdir(parent);
+        }
+        try (Writer w = getWriter(location)) {
+            w.write(data);
+        }
+    }
+    
+    /**
+     * Read the contents of a file into a String
+     * @param location the file to read
+     * @return the contents of the file
+     * @throws IOException on any error
+     */
+    public String slurpString(File location) throws IOException {
+        return FileUtils.readFileToString(location, "UTF-8");
+    }
+
+    /**
+     * Read the contents of a file into a byte array.
+     * @param location the file to read
+     * @return the contents of the file
+     * @throws IOException on any error
+     */ 
+    public byte[] slurp(File location) throws IOException {
+        return FileUtils.readFileToByteArray(location);
+    }
+
+    /**
+     * Create a symbolic link pointing at target
+     * @param link the link to create
+     * @param target where it should point to
+     * @throws IOException on any error.
+     */
+    public void createSymlink(File link, File target) throws IOException {
+        Path plink = link.toPath().toAbsolutePath();
+        Path ptarget = target.toPath().toAbsolutePath();
+        LOG.debug("Creating symlink [{}] to [{}]", plink, ptarget);
+        if (Files.exists(plink)) {
+            if (Files.isSameFile(plink, ptarget)) {
+                //It already points where we want it to
+                return;
+            }
+            FileUtils.forceDelete(link);
+        }
+        Files.createSymbolicLink(plink, ptarget);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
new file mode 100644
index 0000000..efaa352
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -0,0 +1,658 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+/**
+ * A container that runs processes on the local box.
+ */
+public class BasicContainer extends Container {
+    private static final Logger LOG = LoggerFactory.getLogger(BasicContainer.class);
+    private static final FilenameFilter jarFilter = (dir, name) -> name.endsWith(".jar");
+    private static final Joiner CPJ = 
+            Joiner.on(Utils.CLASS_PATH_SEPARATOR).skipNulls();
+    
+    protected final LocalState _localState;
+    protected final String _profileCmd;
+    protected final String _stormHome = System.getProperty("storm.home");
+    protected volatile boolean _exitedEarly = false;
+
+    private class ProcessExitCallback implements ExitCodeCallback {
+        private final String _logPrefix;
+
+        public ProcessExitCallback(String logPrefix) {
+            _logPrefix = logPrefix;
+        }
+
+        @Override
+        public void call(int exitCode) {
+            LOG.info("{} exited with code: {}", _logPrefix, exitCode);
+            _exitedEarly = true;
+        }
+    }
+    
+    /**
+     * Create a new BasicContainer
+     * @param type the type of container being made.
+     * @param conf the supervisor config
+     * @param supervisorId the ID of the supervisor this is a part of.
+     * @param port the port the container is on.  Should be <= 0 if only a partial recovery
+     * @param assignment the assignment for this container. Should be null if only a partial recovery.
+     * @param resourceIsolationManager used to isolate resources for a container; can be null if no isolation is used.
+     * @param localState the local state of the supervisor.  May be null if partial recovery
+     * @param workerId the id of the worker to use.  Must not be null if doing a partial recovery.
+     */
+    public BasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
+            LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
+            LocalState localState, String workerId) throws IOException {
+        this(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, null, null, null);
+    }
+    
+    /**
+     * Create a new BasicContainer
+     * @param type the type of container being made.
+     * @param conf the supervisor config
+     * @param supervisorId the ID of the supervisor this is a part of.
+     * @param port the port the container is on.  Should be <= 0 if only a partial recovery
+     * @param assignment the assignment for this container. Should be null if only a partial recovery.
+     * @param resourceIsolationManager used to isolate resources for a container; can be null if no isolation is used.
+     * @param localState the local state of the supervisor.  May be null if partial recovery
+     * @param workerId the id of the worker to use.  Must not be null if doing a partial recovery.
+     * @param ops file system operations (mostly for testing); if null a new one is made
+     * @param topoConf the config of the topology (mostly for testing); if null
+     * and not a partial recovery, the real conf is read.
+     * @param profileCmd the command to use when profiling (used for testing)
+     * @throws IOException on any error
+     * @throws ContainerRecoveryException if the Container could not be recovered.
+     */
+    BasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
+            LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
+            LocalState localState, String workerId, Map<String, Object> topoConf, 
+            AdvancedFSOps ops, String profileCmd) throws IOException {
+        super(type, conf, supervisorId, port, assignment, resourceIsolationManager, workerId, topoConf, ops);
+        assert(localState != null);
+        _localState = localState;
+
+        if (type.isRecovery() && !type.isOnlyKillable()) {
+            synchronized (localState) {
+                String wid = null;
+                Map<String, Integer> workerToPort = localState.getApprovedWorkers();
+                for (Map.Entry<String, Integer> entry : workerToPort.entrySet()) {
+                    if (port == entry.getValue().intValue()) {
+                        wid = entry.getKey();
+                    }
+                }
+                if (wid == null) {
+                    throw new ContainerRecoveryException("Could not find worker id for " + port + " " + assignment);
+                }
+                LOG.info("Recovered Worker {}", wid);
+                _workerId = wid;
+            }
+        } else if (_workerId == null){
+            createNewWorkerId();
+        }
+
+        if (profileCmd == null) {
+            profileCmd = _stormHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR
+                    + conf.get(Config.WORKER_PROFILER_COMMAND);
+        }
+        _profileCmd = profileCmd;
+    }
+
+    /**
+     * Create a new worker ID for this process and store it in this object and
+     * in the local state.  Never call this if a worker is currently up and running.
+     * We will lose track of the process.
+     */
+    protected void createNewWorkerId() {
+        _type.assertFull();
+        assert(_workerId == null);
+        synchronized (_localState) {
+            _workerId = Utils.uuid();
+            Map<String, Integer> workerToPort = _localState.getApprovedWorkers();
+            if (workerToPort == null) {
+                workerToPort = new HashMap<>(1);
+            }
+            removeWorkersOn(workerToPort, _port);
+            workerToPort.put(_workerId, _port);
+            _localState.setApprovedWorkers(workerToPort);
+            LOG.info("Created Worker ID {}", _workerId);
+        }
+    }
+
+    /**
+     * Remove, in place, every entry of the given map whose value is the given port,
+     * logging a warning for each worker id that is dropped.
+     * @param workerToPort the worker id to port mapping to modify
+     * @param _port the port whose workers should be removed
+     */
+    private static void removeWorkersOn(Map<String, Integer> workerToPort, int _port) {
+        workerToPort.entrySet().removeIf(candidate -> {
+            if (_port != candidate.getValue().intValue()) {
+                return false;
+            }
+            LOG.warn("Deleting worker {} from state", candidate.getKey());
+            return true;
+        });
+    }
+
+    /**
+     * Clean up after the worker so the container can be started again, then remove the
+     * old worker id (and anything else recorded on this port) from the approved workers
+     * kept in the local state.
+     * @throws IOException on any error from the parent clean up.
+     */
+    @Override
+    public void cleanUpForRestart() throws IOException {
+        // Capture the id before the parent clean up runs, so the entry can still be removed below.
+        String origWorkerId = _workerId;
+        super.cleanUpForRestart();
+        synchronized (_localState) {
+            Map<String, Integer> workersToPort = _localState.getApprovedWorkers();
+            // The map can be null (createNewWorkerId() guards for the same case); there is
+            // nothing to remove then, and dereferencing it would throw a NullPointerException.
+            if (workersToPort == null) {
+                LOG.warn("No approved workers found in local state, nothing to remove for Worker ID {}", origWorkerId);
+                return;
+            }
+            workersToPort.remove(origWorkerId);
+            removeWorkersOn(workersToPort, _port);
+            _localState.setApprovedWorkers(workersToPort);
+            LOG.info("Removed Worker ID {}", origWorkerId);
+        }
+    }
+
+    /**
+     * Launch this container again under a brand new worker ID: switches the container
+     * type to LAUNCH, creates and records a new worker ID, then runs setup() and launch().
+     * NOTE(review): createNewWorkerId() asserts that no worker ID is currently set, so this
+     * presumably has to follow cleanUpForRestart() - confirm against the parent class.
+     * @throws IOException on any error
+     */
+    @Override
+    public void relaunch() throws IOException {
+        _type.assertFull();
+        //We are launching it now...
+        _type = ContainerType.LAUNCH;
+        createNewWorkerId();
+        setup();
+        launch();
+    }
+
+    /**
+     * @return the current value of the exited-early flag.  launch() resets the flag to
+     * false before starting a worker; NOTE(review): it is presumably set by the process
+     * exit callback, which is defined outside of this section - confirm.
+     */
+    @Override
+    public boolean didMainProcessExit() {
+        return _exitedEarly;
+    }
+
+    /**
+     * Run the given command for profiling
+     * 
+     * @param command
+     *            the command to run
+     * @param env
+     *            the environment to run the command
+     * @param logPrefix
+     *            the prefix to include in the logs
+     * @param targetDir
+     *            the working directory to run the command in
+     * @return true if it ran successfully, else false
+     * @throws IOException
+     *             on any error
+     * @throws InterruptedException
+     *             if interrupted wile waiting for the process to exit.
+     */
+    protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix,
+            File targetDir) throws IOException, InterruptedException {
+        _type.assertFull();
+        Process p = SupervisorUtils.launchProcess(command, env, logPrefix, null, targetDir);
+        int ret = p.waitFor();
+        return ret == 0;
+    }
+
+    /**
+     * Run a profiling action against the worker of this container.  The worker's PID is
+     * read from the PID file in the worker artifacts directory, and the command runs with
+     * the topology environment in that artifacts directory.
+     * @param request the profiling request to service
+     * @param stop true if this is meant to stop the profiling, false to start it
+     * @return true if the command ran successfully or the action is not supported
+     * (it is then ignored), else false
+     * @throws IOException on any error
+     * @throws InterruptedException if interrupted while waiting for the command to exit
+     */
+    @Override
+    public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException {
+        _type.assertFull();
+        final String artifactsDir = ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port);
+
+        @SuppressWarnings("unchecked")
+        final Map<String, String> topoEnv = (Map<String, String>) _topoConf.get(Config.TOPOLOGY_ENVIRONMENT);
+        final Map<String, String> env = (topoEnv != null) ? topoEnv : new HashMap<String, String>();
+
+        final File pidFile = new File(ConfigUtils.workerArtifactsPidPath(_conf, _topologyId, _port));
+        final String workerPid = _ops.slurpString(pidFile).trim();
+
+        final ProfileAction profileAction = request.get_action();
+        final String logPrefix = "ProfilerAction process " + _topologyId + ":" + _port + " PROFILER_ACTION: " + profileAction
+                + " ";
+
+        final List<String> command = mkProfileCommand(profileAction, stop, workerPid, artifactsDir);
+        final File workingDir = new File(artifactsDir);
+
+        // An empty command means the action has no profiler command associated with it.
+        if (command.isEmpty()) {
+            LOG.warn("PROFILING REQUEST NOT SUPPORTED {} IGNORED...", request);
+            return true;
+        }
+        return runProfilingCommand(command, env, logPrefix, workingDir);
+    }
+
+    /**
+     * Get the command to run when doing profiling
+     * @param action the profiling action to perform
+     * @param stop if this is meant to stop the profiling or start it
+     * @param workerPid the PID of the process to profile
+     * @param targetDir the current working directory of the worker process
+     * @return the command to run for profiling.
+     */
+    private List<String> mkProfileCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
+        switch(action) {
+            case JMAP_DUMP:
+                return jmapDumpCmd(workerPid, targetDir);
+            case JSTACK_DUMP:
+                return jstackDumpCmd(workerPid, targetDir);
+            case JPROFILE_DUMP:
+                return jprofileDump(workerPid, targetDir);
+            case JVM_RESTART:
+                return jprofileJvmRestart(workerPid);
+            case JPROFILE_STOP:
+                if (stop) {
+                    return jprofileStop(workerPid, targetDir);
+                }
+                return jprofileStart(workerPid);
+            default:
+                return Lists.newArrayList();
+        }
+    }
+
+    /**
+     * @param pid the PID of the worker process
+     * @param targetDir the directory passed to the profile command
+     * @return the profile command invoked with the "jmap" action
+     */
+    private List<String> jmapDumpCmd(String pid, String targetDir) {
+        return new ArrayList<>(Arrays.asList(_profileCmd, pid, "jmap", targetDir));
+    }
+
+    /**
+     * @param pid the PID of the worker process
+     * @param targetDir the directory passed to the profile command
+     * @return the profile command invoked with the "jstack" action
+     */
+    private List<String> jstackDumpCmd(String pid, String targetDir) {
+        return new ArrayList<>(Arrays.asList(_profileCmd, pid, "jstack", targetDir));
+    }
+
+    /**
+     * @param pid the PID of the worker process
+     * @return the profile command invoked with the "start" action
+     */
+    private List<String> jprofileStart(String pid) {
+        return new ArrayList<>(Arrays.asList(_profileCmd, pid, "start"));
+    }
+
+    /**
+     * @param pid the PID of the worker process
+     * @param targetDir the directory passed to the profile command
+     * @return the profile command invoked with the "stop" action
+     */
+    private List<String> jprofileStop(String pid, String targetDir) {
+        return new ArrayList<>(Arrays.asList(_profileCmd, pid, "stop", targetDir));
+    }
+
+    /**
+     * @param pid the PID of the worker process
+     * @param targetDir the directory passed to the profile command
+     * @return the profile command invoked with the "dump" action
+     */
+    private List<String> jprofileDump(String pid, String targetDir) {
+        return new ArrayList<>(Arrays.asList(_profileCmd, pid, "dump", targetDir));
+    }
+
+    /**
+     * @param pid the PID of the worker process
+     * @return the profile command invoked with the "kill" action
+     */
+    private List<String> jprofileJvmRestart(String pid) {
+        return new ArrayList<>(Arrays.asList(_profileCmd, pid, "kill"));
+    }
+
+    /**
+     * Compute the java.library.path that should be used for the worker.
+     * This helps it to load JNI libraries that are packaged in the uber jar.
+     * @param stormRoot the root directory of the worker process
+     * @param conf the config for the supervisor.
+     * @return the java.library.path/LD_LIBRARY_PATH to use so native libraries load correctly.
+     */
+    protected String javaLibraryPath(String stormRoot, Map<String, Object> conf) {
+        String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
+        String os = System.getProperty("os.name").replaceAll("\\s+", "_");
+        String arch = System.getProperty("os.arch");
+        String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
+        String ret = CPJ.join(archResourceRoot, resourceRoot,
+                conf.get(Config.JAVA_LIBRARY_PATH));
+        return ret;
+    }
+
+    /**
+     * Returns a collection of jar file names found under the given directory.
+     * @param dir the directory to search
+     * @return the jar file names
+     */
+    protected List<String> getFullJars(File dir) {
+        File[] files = dir.listFiles(jarFilter);
+
+        if (files == null) {
+            return Collections.emptyList();
+        }
+
+        return Arrays.stream(files).map(f -> f.getAbsolutePath())
+                .collect(Collectors.toList());
+    }
+    
+    protected List<String> frameworkClasspath() {
+        File stormLibDir = new File(_stormHome, "lib");
+        String stormConfDir =
+                System.getenv("STORM_CONF_DIR") != null ?
+                System.getenv("STORM_CONF_DIR") :
+                new File(_stormHome, "conf").getAbsolutePath();
+        File stormExtlibDir = new File(_stormHome, "extlib");
+        String extcp = System.getenv("STORM_EXT_CLASSPATH");
+        List<String> pathElements = new LinkedList<>();
+        pathElements.addAll(getFullJars(stormLibDir));
+        pathElements.addAll(getFullJars(stormExtlibDir));
+        pathElements.add(extcp);
+        pathElements.add(stormConfDir);
+
+        return pathElements;
+    }
+    
+    /**
+     * View a config value as a list of strings.
+     * @param o a String, a List (assumed to hold Strings) or anything else
+     * @return a single element list for a String, the list itself for a List, else an
+     * empty list (including when o is null)
+     */
+    @SuppressWarnings("unchecked")
+    private List<String> asStringList(Object o) {
+        if (o instanceof String) {
+            return Arrays.asList((String)o);
+        } else if (o instanceof List) {
+            // Unchecked: the element type cannot be verified at runtime.
+            return (List<String>)o;
+        }
+        // Typed factory instead of the raw Collections.EMPTY_LIST constant.
+        return Collections.<String>emptyList();
+    }
+    
+    /**
+     * Compute the classpath for the worker process
+     * @param stormJar the topology jar
+     * @param dependencyLocations any dependencies from the topology
+     * @return the full classpath
+     */
+    protected String getWorkerClassPath(String stormJar, List<String> dependencyLocations) {
+        List<String> workercp = new ArrayList<>();
+        workercp.addAll(asStringList(_topoConf.get(Config.TOPOLOGY_CLASSPATH_BEGINNING)));
+        workercp.addAll(frameworkClasspath());
+        workercp.add(stormJar);
+        workercp.addAll(dependencyLocations);
+        workercp.addAll(asStringList(_topoConf.get(Config.TOPOLOGY_CLASSPATH)));
+        return CPJ.join(workercp);
+    }
+
+    private String substituteChildOptsInternal(String string, int memOnheap) {
+        if (StringUtils.isNotBlank(string)) {
+            String p = String.valueOf(_port);
+            string = string.replace("%ID%", p);
+            string = string.replace("%WORKER-ID%", _workerId);
+            string = string.replace("%TOPOLOGY-ID%", _topologyId);
+            string = string.replace("%WORKER-PORT%", p);
+            if (memOnheap > 0) {
+                string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
+            }
+        }
+        return string;
+    }
+    
+    /**
+     * Substitute the placeholders in the given child options without a heap size: -1 is
+     * passed as the on heap memory, so %HEAP-MEM% is left as it is.
+     * @param value the options, either a String or a List of Strings
+     * @return the options as a list of command line arguments
+     */
+    protected List<String> substituteChildopts(Object value) {
+        return substituteChildopts(value, -1);
+    }
+
+    protected List<String> substituteChildopts(Object value, int memOnheap) {
+        List<String> rets = new ArrayList<>();
+        if (value instanceof String) {
+            String string = substituteChildOptsInternal((String) value, memOnheap);
+            if (StringUtils.isNotBlank(string)) {
+                String[] strings = string.split("\\s+");
+                rets.addAll(Arrays.asList(strings));
+            }
+        } else if (value instanceof List) {
+            @SuppressWarnings("unchecked")
+            List<String> objects = (List<String>) value;
+            for (String object : objects) {
+                String str = substituteChildOptsInternal(object, memOnheap);
+                if (StringUtils.isNotBlank(str)) {
+                    rets.add(str);
+                }
+            }
+        }
+        return rets;
+    }
+
+    /**
+     * Launch the worker process (non-blocking)
+     * 
+     * @param command
+     *            the command to run
+     * @param env
+     *            the environment to run the command
+     * @param processExitcallback
+     *            a callback for when the process exits
+     * @param logPrefix
+     *            the prefix to include in the logs
+     * @param targetDir
+     *            the working directory to run the command in
+     * @return true if it ran successfully, else false
+     * @throws IOException
+     *             on any error
+     */
+    protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix,
+            ExitCodeCallback processExitCallback, File targetDir) throws IOException {
+        if (_resourceIsolationManager != null) {
+          command = _resourceIsolationManager.getLaunchCommand(_workerId, command);
+        }
+        SupervisorUtils.launchProcess(command, env, logPrefix, processExitCallback, targetDir);
+    }
+
+    private String getWorkerLoggingConfigFile() {
+        String log4jConfigurationDir = (String) (_conf.get(Config.STORM_LOG4J2_CONF_DIR));
+
+        if (StringUtils.isNotBlank(log4jConfigurationDir)) {
+            if (!Utils.isAbsolutePath(log4jConfigurationDir)) {
+                log4jConfigurationDir = _stormHome + Utils.FILE_PATH_SEPARATOR + log4jConfigurationDir;
+            }
+        } else {
+            log4jConfigurationDir = _stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
+        }
+ 
+        if (Utils.IS_ON_WINDOWS && !log4jConfigurationDir.startsWith("file:")) {
+            log4jConfigurationDir = "file:///" + log4jConfigurationDir;
+        }
+        return log4jConfigurationDir + Utils.FILE_PATH_SEPARATOR + "worker.xml";
+    }
+    
+    /**
+     * Get the class path parameters for the worker process.  Also used by the
+     * log writer.  Dependency jars and dependency artifacts of the topology are
+     * resolved against the topology's dist root.
+     * @param stormRoot the root dist dir for the topology
+     * @return the classpath for the topology as command line arguments ("-cp" and the path).
+     * @throws IOException on any error.
+     */
+    private List<String> getClassPathParams(final String stormRoot) throws IOException {
+        final String topologyJar = ConfigUtils.supervisorStormJarPath(stormRoot);
+        final StormTopology topology = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops);
+
+        final List<String> dependencies = new ArrayList<>();
+        if (topology.get_dependency_jars() != null) {
+            for (String jar : topology.get_dependency_jars()) {
+                dependencies.add(new File(stormRoot, jar).getAbsolutePath());
+            }
+        }
+        if (topology.get_dependency_artifacts() != null) {
+            for (String artifact : topology.get_dependency_artifacts()) {
+                dependencies.add(new File(stormRoot, artifact).getAbsolutePath());
+            }
+        }
+
+        final String classPath = getWorkerClassPath(topologyJar, dependencies);
+        return new ArrayList<>(Arrays.asList("-cp", classPath));
+    }
+    
+    /**
+     * Get the java system properties that are passed to both the log writer and the
+     * worker processes.  These are mostly system properties that are used by logging.
+     * @return a list of command line options
+     */
+    private List<String> getCommonParams() {
+        final String artifactsRoot = ConfigUtils.workerArtifactsRoot(_conf);
+        final String logDir = ConfigUtils.getLogDir();
+        final String log4jConfig = getWorkerLoggingConfigFile();
+        // Logging sensitivity falls back to S3 when the topology does not set one.
+        final String sensitivity = OR((String) _topoConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY), "S3");
+
+        return new ArrayList<>(Arrays.asList(
+                "-Dlogging.sensitivity=" + sensitivity,
+                "-Dlogfile.name=worker.log",
+                "-Dstorm.home=" + OR(_stormHome, ""),
+                "-Dworkers.artifacts=" + artifactsRoot,
+                "-Dstorm.id=" + _topologyId,
+                "-Dworker.id=" + _workerId,
+                "-Dworker.port=" + _port,
+                "-Dstorm.log.dir=" + logDir,
+                "-Dlog4j.configurationFile=" + log4jConfig,
+                "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                "-Dstorm.local.dir=" + _conf.get(Config.STORM_LOCAL_DIR)));
+    }
+    
+    private int getMemOnHeap(WorkerResources resources) {
+        int memOnheap = 0;
+        if (resources != null && resources.is_set_mem_on_heap() && 
+                resources.get_mem_on_heap() > 0) {
+            memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
+        } else {
+            // set the default heap memory size for supervisor-test
+            memOnheap = Utils.getInt(_topoConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
+        }
+        return memOnheap;
+    }
+    
+    /**
+     * Get the extra JVM options used when the worker profiler is enabled.
+     * @param memOnheap the on heap memory for the worker, used for substitution
+     * @return the substituted profiler child opts, empty when the profiler is not enabled
+     */
+    private List<String> getWorkerProfilerChildOpts(int memOnheap) {
+        if (!Utils.getBoolean(_conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
+            return new ArrayList<>();
+        }
+        return substituteChildopts(_conf.get(Config.WORKER_PROFILER_CHILDOPTS), memOnheap);
+    }
+    
+    /**
+     * Pick the first of two values that is not null.
+     * @param a the preferred value
+     * @param b the fallback, returned only when a is null
+     * @return a if it is not null, else b (which may itself be null)
+     */
+    private <V> V OR(V a, V b) {
+        if (a != null) {
+            return a;
+        }
+        return b;
+    }
+    
+    protected String javaCmd(String cmd) {
+        String ret = null;
+        String javaHome = System.getenv().get("JAVA_HOME");
+        if (StringUtils.isNotBlank(javaHome)) {
+            ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
+        } else {
+            ret = cmd;
+        }
+        return ret;
+    }
+    
+    /**
+     * Create the command to launch the worker process.  The result is a single argument
+     * list holding two java invocations back to back: first the LogWriter with its own
+     * options, then the complete worker command line.
+     * @param memOnheap the on heap memory for the worker
+     * @param stormRoot the root dist dir for the topology
+     * @param jlp java library path for the topology
+     * @return the command to run
+     * @throws IOException on any error.
+     */
+    private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot,
+            final String jlp) throws IOException {
+        final String javaCmd = javaCmd("java");
+        final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
+        final String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
+        final String workerTmpDir = ConfigUtils.workerTmpRoot(_conf, _workerId);
+        
+        // Both are computed once and added to the log writer and to the worker command.
+        List<String> classPathParams = getClassPathParams(stormRoot);
+        List<String> commonParams = getCommonParams();
+        
+        List<String> commandList = new ArrayList<>();
+        //Log Writer Command...
+        commandList.add(javaCmd);
+        commandList.addAll(classPathParams);
+        // Substituted without a heap size, so %HEAP-MEM% is not replaced for the log writer.
+        commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)));
+        commandList.addAll(commonParams);
+        commandList.add("org.apache.storm.LogWriter"); //The LogWriter in turn launches the actual worker.
+
+        //Worker Command...
+        commandList.add(javaCmd);
+        commandList.add("-server");
+        commandList.addAll(commonParams);
+        // Supervisor level child opts come first, followed by the topology level ones.
+        commandList.addAll(substituteChildopts(_conf.get(Config.WORKER_CHILDOPTS), memOnheap));
+        commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap));
+        // The GC opts of the topology, when set, replace the supervisor level GC opts.
+        commandList.addAll(substituteChildopts(OR(
+                _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
+                _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
+        commandList.addAll(getWorkerProfilerChildOpts(memOnheap));
+        commandList.add("-Djava.library.path=" + jlp);
+        commandList.add("-Dstorm.conf.file=" + stormConfFile);
+        commandList.add("-Dstorm.options=" + stormOptions);
+        commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
+        commandList.addAll(classPathParams);
+        // Main class of the worker followed by its four positional arguments.
+        commandList.add("org.apache.storm.daemon.worker");
+        commandList.add(_topologyId);
+        commandList.add(_supervisorId);
+        commandList.add(String.valueOf(_port));
+        commandList.add(_workerId);
+        
+        return commandList;
+    }
+
+    /**
+     * Launch the worker process for this container (non-blocking).  Builds the launch
+     * command, sets up the environment (the topology environment plus LD_LIBRARY_PATH),
+     * reserves cpu and memory with the resource isolation manager when one is configured,
+     * and then starts the process in the worker's root directory.
+     * @throws IOException on any error
+     */
+    @Override
+    public void launch() throws IOException {
+        _type.assertFull();
+        LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
+                _supervisorId, _port, _workerId);
+        String logPrefix = "Worker Process " + _workerId;
+        ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix);
+        _exitedEarly = false;
+
+        final WorkerResources resources = _assignment.get_resources();
+        final int memOnheap = getMemOnHeap(resources);
+        final String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
+        final String jlp = javaLibraryPath(stormRoot, _conf);
+
+        List<String> commandList = mkLaunchCommand(memOnheap, stormRoot, jlp);
+
+        // Copy the topology environment so that adding LD_LIBRARY_PATH does not modify the topology conf.
+        Map<String, String> topEnvironment = new HashMap<String, String>();
+        @SuppressWarnings("unchecked")
+        Map<String, String> environment = (Map<String, String>) _topoConf.get(Config.TOPOLOGY_ENVIRONMENT);
+        if (environment != null) {
+            topEnvironment.putAll(environment);
+        }
+        topEnvironment.put("LD_LIBRARY_PATH", jlp);
+
+        if (_resourceIsolationManager != null) {
+            // NOTE(review): getMemOnHeap() tolerates null resources but this branch does not;
+            // confirm whether an assignment without resources can reach here with isolation on.
+            int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
+            int cpu = (int) Math.ceil(resources.get_cpu());
+
+            // Go through Number rather than a direct (double) cast of the Object, which throws
+            // a ClassCastException when the configured value is integral (e.g. an Integer).
+            final Number toleranceMarginMb = (Number) _conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB);
+            int cGroupMem = (int) Math.ceil(toleranceMarginMb.doubleValue());
+            int memoryValue = memoffheap + memOnheap + cGroupMem;
+            int cpuValue = cpu;
+            Map<String, Number> map = new HashMap<>();
+            map.put("cpu", cpuValue);
+            map.put("memory", memoryValue);
+            _resourceIsolationManager.reserveResourcesForWorker(_workerId, map);
+        }
+
+        LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
+
+        String workerDir = ConfigUtils.workerRoot(_conf, _workerId);
+
+        launchWorkerProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
new file mode 100644
index 0000000..4915650
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.daemon.supervisor.Container.ContainerType;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+
+/**
+ * A {@link ContainerLauncher} that creates {@link BasicContainer}s, i.e. it launches
+ * containers with no security using standard java commands.
+ */
+public class BasicContainerLauncher extends ContainerLauncher {
+    private final Map<String, Object> _conf;
+    private final String _supervisorId;
+    protected final ResourceIsolationInterface _resourceIsolationManager;
+
+    /**
+     * @param conf the config of the supervisor
+     * @param supervisorId the id of the supervisor
+     * @param resourceIsolationManager handed to every container that is created, may be null
+     * @throws IOException declared for callers, nothing in the body throws it
+     */
+    public BasicContainerLauncher(Map<String, Object> conf, String supervisorId, ResourceIsolationInterface resourceIsolationManager) throws IOException {
+        this._conf = conf;
+        this._supervisorId = supervisorId;
+        this._resourceIsolationManager = resourceIsolationManager;
+    }
+
+    /**
+     * Create a container of type LAUNCH for the assignment, set it up and launch it.
+     */
+    @Override
+    public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        // The last argument is null here; NOTE(review): presumably the worker id,
+        // as the partial recovery below passes one in that position - confirm.
+        final Container launched = new BasicContainer(ContainerType.LAUNCH, _conf, _supervisorId, port, assignment,
+                _resourceIsolationManager, state, null);
+        launched.setup();
+        launched.launch();
+        return launched;
+    }
+
+    /**
+     * Create a container of type RECOVER_FULL for a worker assigned to the given port.
+     * Nothing is set up or launched.
+     */
+    @Override
+    public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        final Container recovered = new BasicContainer(ContainerType.RECOVER_FULL, _conf, _supervisorId, port,
+                assignment, _resourceIsolationManager, state, null);
+        return recovered;
+    }
+
+    /**
+     * Create a container of type RECOVER_PARTIAL from just a worker id; no port (-1)
+     * and no assignment (null) are passed.
+     */
+    @Override
+    public Killable recoverContainer(String workerId, LocalState localState) throws IOException {
+        final Killable partial = new BasicContainer(ContainerType.RECOVER_PARTIAL, _conf, _supervisorId, -1, null,
+                _resourceIsolationManager, localState, workerId);
+        return partial;
+    }
+}