You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:32 UTC

[09/35] storm git commit: xxxx

xxxx


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/465a4b89
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/465a4b89
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/465a4b89

Branch: refs/heads/master
Commit: 465a4b89521a4ac15b81969009133bdfa12d0655
Parents: 42bacde
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Thu Mar 10 20:12:18 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Thu Mar 10 20:12:18 2016 +0800

----------------------------------------------------------------------
 .../org/apache/storm/command/kill_workers.clj   |   5 +-
 .../apache/storm/daemon/local_supervisor.clj    |   5 +-
 .../storm/daemon/supervisor/ShutdownWork.java   |   7 +-
 .../daemon/supervisor/StandaloneSupervisor.java |   2 -
 .../apache/storm/daemon/supervisor/State.java   |   2 +-
 .../storm/daemon/supervisor/Supervisor.java     |   9 +-
 .../storm/daemon/supervisor/SupervisorData.java | 112 ++++---------------
 .../daemon/supervisor/SupervisorManger.java     |   5 +-
 .../daemon/supervisor/SupervisorUtils.java      | 101 +++++++++++++++--
 .../daemon/supervisor/SyncProcessEvent.java     |  33 +++---
 .../daemon/supervisor/SyncSupervisorEvent.java  |  17 ++-
 .../supervisor/timer/RunProfilerActions.java    |   2 +-
 .../supervisor/timer/SupervisorHealthCheck.java |   4 +-
 .../supervisor/timer/SupervisorHeartbeat.java   |  14 +--
 .../daemon/supervisor/timer/UpdateBlobs.java    |   5 +-
 15 files changed, 168 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
index a7de176..4ddc993 100644
--- a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
+++ b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
@@ -28,7 +28,6 @@
         conf (assoc conf STORM-LOCAL-DIR (. (File. (conf STORM-LOCAL-DIR)) getCanonicalPath))
         isupervisor (StandaloneSupervisor.)
         supervisor-data (SupervisorData. conf nil isupervisor)
-        ids (SupervisorUtils/myWorkerIds conf)
-        shut-workers (ShutdownWork.)]
+        ids (SupervisorUtils/supervisorWorkerIds conf)]
     (doseq [id ids]
-      (.shutWorker shut-workers supervisor-data id))))
+      (SupervisorUtils/shutWorker supervisor-data id))))

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
index 3dfed6f..70c280a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
@@ -14,7 +14,7 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.daemon.local-supervisor
-  (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData ShutdownWork Supervisor]
+  (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData ShutdownWork Supervisor SupervisorUtils]
            [org.apache.storm.utils Utils ConfigUtils]
            [org.apache.storm ProcessSimulator])
   (:use [org.apache.storm.daemon common]
@@ -38,9 +38,8 @@
     ))
 
 (defn shutdown-local-worker [supervisorData workerId]
-  (let [shut-workers (ShutdownWork.)]
     (log-message "shutdown-local-worker")
-    (.shutWorker shut-workers supervisorData workerId)))
+    (SupervisorUtils/shutWorker supervisorData workerId))
 
 (defn local-process []
   "Create a local process event"

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
index 5018ce1..ec69980 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
@@ -26,7 +26,6 @@ import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
@@ -42,7 +41,7 @@ public  class ShutdownWork implements Shutdownable {
         Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
         Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
         String user = ConfigUtils.getWorkerUser(conf, workerId);
-        String threadPid = supervisorData.getWorkerThreadPidsAtom().get(workerId);
+        String threadPid = supervisorData.getWorkerThreadPids().get(workerId);
         if (StringUtils.isNotBlank(threadPid)) {
             ProcessSimulator.killProcess(threadPid);
         }
@@ -53,7 +52,7 @@ public  class ShutdownWork implements Shutdownable {
                 commands.add("signal");
                 commands.add(pid);
                 commands.add("15");
-                String logPrefix = "kill - 15 " + pid;
+                String logPrefix = "kill -15 " + pid;
                 SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
             } else {
                 Utils.killProcessWithSigTerm(pid);
@@ -71,7 +70,7 @@ public  class ShutdownWork implements Shutdownable {
                 commands.add("signal");
                 commands.add(pid);
                 commands.add("9");
-                String logPrefix = "kill - 9 " + pid;
+                String logPrefix = "kill -9 " + pid;
                 SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
             } else {
                 Utils.forceKillProcess(pid);

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
index c13df8b..d4ce623 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
@@ -28,9 +28,7 @@ import java.util.Map;
 import java.util.UUID;
 
 public class StandaloneSupervisor implements ISupervisor {
-
     private String supervisorId;
-
     private Map conf;
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
index 1913c91..28dffd7 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
@@ -18,5 +18,5 @@
 package org.apache.storm.daemon.supervisor;
 
 public enum State {
-    valid, disallowed, notStarted, timedOut;
+    VALID, DISALLOWED, NOT_STARTED, TIMED_OUT;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index 2c7810d..847b38d 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -140,7 +140,7 @@ public class Supervisor {
     /**
      * start distribute supervisor
      */
-    private void launch() {
+    private void launch(ISupervisor iSupervisor) {
         LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
         SupervisorManger supervisorManager;
         try {
@@ -148,11 +148,10 @@ public class Supervisor {
             if (ConfigUtils.isLocalMode(conf)) {
                 throw new IllegalArgumentException("Cannot start server in local mode!");
             }
-            ISupervisor iSupervisor = new StandaloneSupervisor();
             supervisorManager = mkSupervisor(conf, null, iSupervisor);
             if (supervisorManager != null)
                 Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
-            registerWorkerNumGauge("drpc:num-execute-http-requests", conf);
+            registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
             startMetricsReporters(conf);
         } catch (Exception e) {
             LOG.error("Failed to start supervisor\n", e);
@@ -167,7 +166,7 @@ public class Supervisor {
         metricRegistry.register(name, new Gauge<Integer>() {
             @Override
             public Integer getValue() {
-                Collection<String> pids = SupervisorUtils.myWorkerIds(conf);
+                Collection<String> pids = SupervisorUtils.supervisorWorkerIds(conf);
                 return pids.size();
             }
         });
@@ -191,6 +190,6 @@ public class Supervisor {
     public static void main(String[] args) {
         Utils.setupDefaultUncaughtExceptionHandler();
         Supervisor instance = new Supervisor();
-        instance.launch();
+        instance.launch(new StandaloneSupervisor());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
index 039fe30..be39b4e 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
@@ -42,23 +42,25 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class SupervisorData {
 
     private static final Logger LOG = LoggerFactory.getLogger(SupervisorData.class);
 
-    private Map conf;
-    private IContext sharedContext;
+    private final Map conf;
+    private final IContext sharedContext;
     private volatile boolean active;
     private ISupervisor iSupervisor;
     private Utils.UptimeComputer upTime;
     private String stormVersion;
 
-    private ConcurrentHashMap<String, String> workerThreadPidsAtom; // for local mode
+    private ConcurrentHashMap<String, String> workerThreadPids; // for local mode
 
     private IStormClusterState stormClusterState;
 
@@ -71,7 +73,7 @@ public class SupervisorData {
     private String hostName;
 
     // used for reporting used ports when heartbeating
-    private ConcurrentHashMap<Long, LocalAssignment> currAssignment;
+    private AtomicReference<Map<Long, LocalAssignment>> currAssignment;
 
     private StormTimer heartbeatTimer;
 
@@ -81,13 +83,13 @@ public class SupervisorData {
 
     private Localizer localizer;
 
-    private ConcurrentHashMap<String, Map<String, Object>> assignmentVersions;
+    private AtomicReference<Map<String, Map<String, Object>>> assignmentVersions;
 
     private AtomicInteger syncRetry;
 
     private final Object downloadLock = new Object();
 
-    private ConcurrentHashMap<String, List<ProfileRequest>> stormIdToProfileActions;
+    private AtomicReference<Map<String, List<ProfileRequest>>> stormIdToProfileActions;
 
     private CgroupManager resourceIsolationManager;
 
@@ -100,7 +102,7 @@ public class SupervisorData {
         this.active = true;
         this.upTime = Utils.makeUptimeComputer();
         this.stormVersion = VersionInfo.getVersion();
-        this.workerThreadPidsAtom = new ConcurrentHashMap<String, String>();
+        this.workerThreadPids = new ConcurrentHashMap<String, String>();
         this.deadWorkers = new ConcurrentHashSet();
 
         List<ACL> acls = null;
@@ -130,7 +132,7 @@ public class SupervisorData {
             throw Utils.wrapInRuntime(e);
         }
 
-        this.currAssignment = new ConcurrentHashMap<>();
+        this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>());
 
         this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
 
@@ -138,9 +140,9 @@ public class SupervisorData {
 
         this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
 
-        this.assignmentVersions = new ConcurrentHashMap<>();
+        this.assignmentVersions = new AtomicReference<Map<String, Map<String, Object>>>(new HashMap<String, Map<String, Object>>());
         this.syncRetry = new AtomicInteger(0);
-        this.stormIdToProfileActions = new ConcurrentHashMap<>();
+        this.stormIdToProfileActions = new AtomicReference<Map<String, List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>());
         if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
             try {
                 this.resourceIsolationManager = (CgroupManager) Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
@@ -154,31 +156,22 @@ public class SupervisorData {
         }
     }
 
-    public ConcurrentHashMap<String, List<ProfileRequest>> getStormIdToProfileActions() {
+    public AtomicReference<Map<String, List<ProfileRequest>>> getStormIdToProfileActions() {
         return stormIdToProfileActions;
     }
 
     public void setStormIdToProfileActions(Map<String, List<ProfileRequest>> stormIdToProfileActions) {
-        this.stormIdToProfileActions.clear();
-        this.stormIdToProfileActions.putAll(stormIdToProfileActions);
+        this.stormIdToProfileActions.set(stormIdToProfileActions);
     }
 
     public Map getConf() {
         return conf;
     }
 
-    public void setConf(Map conf) {
-        this.conf = conf;
-    }
-
     public IContext getSharedContext() {
         return sharedContext;
     }
 
-    public void setSharedContext(IContext sharedContext) {
-        this.sharedContext = sharedContext;
-    }
-
     public boolean isActive() {
         return active;
     }
@@ -191,107 +184,58 @@ public class SupervisorData {
         return iSupervisor;
     }
 
-    public void setiSupervisor(ISupervisor iSupervisor) {
-        this.iSupervisor = iSupervisor;
-    }
-
     public Utils.UptimeComputer getUpTime() {
         return upTime;
     }
 
-    public void setUpTime(Utils.UptimeComputer upTime) {
-        this.upTime = upTime;
-    }
-
     public String getStormVersion() {
         return stormVersion;
     }
 
-    public void setStormVersion(String stormVersion) {
-        this.stormVersion = stormVersion;
-    }
-
-    public ConcurrentHashMap<String, String> getWorkerThreadPidsAtom() {
-        return workerThreadPidsAtom;
-    }
-
-    public void setWorkerThreadPidsAtom(ConcurrentHashMap<String, String> workerThreadPidsAtom) {
-        this.workerThreadPidsAtom = workerThreadPidsAtom;
+    public ConcurrentHashMap<String, String> getWorkerThreadPids() {
+        return workerThreadPids;
     }
 
     public IStormClusterState getStormClusterState() {
         return stormClusterState;
     }
 
-    public void setStormClusterState(IStormClusterState stormClusterState) {
-        this.stormClusterState = stormClusterState;
-    }
-
     public LocalState getLocalState() {
         return localState;
     }
 
-    public void setLocalState(LocalState localState) {
-        this.localState = localState;
-    }
-
     public String getSupervisorId() {
         return supervisorId;
     }
 
-    public void setSupervisorId(String supervisorId) {
-        this.supervisorId = supervisorId;
-    }
-
     public String getAssignmentId() {
         return assignmentId;
     }
 
-    public void setAssignmentId(String assignmentId) {
-        this.assignmentId = assignmentId;
-    }
-
     public String getHostName() {
         return hostName;
     }
 
-    public void setHostName(String hostName) {
-        this.hostName = hostName;
-    }
-
-    public ConcurrentHashMap<Long, LocalAssignment> getCurrAssignment() {
+    public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
         return currAssignment;
     }
 
     public void setCurrAssignment(Map<Long, LocalAssignment> currAssignment) {
-        this.currAssignment.clear();
-        this.currAssignment.putAll(currAssignment);
+        this.currAssignment.set(currAssignment);
     }
 
     public StormTimer getHeartbeatTimer() {
         return heartbeatTimer;
     }
 
-    public void setHeartbeatTimer(StormTimer heartbeatTimer) {
-        this.heartbeatTimer = heartbeatTimer;
-    }
-
     public StormTimer getEventTimer() {
         return eventTimer;
     }
 
-    public void setEventTimer(StormTimer eventTimer) {
-        this.eventTimer = eventTimer;
-    }
-
     public StormTimer getBlobUpdateTimer() {
         return blobUpdateTimer;
     }
 
-    public void setBlobUpdateTimer(StormTimer blobUpdateTimer) {
-        this.blobUpdateTimer = blobUpdateTimer;
-    }
-
     public Localizer getLocalizer() {
         return localizer;
     }
@@ -304,36 +248,20 @@ public class SupervisorData {
         return syncRetry;
     }
 
-    public void setSyncRetry(AtomicInteger syncRetry) {
-        this.syncRetry = syncRetry;
-    }
-
-    public ConcurrentHashMap<String, Map<String, Object>> getAssignmentVersions() {
+    public AtomicReference<Map<String, Map<String, Object>>> getAssignmentVersions() {
         return assignmentVersions;
     }
 
     public void setAssignmentVersions(Map<String, Map<String, Object>> assignmentVersions) {
-        this.assignmentVersions.clear();
-        this.assignmentVersions.putAll(assignmentVersions);
+        this.assignmentVersions.set(assignmentVersions);
     }
 
     public CgroupManager getResourceIsolationManager() {
         return resourceIsolationManager;
     }
 
-    public void setResourceIsolationManager(CgroupManager resourceIsolationManager) {
-        this.resourceIsolationManager = resourceIsolationManager;
-    }
-
-    public Object getDownloadLock() {
-        return downloadLock;
-    }
-
     public ConcurrentHashSet getDeadWorkers() {
         return deadWorkers;
     }
 
-    public void setDeadWorkers(ConcurrentHashSet deadWorkers) {
-        this.deadWorkers = deadWorkers;
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
index acc2cb8..6578529 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.Map;
 
-public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, DaemonCommon, Runnable {
+public class SupervisorManger implements SupervisorDaemon, DaemonCommon, Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class);
 
@@ -41,7 +41,6 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon,
         this.processesEventManager = processesEventManager;
     }
 
-    @Override
     public void shutdown() {
         LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId());
         supervisorData.setActive(false);
@@ -63,7 +62,7 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon,
         Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
         try {
             for (String workerId : workerIds) {
-                shutWorker(supervisorData, workerId);
+                SupervisorUtils.shutWorker(supervisorData, workerId);
             }
         } catch (Exception e) {
             LOG.error("shutWorker failed");

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 9d0b343..dd2a538 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -20,11 +20,13 @@ package org.apache.storm.daemon.supervisor;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.utils.PathUtils;
 import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.localizer.LocalResource;
 import org.apache.storm.localizer.Localizer;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
@@ -68,6 +70,7 @@ public class SupervisorUtils {
         commands.add(wl);
         commands.add(user);
         commands.addAll(args);
+        LOG.info("Running as user: {} command: {}", user, commands);
         return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
     }
 
@@ -115,7 +118,7 @@ public class SupervisorUtils {
      * @param blobInfo
      * @return
      */
-    public static Boolean isShouldUncompressBlob(Map<String, Object> blobInfo) {
+    public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) {
         return new Boolean((String) blobInfo.get("uncompress"));
     }
 
@@ -129,7 +132,7 @@ public class SupervisorUtils {
         List<LocalResource> localResourceList = new ArrayList<>();
         if (blobstoreMap != null) {
             for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) {
-                LocalResource localResource = new LocalResource(map.getKey(), isShouldUncompressBlob(map.getValue()));
+                LocalResource localResource = new LocalResource(map.getKey(), shouldUncompressBlob(map.getValue()));
                 localResourceList.add(localResource);
             }
         }
@@ -169,7 +172,7 @@ public class SupervisorUtils {
         return Utils.readDirContents(workerRoot);
     }
 
-    public static boolean checkTopoFilesExist(Map conf, String stormId) throws IOException {
+    public static boolean doRequiredTopoFilesExist(Map conf, String stormId) throws IOException {
         String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
         String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
         String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
@@ -185,10 +188,6 @@ public class SupervisorUtils {
         return false;
     }
 
-    public static Collection<String> myWorkerIds(Map conf){
-        return  Utils.readDirContents(ConfigUtils.workerRoot(conf));
-    }
-
     /**
      * Returns map from worr id to heartbeat
      *
@@ -263,11 +262,95 @@ public class SupervisorUtils {
         return ret;
     }
     
-    public static List<ACL> supervisorZkAcls() {
-        List<ACL> acls = new ArrayList<>();
+    public final static List<ACL> supervisorZkAcls() {
+        final List<ACL> acls = new ArrayList<>();
         acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
         acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
         return acls;
     }
 
+    public static void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException {
+        LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), workerId);
+        Map conf = supervisorData.getConf();
+        Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
+        Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
+        Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+        String user = ConfigUtils.getWorkerUser(conf, workerId);
+        String threadPid = supervisorData.getWorkerThreadPids().get(workerId);
+        if (StringUtils.isNotBlank(threadPid)) {
+            ProcessSimulator.killProcess(threadPid);
+        }
+
+        for (String pid : pids) {
+            if (asUser) {
+                List<String> commands = new ArrayList<>();
+                commands.add("signal");
+                commands.add(pid);
+                commands.add("15");
+                String logPrefix = "kill -15 " + pid;
+                SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
+            } else {
+                Utils.killProcessWithSigTerm(pid);
+            }
+        }
+
+        if (pids.size() > 0) {
+            LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs);
+            Time.sleepSecs(shutdownSleepSecs);
+        }
+
+        for (String pid : pids) {
+            if (asUser) {
+                List<String> commands = new ArrayList<>();
+                commands.add("signal");
+                commands.add(pid);
+                commands.add("9");
+                String logPrefix = "kill -9 " + pid;
+                SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
+            } else {
+                Utils.forceKillProcess(pid);
+            }
+            String path = ConfigUtils.workerPidPath(conf, workerId, pid);
+            if (asUser) {
+                SupervisorUtils.rmrAsUser(conf, workerId, path);
+            } else {
+                try {
+                    LOG.debug("Removing path {}", path);
+                    new File(path).delete();
+                } catch (Exception e) {
+                    // on windows, the supervisor may still holds the lock on the worker directory
+                    // ignore
+                }
+            }
+        }
+        tryCleanupWorker(conf, supervisorData, workerId);
+        LOG.info("Shut down {}:{}", supervisorData.getSupervisorId(), workerId);
+
+    }
+
+    public static void tryCleanupWorker(Map conf, SupervisorData supervisorData, String workerId) {
+        try {
+            String workerRoot = ConfigUtils.workerRoot(conf, workerId);
+            if (Utils.checkFileExists(workerRoot)) {
+                if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+                    SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
+                } else {
+                    Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
+                    Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
+                    Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId));
+                    Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
+                }
+                ConfigUtils.removeWorkerUserWSE(conf, workerId);
+                supervisorData.getDeadWorkers().remove(workerId);
+            }
+            if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)){
+                supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId);
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+        } catch (RuntimeException e) {
+            LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
index 172d223..cf26896 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@ -45,7 +45,7 @@ import java.util.*;
  * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for workers
  * launch
  */
-public class SyncProcessEvent extends ShutdownWork implements Runnable {
+public class SyncProcessEvent implements Runnable {
 
     private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
 
@@ -53,6 +53,8 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
 
     private SupervisorData supervisorData;
 
+    public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1);
+
     private class ProcessExitCallback implements Utils.ExitCodeCallable {
         private final String logPrefix;
         private final String workerId;
@@ -113,7 +115,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
             Set<Integer> keepPorts = new HashSet<>();
             for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
                 StateHeartbeat stateHeartbeat = entry.getValue();
-                if (stateHeartbeat.getState() == State.valid) {
+                if (stateHeartbeat.getState() == State.VALID) {
                     keeperWorkerIds.add(entry.getKey());
                     keepPorts.add(stateHeartbeat.getHeartbeat().get_port());
                 }
@@ -129,7 +131,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
 
             for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
                 StateHeartbeat stateHeartbeat = entry.getValue();
-                if (stateHeartbeat.getState() != State.valid) {
+                if (stateHeartbeat.getState() != State.VALID) {
                     LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now,
                             stateHeartbeat.getState(), stateHeartbeat.getHeartbeat());
                     shutWorker(supervisorData, entry.getKey());
@@ -180,9 +182,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         }
         return reassignExecutors;
     }
-
-
-
+    
     /**
      * Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead
      * 
@@ -205,16 +205,16 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
             LSWorkerHeartbeat whb = entry.getValue();
             State state;
             if (whb == null) {
-                state = State.notStarted;
+                state = State.NOT_STARTED;
             } else if (!approvedIds.contains(workerId) || !matchesAssignment(whb, assignedExecutors)) {
-                state = State.disallowed;
+                state = State.DISALLOWED;
             } else if (supervisorData.getDeadWorkers().contains(workerId)) {
-                LOG.info("Worker Process {}as died", workerId);
-                state = State.timedOut;
+                LOG.info("Worker Process {} has died", workerId);
+                state = State.TIMED_OUT;
             } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) {
-                state = State.timedOut;
+                state = State.TIMED_OUT;
             } else {
-                state = State.valid;
+                state = State.VALID;
             }
             LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now);
             workerIdHbstate.put(workerId, new StateHeartbeat(state, whb));
@@ -230,7 +230,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         List<ExecutorInfo> executorInfos = new ArrayList<>();
         executorInfos.addAll(whb.get_executors());
         // remove SYSTEM_EXECUTOR_ID
-        executorInfos.remove(new ExecutorInfo(-1, -1));
+        executorInfos.remove(SYSTEM_EXECUTOR_INFO);
         List<ExecutorInfo> localExecuorInfos = localAssignment.get_executors();
 
         if (localExecuorInfos.size() != executorInfos.size())
@@ -518,7 +518,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
             WorkerResources resources = assignment.get_resources();
 
             // This condition checks for required files exist before launching the worker
-            if (SupervisorUtils.checkTopoFilesExist(conf, stormId)) {
+            if (SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) {
                 String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId);
                 String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId);
 
@@ -666,4 +666,9 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
             Utils.createSymlink(workerRoot, stormRoot, fileName, fileName);
         }
     }
+
+    // Instance-level wrapper around SupervisorUtils.shutWorker, kept for supervisor-test.
+    public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException{
+        SupervisorUtils.shutWorker(supervisorData, workerId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
index 29aad12..e96395f 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
@@ -75,7 +75,7 @@ public class SyncSupervisorEvent implements Runnable {
             Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager);
             List<String> stormIds = stormClusterState.assignments(syncCallback);
             Map<String, Map<String, Object>> assignmentsSnapshot =
-                    getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions(), syncCallback);
+                    getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback);
             Map<String, List<ProfileRequest>> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds);
 
             Set<String> allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf);
@@ -191,7 +191,7 @@ public class SyncSupervisorEvent implements Runnable {
         for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()) {
             String workerId = entry.getKey();
             StateHeartbeat stateHeartbeat = entry.getValue();
-            if (stateHeartbeat != null && stateHeartbeat.getState() == State.valid) {
+            if (stateHeartbeat != null && stateHeartbeat.getState() == State.VALID) {
                 vaildPortToWorkerIds.put(stateHeartbeat.getHeartbeat().get_port(), workerId);
             }
         }
@@ -277,7 +277,7 @@ public class SyncSupervisorEvent implements Runnable {
             for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
                 String key = entry.getKey();
                 Map<String, Object> blobInfo = entry.getValue();
-                localizer.removeBlobReference(key, user, topoName, SupervisorUtils.isShouldUncompressBlob(blobInfo));
+                localizer.removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo));
             }
         }
     }
@@ -312,7 +312,7 @@ public class SyncSupervisorEvent implements Runnable {
         Set<String> srashStormIds = new HashSet<>();
         for (String stormId : allDownloadedTopologyIds) {
             if (assignedStormIds.contains(stormId)) {
-                if (!SupervisorUtils.checkTopoFilesExist(conf, stormId)) {
+                if (!SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) {
                     LOG.debug("Files not present in topology directory");
                     rmTopoFiles(conf, stormId, localizer, false);
                     srashStormIds.add(stormId);
@@ -357,7 +357,12 @@ public class SyncSupervisorEvent implements Runnable {
             blobStore.shutdown();
         }
 
-        FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+        try {
+            FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+        } catch (Exception e) {
+            // Best-effort move: keep going as before, but record the failure instead of hiding it.
+            LOG.warn("Failed to move {} to {}", tmproot, stormroot, e);
+        }
 
         SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
         ClassLoader classloader = Thread.currentThread().getContextClassLoader();
@@ -627,7 +632,7 @@ public class SyncSupervisorEvent implements Runnable {
         for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()){
             String workerId = entry.getKey();
             StateHeartbeat stateHeartbeat = entry.getValue();
-            if (stateHeartbeat.getState() == State.disallowed){
+            if (stateHeartbeat.getState() == State.DISALLOWED){
                 syncProcesses.shutWorker(supervisorData, workerId);
-                LOG.debug("{}'s state disallowed, so shutdown this worker");
+                LOG.debug("{}'s state disallowed, so shutdown this worker", workerId);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
index 91044cc..d39a679 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@ -84,7 +84,7 @@ public class RunProfilerActions implements Runnable {
 
     @Override
     public void run() {
-        Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions();
+        Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions().get();
         try {
             for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
                 String stormId = entry.getKey();

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index 36ee6b6..49f48ef 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.Map;
 
-public class SupervisorHealthCheck extends ShutdownWork implements Runnable {
+public class SupervisorHealthCheck implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class);
 
@@ -47,7 +47,7 @@ public class SupervisorHealthCheck extends ShutdownWork implements Runnable {
         if (healthCode != 0) {
             for (String workerId : workerIds) {
                 try {
-                    shutWorker(supervisorData, workerId);
+                    SupervisorUtils.shutWorker(supervisorData, workerId);
                 } catch (Exception e) {
                     throw Utils.wrapInRuntime(e);
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index e158dbc..4137e94 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -31,12 +31,10 @@ import java.util.Map;
 
 public class SupervisorHeartbeat implements Runnable {
 
-    private IStormClusterState stormClusterState;
-    private String supervisorId;
-    private Map conf;
-    private SupervisorInfo supervisorInfo;
-
-    private SupervisorData supervisorData;
+     private final IStormClusterState stormClusterState;
+     private final String supervisorId;
+     private final Map conf;
+     private final SupervisorData supervisorData;
 
     public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
         this.stormClusterState = supervisorData.getStormClusterState();
@@ -46,13 +44,13 @@ public class SupervisorHeartbeat implements Runnable {
     }
 
     private SupervisorInfo update(Map conf, SupervisorData supervisorData) {
-        supervisorInfo = new SupervisorInfo();
+        SupervisorInfo supervisorInfo = new SupervisorInfo();
         supervisorInfo.set_time_secs(Time.currentTimeSecs());
         supervisorInfo.set_hostname(supervisorData.getHostName());
         supervisorInfo.set_assignment_id(supervisorData.getAssignmentId());
 
         List<Long> usedPorts = new ArrayList<>();
-        usedPorts.addAll(supervisorData.getCurrAssignment().keySet());
+        usedPorts.addAll(supervisorData.getCurrAssignment().get().keySet());
         supervisorInfo.set_used_ports(usedPorts);
         List metaDatas = (List)supervisorData.getiSupervisor().getMetadata();
         List<Long> portList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
index 623afa5..ebb1d5f 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The
@@ -58,9 +59,9 @@ public class UpdateBlobs implements Runnable {
         try {
             Map conf = supervisorData.getConf();
             Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
-            ConcurrentHashMap<Long, LocalAssignment> newAssignment = supervisorData.getCurrAssignment();
+            AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisorData.getCurrAssignment();
             Set<String> assignedStormIds = new HashSet<>();
-            for (LocalAssignment localAssignment : newAssignment.values()) {
+            for (LocalAssignment localAssignment : newAssignment.get().values()) {
                 assignedStormIds.add(localAssignment.get_topology_id());
             }
             for (String stormId : downloadedStormIds) {