You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:31 UTC

[08/35] storm git commit: Merge branch 'master' into supervisor

Merge branch 'master' into supervisor


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/42bacde2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/42bacde2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/42bacde2

Branch: refs/heads/master
Commit: 42bacde20ea86867b874395532aa034cfad4f120
Parents: b09b412 96f81d7
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Sun Mar 6 16:05:14 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Sun Mar 6 16:17:47 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |  6 ++
 .../src/clj/org/apache/storm/MockAutoCred.clj   | 58 ---------------
 .../storm/cluster/StormClusterStateImpl.java    |  7 +-
 .../storm/daemon/metrics/MetricsUtils.java      |  2 +-
 .../storm/daemon/supervisor/ShutdownWork.java   |  1 +
 .../daemon/supervisor/SyncProcessEvent.java     |  4 ++
 .../daemon/supervisor/SyncSupervisorEvent.java  | 45 +++++++++++-
 .../jvm/org/apache/storm/drpc/DRPCSpout.java    |  2 +
 .../jvm/org/apache/storm/utils/ConfigUtils.java | 10 +++
 .../test/clj/org/apache/storm/nimbus_test.clj   | 10 +--
 .../clj/org/apache/storm/supervisor_test.clj    | 10 ++-
 .../test/jvm/org/apache/storm/MockAutoCred.java | 75 ++++++++++++++++++++
 12 files changed, 162 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/42bacde2/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
index 19328e5,0000000..5018ce1
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
@@@ -1,124 -1,0 +1,125 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.ProcessSimulator;
 +import org.apache.storm.daemon.Shutdownable;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.utils.Utils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.*;
 +
 +/**
 + * Shuts down the processes belonging to a worker and removes the worker's on-disk state.
 + */
 +public class ShutdownWork implements Shutdownable {
 +
 +    private static final Logger LOG = LoggerFactory.getLogger(ShutdownWork.class);
 +
 +    /**
 +     * Stops every process recorded for a worker and then cleans up after it.
 +     *
 +     * <p>Sequence: kill the worker thread registered for this id via {@link ProcessSimulator} (if any), send
 +     * signal 15 to every pid listed in the worker's pid directory, sleep for the configured number of seconds
 +     * when at least one pid was found, send signal 9 to every pid and remove its pid file, and finally call
 +     * {@link #tryCleanupWorker(Map, SupervisorData, String)}.
 +     *
 +     * @param supervisorData supplies the conf, the supervisor id and the worker thread pids
 +     * @param workerId id of the worker to shut down
 +     * @throws IOException propagated from the kill / launcher helpers
 +     * @throws InterruptedException propagated from the kill / launcher helpers or the sleep
 +     */
 +    public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException {
 +        LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), workerId);
 +        Map conf = supervisorData.getConf();
 +        Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
 +        Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
 +        boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
 +        String user = ConfigUtils.getWorkerUser(conf, workerId);
 +        String threadPid = supervisorData.getWorkerThreadPidsAtom().get(workerId);
 +        if (StringUtils.isNotBlank(threadPid)) {
 +            ProcessSimulator.killProcess(threadPid);
 +        }
 +
 +        // First pass: ask every process to terminate.
 +        for (String pid : pids) {
 +            signalProcess(conf, user, pid, asUser, false);
 +        }
 +
 +        if (pids.size() > 0) {
 +            LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs);
 +            Time.sleepSecs(shutdownSleepSecs);
 +        }
 +
 +        // Second pass: force-kill whatever is left and drop the pid file.
 +        for (String pid : pids) {
 +            signalProcess(conf, user, pid, asUser, true);
 +            String path = ConfigUtils.workerPidPath(conf, workerId, pid);
 +            if (asUser) {
 +                SupervisorUtils.rmrAsUser(conf, workerId, path);
 +            } else {
 +                try {
 +                    LOG.debug("Removing path {}", path);
 +                    // File.delete() reports failure through its return value, not an exception
 +                    if (!new File(path).delete()) {
 +                        LOG.warn("Could not remove pid file {}", path);
 +                    }
 +                } catch (Exception e) {
 +                    // on windows, the supervisor may still hold the lock on the worker directory
 +                    LOG.debug("Ignoring failure to remove {}", path, e);
 +                }
 +            }
 +        }
 +        tryCleanupWorker(conf, supervisorData, workerId);
 +        LOG.info("Shut down {}:{}", supervisorData.getSupervisorId(), workerId);
 +
 +    }
 +
 +    /**
 +     * Sends signal 15 (force == false) or signal 9 (force == true) to one process, going through the worker
 +     * launcher when workers run as a separate user and through {@link Utils} otherwise.
 +     */
 +    private void signalProcess(Map conf, String user, String pid, boolean asUser, boolean force)
 +            throws IOException, InterruptedException {
 +        if (asUser) {
 +            String signal = force ? "9" : "15";
 +            List<String> commands = new ArrayList<>();
 +            commands.add("signal");
 +            commands.add(pid);
 +            commands.add(signal);
 +            String logPrefix = "kill - " + signal + " " + pid;
 +            SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
 +        } else if (force) {
 +            Utils.forceKillProcess(pid);
 +        } else {
 +            Utils.killProcessWithSigTerm(pid);
 +        }
 +    }
 +
 +    /**
 +     * Best-effort removal of a worker's directories and bookkeeping.
 +     *
 +     * <p>If the worker root exists it is removed (as the worker user when run-as-user is enabled, otherwise
 +     * directory by directory), the worker-user record is removed and the worker id is dropped from the
 +     * supervisor's dead workers. When the resource isolation plugin is enabled the worker's resources are
 +     * released. {@link IOException} and {@link RuntimeException} are logged and swallowed.
 +     *
 +     * @param conf supervisor configuration
 +     * @param supervisorData supervisor state holding the dead workers and the resource isolation manager
 +     * @param workerId id of the worker to clean up
 +     */
 +    protected void tryCleanupWorker(Map conf, SupervisorData supervisorData, String workerId) {
 +        try {
 +            String workerRoot = ConfigUtils.workerRoot(conf, workerId);
 +            if (Utils.checkFileExists(workerRoot)) {
 +                if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +                    SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
 +                } else {
 +                    // sub-directories first, the worker root last
 +                    Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
 +                    Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
++                    Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId));
 +                    Utils.forceDelete(workerRoot);
 +                }
 +                ConfigUtils.removeWorkerUserWSE(conf, workerId);
 +                supervisorData.getDeadWorkers().remove(workerId);
 +            }
 +            if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)){
 +                supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId);
 +            }
 +        } catch (IOException | RuntimeException e) {
 +            LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
 +        }
 +    }
 +
 +    /** Intentionally a no-op. */
 +    @Override
 +    public void shutdown() {
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/42bacde2/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
index 4e0b8a0,0000000..172d223
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@@ -1,665 -1,0 +1,669 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.commons.io.FileUtils;
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.container.cgroup.CgroupManager;
 +import org.apache.storm.generated.ExecutorInfo;
 +import org.apache.storm.generated.LSWorkerHeartbeat;
 +import org.apache.storm.generated.LocalAssignment;
 +import org.apache.storm.generated.WorkerResources;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.LocalState;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.utils.Utils;
 +import org.eclipse.jetty.util.ConcurrentHashSet;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.yaml.snakeyaml.Yaml;
 +
 +import java.io.File;
 +import java.io.FileWriter;
 +import java.io.IOException;
 +import java.util.*;
 +
 +/**
 + * 1. Workers to kill are those in allocated that are dead or disallowed. 2. Kill the ones that should be dead: read pids, kill -9 and individually remove
 + * each pid file, then rmr the heartbeat dir, rmdir the pid dir and rmdir the id dir (catch exception and log). 3. Of the rest, figure out which assignments
 + * aren't yet satisfied. 4. Generate new worker ids and write the new "approved workers" to LS. 5. Create the local dir for each worker id. 6. Launch new
 + * workers (give worker-id, port, and supervisor-id). 7. Wait for the workers to launch.
 + */
 +public class SyncProcessEvent extends ShutdownWork implements Runnable {
 +
 +    private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
 +
 +    private  LocalState localState;
 +
 +    private SupervisorData supervisorData;
 +
 +    /**
 +     * Exit hook handed to the process launcher for a worker process.
 +     */
 +    private class ProcessExitCallback implements Utils.ExitCodeCallable {
 +        private final String workerId;
 +        private final String logPrefix;
 +
 +        public ProcessExitCallback(String prefix, String id) {
 +            logPrefix = prefix;
 +            workerId = id;
 +        }
 +
 +        /**
 +         * Logs the exit code and records the worker id in the supervisor's dead workers.
 +         *
 +         * @return always null
 +         */
 +        @Override
 +        public Object call(int exitCode) {
 +            LOG.info("{} exited with code: {}", logPrefix, exitCode);
 +            supervisorData.getDeadWorkers().add(workerId);
 +            return null;
 +        }
 +
 +        /**
 +         * Argument-less variant; does nothing.
 +         *
 +         * @return always null
 +         */
 +        @Override
 +        public Object call() throws Exception {
 +            return null;
 +        }
 +    }
 +
 +    /**
 +     * Creates an uninitialized instance; {@link #init(SupervisorData)} must be called before {@link #run()}
 +     * because both fields it sets are still null here.
 +     */
 +    public SyncProcessEvent(){
 +
 +    }
 +    /**
 +     * Creates an instance and initializes it from the given supervisor data.
 +     *
 +     * @param supervisorData supervisor state passed on to {@link #init(SupervisorData)}
 +     */
 +    public SyncProcessEvent(SupervisorData supervisorData) {
 +        init(supervisorData);
 +    }
 +
 +    //TODO: init is intended for the local supervisor, so we will remove it after porting worker.clj to java
 +    /**
 +     * Stores the supervisor data and caches its local state.
 +     *
 +     * @param supervisorData supervisor state this event operates on
 +     */
 +    public void init(SupervisorData supervisorData){
 +        this.supervisorData = supervisorData;
 +        this.localState = supervisorData.getLocalState();
 +    }
 +
 +
 +    /**
 +     * Runs one synchronization pass between the local assignments and the worker processes.
 +     *
 +     * <ol>
 +     * <li>Classify every known worker via {@link #getLocalWorkerStats}; workers in state {@code valid} are kept.</li>
 +     * <li>Assignments whose port is not held by a kept worker get a freshly generated worker id.</li>
 +     * <li>Every worker that is not {@code valid} is shut down with {@code shutWorker}.</li>
 +     * <li>New workers are started, the approved workers (kept plus new) are written to the local state,
 +     * and the method waits for the new workers to launch.</li>
 +     * </ol>
 +     *
 +     * Any exception is logged and rethrown wrapped in a runtime exception.
 +     */
 +    @Override
 +    public void run() {
 +        LOG.debug("Syncing processes");
 +        try {
 +            Map conf = supervisorData.getConf();
 +            Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
 +            if (null == assignedExecutors) {
 +                assignedExecutors = new HashMap<>();
 +            }
 +            int now = Time.currentTimeSecs();
 +
 +            Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now);
 +
 +            // workers that stay, and the ports they occupy
 +            Set<String> validWorkerIds = new HashSet<>();
 +            Set<Integer> occupiedPorts = new HashSet<>();
 +            for (Map.Entry<String, StateHeartbeat> stat : localWorkerStats.entrySet()) {
 +                StateHeartbeat current = stat.getValue();
 +                if (current.getState() != State.valid) {
 +                    continue;
 +                }
 +                validWorkerIds.add(stat.getKey());
 +                occupiedPorts.add(current.getHeartbeat().get_port());
 +            }
 +
 +            // every remaining assignment needs a brand new worker id
 +            Map<Integer, LocalAssignment> reassignExecutors = getReassignExecutors(assignedExecutors, occupiedPorts);
 +            Map<Integer, String> portToNewWorkerId = new HashMap<>();
 +            for (Integer port : reassignExecutors.keySet()) {
 +                portToNewWorkerId.put(port, Utils.uuid());
 +            }
 +            LOG.debug("Syncing processes");
 +            LOG.debug("Assigned executors: {}", assignedExecutors);
 +            LOG.debug("Allocated: {}", localWorkerStats);
 +
 +            // tear down everything that is not valid
 +            for (Map.Entry<String, StateHeartbeat> stat : localWorkerStats.entrySet()) {
 +                StateHeartbeat current = stat.getValue();
 +                if (current.getState() == State.valid) {
 +                    continue;
 +                }
 +                String doomedWorkerId = stat.getKey();
 +                LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", doomedWorkerId, now,
 +                        current.getState(), current.getHeartbeat());
 +                shutWorker(supervisorData, doomedWorkerId);
 +            }
 +
 +            // start new workers
 +            Map<String, Integer> startedWorkerIdToPort = startNewWorkers(portToNewWorkerId, reassignExecutors);
 +
 +            // approved workers = kept workers (with their recorded port) + newly started ones
 +            Map<String, Integer> approvedBefore = localState.getApprovedWorkers();
 +            Map<String, Integer> approvedNow = new HashMap<>();
 +            for (String keptId : validWorkerIds) {
 +                approvedNow.put(keptId, approvedBefore.get(keptId));
 +            }
 +            approvedNow.putAll(startedWorkerIdToPort);
 +            localState.setApprovedWorkers(approvedNow);
 +            waitForWorkersLaunch(conf, startedWorkerIdToPort.keySet());
 +
 +        } catch (Exception e) {
 +            LOG.error("Failed Sync Process", e);
 +            throw Utils.wrapInRuntime(e);
 +        }
 +
 +    }
 +
 +    /**
 +     * Polls each worker's local state every 500 ms until it has written a heartbeat or the timeout expires.
 +     *
 +     * <p>The timeout ({@code Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS}) is measured from the moment this method
 +     * is entered and is shared by all workers, not restarted per worker. A worker that still has no heartbeat
 +     * when polling stops is only logged; nothing is thrown for it.
 +     *
 +     * @param conf supervisor configuration
 +     * @param workerIds ids of the workers that were just launched
 +     * @throws Exception propagated from reading the worker state or from sleeping
 +     */
 +    protected void waitForWorkersLaunch(Map conf, Set<String> workerIds) throws Exception {
 +        int startTime = Time.currentTimeSecs();
 +        // Utils.getInt instead of a raw (int) cast, which fails when the value is not an Integer
 +        int timeOut = Utils.getInt(conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS));
 +        for (String workerId : workerIds) {
 +            LocalState workerState = ConfigUtils.workerState(conf, workerId);
 +            LSWorkerHeartbeat hb = workerState.getWorkerHeartBeat();
 +            while (hb == null && (Time.currentTimeSecs() - startTime) <= timeOut) {
 +                LOG.info("{} still hasn't started", workerId);
 +                Time.sleep(500);
 +                hb = workerState.getWorkerHeartBeat();
 +            }
 +            if (hb == null) {
 +                LOG.info("Worker {} failed to start", workerId);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Returns a copy of the assignments with every port in {@code keepPorts} removed.
 +     *
 +     * @param assignExecutors port to assignment map; not modified
 +     * @param keepPorts ports that must be left out of the result
 +     * @return a new map holding the assignments for all other ports
 +     */
 +    protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
 +        Map<Integer, LocalAssignment> remaining = new HashMap<>(assignExecutors);
 +        remaining.keySet().removeAll(keepPorts);
 +        return remaining;
 +    }
 +
 +
 +
 +    /**
 +     * Returns a map from worker id to that worker's state and heartbeat.
 +     *
 +     * <p>For every heartbeat read from disk the state is decided in this order:
 +     * <ul>
 +     * <li>{@code notStarted} - no heartbeat was found;</li>
 +     * <li>{@code disallowed} - the worker is not an approved worker, or its heartbeat does not match the assignment;</li>
 +     * <li>{@code timedOut} - the worker is listed in the supervisor's dead workers, or its heartbeat has timed out;</li>
 +     * <li>{@code valid} - otherwise.</li>
 +     * </ul>
 +     *
 +     * @param supervisorData supplies the conf, the local state and the dead workers
 +     * @param assignedExecutors port to assignment map the heartbeats are matched against
 +     * @param now current supervisor time in seconds, used for the timeout check
 +     * @return worker id to state/heartbeat pair
 +     * @throws Exception propagated from reading the heartbeats or the local state
 +     */
 +    public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception {
 +        Map<String, StateHeartbeat> workerIdHbstate = new HashMap<>();
 +        Map conf = supervisorData.getConf();
 +        LocalState supervisorState = supervisorData.getLocalState();
 +        Map<String, LSWorkerHeartbeat> idToHeartbeat = SupervisorUtils.readWorkerHeartbeats(conf);
 +        Map<String, Integer> approvedWorkers = supervisorState.getApprovedWorkers();
 +        Set<String> approvedIds = new HashSet<>();
 +        if (approvedWorkers != null) {
 +            approvedIds.addAll(approvedWorkers.keySet());
 +        }
 +        for (Map.Entry<String, LSWorkerHeartbeat> entry : idToHeartbeat.entrySet()) {
 +            String workerId = entry.getKey();
 +            LSWorkerHeartbeat whb = entry.getValue();
 +            State state;
 +            if (whb == null) {
 +                state = State.notStarted;
 +            } else if (!approvedIds.contains(workerId) || !matchesAssignment(whb, assignedExecutors)) {
 +                state = State.disallowed;
 +            } else if (supervisorData.getDeadWorkers().contains(workerId)) {
 +                // the process exit callback already reported this worker as gone
 +                LOG.info("Worker Process {} has died", workerId);
 +                state = State.timedOut;
 +            } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) {
 +                state = State.timedOut;
 +            } else {
 +                state = State.valid;
 +            }
 +            LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now);
 +            workerIdHbstate.put(workerId, new StateHeartbeat(state, whb));
 +        }
 +        return workerIdHbstate;
 +    }
 +
 +    /**
 +     * Checks whether a worker heartbeat agrees with the assignment for the heartbeat's port.
 +     *
 +     * <p>The heartbeat matches when an assignment exists for its port, the topology ids are equal, and the
 +     * heartbeat's executors (ignoring the system executor {@code (-1, -1)}) are the same in number as the
 +     * assigned executors and contain every assigned executor.
 +     *
 +     * @param whb heartbeat written by the worker
 +     * @param assignedExecutors port to assignment map
 +     * @return true if the heartbeat matches the assignment
 +     */
 +    protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) {
 +        LocalAssignment localAssignment = assignedExecutors.get(whb.get_port());
 +        if (localAssignment == null || !localAssignment.get_topology_id().equals(whb.get_topology_id())) {
 +            return false;
 +        }
 +        List<ExecutorInfo> executorInfos = new ArrayList<>();
 +        executorInfos.addAll(whb.get_executors());
 +        // remove SYSTEM_EXECUTOR_ID
 +        executorInfos.remove(new ExecutorInfo(-1, -1));
 +        List<ExecutorInfo> localExecutorInfos = localAssignment.get_executors();
 +
 +        if (localExecutorInfos.size() != executorInfos.size()) {
 +            return false;
 +        }
 +
 +        // every assigned executor must be reported by the worker (compare against the heartbeat's list,
 +        // not against the assignment itself, which would always succeed)
 +        for (ExecutorInfo executorInfo : localExecutorInfos) {
 +            if (!executorInfos.contains(executorInfo)) {
 +                return false;
 +            }
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * launch a worker in local mode.
 +     *
 +     * <p>The body is intentionally empty in this class; see the comment inside.
 +     */
 +    protected void launchWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException {
 +        // port this function after porting worker to java
 +    }
 +
 +    /**
 +     * Builds the worker classpath: the base worker classpath, then the topology jar, then any entries
 +     * configured under {@code Config.TOPOLOGY_CLASSPATH} (a single string or a list of strings; any other
 +     * value is ignored).
 +     *
 +     * @param stormJar path of the topology jar
 +     * @param stormConf topology configuration
 +     * @return the combined classpath
 +     */
 +    protected String getWorkerClassPath(String stormJar, Map stormConf) {
 +        Object configured = stormConf.get(Config.TOPOLOGY_CLASSPATH);
 +        List<String> extraEntries = new ArrayList<>();
 +        if (configured instanceof List) {
 +            extraEntries.addAll((List<String>) configured);
 +        } else if (configured instanceof String) {
 +            extraEntries.add((String) configured);
 +        }
 +        String withTopologyJar = Utils.addToClasspath(Utils.workerClasspath(), Arrays.asList(stormJar));
 +        return Utils.addToClasspath(withTopologyJar, extraEntries);
 +    }
 +
 +    /**
 +     * Generates runtime childopts by replacing the placeholders {@code %ID%}, {@code %WORKER-ID%},
 +     * {@code %TOPOLOGY-ID%}, {@code %WORKER-PORT%} and {@code %HEAP-MEM%}.
 +     *
 +     * <p>A string value is substituted and then split on whitespace into separate options; a list value is
 +     * substituted element by element without splitting. Blank results and null list elements are dropped so
 +     * that no empty argument ends up on the command line. Any other value (including null) yields an empty list.
 +     *
 +     * @param value childopts as a string or a list of strings
 +     * @param workerId replaces {@code %WORKER-ID%}
 +     * @param stormId replaces {@code %TOPOLOGY-ID%}
 +     * @param port replaces {@code %ID%} and {@code %WORKER-PORT%}
 +     * @param memOnheap replaces {@code %HEAP-MEM%}
 +     * @return the substituted options, never null
 +     */
 +    public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
 +        List<String> rets = new ArrayList<>();
 +        if (value instanceof String) {
 +            // trim first: split() would otherwise emit a leading empty string for leading whitespace
 +            String substituted = substituteChildoptsTokens((String) value, workerId, stormId, port, memOnheap).trim();
 +            if (!substituted.isEmpty()) {
 +                rets.addAll(Arrays.asList(substituted.split("\\s+")));
 +            }
 +        } else if (value instanceof List) {
 +            for (Object object : (List<Object>) value) {
 +                if (object == null) {
 +                    continue;
 +                }
 +                String substituted = substituteChildoptsTokens((String) object, workerId, stormId, port, memOnheap);
 +                if (!substituted.trim().isEmpty()) {
 +                    rets.add(substituted);
 +                }
 +            }
 +        }
 +        return rets;
 +    }
 +
 +    /** Replaces every childopts placeholder in a single string. */
 +    private String substituteChildoptsTokens(String template, String workerId, String stormId, Long port, int memOnheap) {
 +        String portText = String.valueOf(port);
 +        return template.replace("%ID%", portText)
 +                .replace("%WORKER-ID%", workerId)
 +                .replace("%TOPOLOGY-ID%", stormId)
 +                .replace("%WORKER-PORT%", portText)
 +                .replace("%HEAP-MEM%", String.valueOf(memOnheap));
 +    }
 +
 +
 +
 +    /**
 +     * Launches a worker in distributed mode.
 +     *
 +     * <p>Builds a command line consisting of a {@code org.apache.storm.LogWriter} JVM followed by the actual
 +     * worker JVM ({@code org.apache.storm.daemon.worker}), optionally wraps it with the cgroup launch command,
 +     * writes the worker's log metadata, artifacts link and blobstore links, and finally starts the process,
 +     * either through the worker launcher (run-as-user) or directly.
 +     *
 +     * <p>GC options come from the topology conf ({@code Config.TOPOLOGY_WORKER_GC_CHILDOPTS}) when present and
 +     * from the supervisor conf ({@code Config.WORKER_GC_CHILDOPTS}) otherwise.
 +     *
 +     * @param conf supervisor configuration
 +     * @param supervisorId supervisor id; not used by this method (kept for testing)
 +     * @param assignmentId assignment id passed to the worker on its command line
 +     * @param stormId topology id
 +     * @param port worker port
 +     * @param workerId id of the worker being launched
 +     * @param resources resources assigned to the worker (on-heap / off-heap memory, cpu)
 +     * @param cgroupManager used to reserve resources and wrap the command when resource isolation is enabled
 +     * @param deadWorkers dead workers to remove this worker id from before launch; may be null
 +     * @throws IOException propagated from the file-system and process-launch helpers
 +     */
 +    protected void launchWorker(Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId,
 +            WorkerResources resources, CgroupManager cgroupManager, ConcurrentHashSet deadWorkers) throws IOException {
 +
 +        Boolean runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
 +        String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
 +        String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
 +        String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
++        String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId);
++
 +        String stormLogDir = ConfigUtils.getLogDir();
 +        String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
 +
 +        // resolve the log4j2 conf dir: absolute as-is, relative under storm.home, default "log4j2"
 +        String stormLog4j2ConfDir;
 +        if (StringUtils.isNotBlank(stormLogConfDir)) {
 +            if (Utils.isAbsolutePath(stormLogConfDir)) {
 +                stormLog4j2ConfDir = stormLogConfDir;
 +            } else {
 +                stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir;
 +            }
 +        } else {
 +            stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
 +        }
 +
 +        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +
 +        String jlp = jlp(stormRoot, conf);
 +
 +        String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
 +
 +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +
 +        String workerClassPath = getWorkerClassPath(stormJar, stormConf);
 +
 +        Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
 +
 +        int memOnheap = 0;
 +        if (resources.get_mem_on_heap() > 0) {
 +            memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
 +        } else {
 +            //set the default heap memory size for supervisor-test
 +            memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
 +        }
 +
 +        int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
 +
 +        int cpu = (int) Math.ceil(resources.get_cpu());
 +
 +        // Topology GC opts win; fall back to the supervisor-wide setting only when the topology has none.
 +        // The raw value is passed on so that a string is split into options like the other childopts.
 +        Object gcOptsSource = (topGcOptsObject != null) ? topGcOptsObject : conf.get(Config.WORKER_GC_CHILDOPTS);
 +        List<String> gcOpts = substituteChildopts(gcOptsSource, workerId, stormId, port, memOnheap);
 +
 +        Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
 +        List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
 +        if (topoWorkerLogwriterObject instanceof String) {
 +            topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject);
 +        } else if (topoWorkerLogwriterObject instanceof List) {
 +            topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject);
 +        }
 +
 +        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +
 +        String logfileName = "worker.log";
 +
 +        String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
 +
 +        String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
 +        if (loggingSensitivity == null) {
 +            loggingSensitivity = "S3";
 +        }
 +
 +        List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
 +
 +        List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
 +
 +        List<String> workerProfilerChildopts = null;
 +        if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
 +            workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
 +        }else {
 +            workerProfilerChildopts = new ArrayList<>();
 +        }
 +
 +        // environment for the worker process: topology environment plus the native library path
 +        Map<String, String> topEnvironment = new HashMap<String, String>();
 +        Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
 +        if (environment != null) {
 +            topEnvironment.putAll(environment);
 +        }
 +        topEnvironment.put("LD_LIBRARY_PATH", jlp);
 +
 +        String log4jConfigurationFile = null;
 +        if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
 +            log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
 +        } else {
 +            log4jConfigurationFile = stormLog4j2ConfDir;
 +        }
 +        log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
 +
 +        // first JVM: the log writer
 +        List<String> commandList = new ArrayList<>();
 +        commandList.add(SupervisorUtils.javaCmd("java"));
 +        commandList.add("-cp");
 +        commandList.add(workerClassPath);
 +        commandList.addAll(topoWorkerLogwriterChildopts);
 +        commandList.add("-Dlogfile.name=" + logfileName);
 +        commandList.add("-Dstorm.home=" + stormHome);
 +        commandList.add("-Dworkers.artifacts=" + workersArtifacets);
 +        commandList.add("-Dstorm.id=" + stormId);
 +        commandList.add("-Dworker.id=" + workerId);
 +        commandList.add("-Dworker.port=" + port);
 +        commandList.add("-Dstorm.log.dir=" + stormLogDir);
 +        commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
 +        commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
 +        commandList.add("org.apache.storm.LogWriter");
 +
 +        // second JVM: the worker itself, passed as arguments to the log writer
 +        commandList.add(SupervisorUtils.javaCmd("java"));
 +        commandList.add("-server");
 +        commandList.addAll(workerChildopts);
 +        commandList.addAll(topWorkerChildopts);
 +        commandList.addAll(gcOpts);
 +        commandList.addAll(workerProfilerChildopts);
 +        commandList.add("-Djava.library.path=" + jlp);
 +        commandList.add("-Dlogfile.name=" + logfileName);
 +        commandList.add("-Dstorm.home=" + stormHome);
 +        commandList.add("-Dworkers.artifacts=" + workersArtifacets);
 +        commandList.add("-Dstorm.conf.file=" + stormConfFile);
 +        commandList.add("-Dstorm.options=" + stormOptions);
 +        commandList.add("-Dstorm.log.dir=" + stormLogDir);
++        commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
 +        commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
 +        commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
 +        commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
 +        commandList.add("-Dstorm.id=" + stormId);
 +        commandList.add("-Dworker.id=" + workerId);
 +        commandList.add("-Dworker.port=" + port);
 +        commandList.add("-cp");
 +        commandList.add(workerClassPath);
 +        commandList.add("org.apache.storm.daemon.worker");
 +        commandList.add(stormId);
 +        commandList.add(assignmentId);
 +        commandList.add(String.valueOf(port));
 +        commandList.add(workerId);
 +
 +        // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
 +        if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
 +            // go through Number so that an integer-valued setting does not fail the cast
 +            int cgRoupMem = (int) Math.ceil(((Number) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)).doubleValue());
 +            int memoryValue = memoffheap + memOnheap + cgRoupMem;
 +            int cpuValue = cpu;
 +            Map<String, Number> map = new HashMap<>();
 +            map.put("cpu", cpuValue);
 +            map.put("memory", memoryValue);
 +            cgroupManager.reserveResourcesForWorker(workerId, map);
 +            commandList = cgroupManager.getLaunchCommand(workerId, commandList);
 +        }
 +
 +        LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
 +        writeLogMetadata(stormConf, user, workerId, stormId, port, conf);
 +        ConfigUtils.setWorkerUserWSE(conf, workerId, user);
 +        createArtifactsLink(conf, stormId, port, workerId);
 +
 +        String logPrefix = "Worker Process " + workerId;
 +        String workerDir = ConfigUtils.workerRoot(conf, workerId);
 +
 +        if (deadWorkers != null)
 +            deadWorkers.remove(workerId);
 +        createBlobstoreLinks(conf, stormId, workerId);
 +
 +        ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix, workerId);
 +        if (runWorkerAsUser) {
 +            List<String> args = new ArrayList<>();
 +            args.add("worker");
 +            args.add(workerDir);
 +            args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
 +            SupervisorUtils.workerLauncher(conf, user, args, null, logPrefix, processExitCallback, new File(workerDir));
 +        } else {
 +            Utils.launchProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
 +        }
 +    }
 +
 +    /**
 +     * Builds the native library search path for a worker of the topology installed at {@code stormRoot}.
 +     * <p>
 +     * The result lists, in order: the OS/architecture specific resources directory, the generic resources directory and
 +     * the value configured under {@link Config#JAVA_LIBRARY_PATH}.
 +     *
 +     * @param stormRoot root directory of the topology's local code
 +     * @param conf supervisor configuration
 +     * @return the library path entries joined with the platform path-list separator
 +     */
 +    protected String jlp(String stormRoot, Map conf) {
 +        String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
 +        String os = System.getProperty("os.name").replaceAll("\\s+", "_");
 +        String arch = System.getProperty("os.arch");
 +        String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
 +        // Entries of a path list must be separated by File.pathSeparator (":" or ";"); joining them with the
 +        // directory separator would collapse the three entries into one non-existent path.
 +        return archResourceRoot + File.pathSeparator + resourceRoot + File.pathSeparator + conf.get(Config.JAVA_LIBRARY_PATH);
 +    }
 +
 +    /**
 +     * Launches a worker for every port in {@code reassignExecutors} whose topology files are present locally.
 +     *
 +     * @param newWorkerIds worker id chosen for each port
 +     * @param reassignExecutors assignment to start on each port
 +     * @return worker id to port for every worker for which a launch was attempted
 +     * @throws IOException if a worker directory cannot be created or the launch fails
 +     */
 +    protected Map<String, Integer> startNewWorkers(Map<Integer, String> newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws IOException {
 +
 +        Map conf = supervisorData.getConf();
 +        String supervisorId = supervisorData.getSupervisorId();
 +        String clusterMode = ConfigUtils.clusterMode(conf);
 +        Map<String, Integer> launched = new HashMap<>();
 +
 +        for (Map.Entry<Integer, LocalAssignment> portAssignment : reassignExecutors.entrySet()) {
 +            Integer port = portAssignment.getKey();
 +            LocalAssignment assignment = portAssignment.getValue();
 +            String workerId = newWorkerIds.get(port);
 +            String stormId = assignment.get_topology_id();
 +            WorkerResources resources = assignment.get_resources();
 +
 +            // Without the topology files on disk the worker cannot start, so skip this port.
 +            if (!SupervisorUtils.checkTopoFilesExist(conf, stormId)) {
 +                LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", assignment,
 +                        supervisorData.getSupervisorId(), port, workerId);
 +                continue;
 +            }
 +
 +            String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId);
 +            String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId);
 +
 +            LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port,
 +                    workerId);
 +
 +            FileUtils.forceMkdir(new File(pidsPath));
++            FileUtils.forceMkdir(new File(ConfigUtils.workerTmpRoot(conf, workerId)));
 +            FileUtils.forceMkdir(new File(hbPath));
 +
 +            boolean distributed = clusterMode.endsWith("distributed");
 +            if (distributed) {
 +                launchWorker(conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources,
 +                        supervisorData.getResourceIsolationManager(), supervisorData.getDeadWorkers());
 +            } else if (clusterMode.endsWith("local")) {
 +                launchWorker(supervisorData, stormId, port.longValue(), workerId, resources);
 +            }
 +            launched.put(workerId, port);
 +        }
 +        return launched;
 +    }
 +
 +    /**
 +     * Collects the submitter, worker id and the merged log users/groups of a topology and writes them to the
 +     * worker's log metadata YAML file.
 +     *
 +     * @param stormconf topology configuration the users and groups are read from
 +     * @param user topology submitter
 +     * @param workerId id of the worker the metadata belongs to
 +     * @param stormId topology id
 +     * @param port worker port
 +     * @param conf supervisor configuration
 +     * @throws IOException if the metadata file cannot be written
 +     */
 +    public void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, Long port, Map conf) throws IOException {
 +        Map data = new HashMap();
 +        data.put(Config.TOPOLOGY_SUBMITTER_USER, user);
 +        data.put("worker-id", workerId);
 +
 +        // Groups: union of the log groups and the topology groups.
 +        Set<String> mergedGroups = new HashSet<>();
 +        //for supervisor-test
 +        List<String> configuredLogGroups = (List<String>) stormconf.get(Config.LOGS_GROUPS);
 +        if (configuredLogGroups != null) {
 +            for (String name : configuredLogGroups) {
 +                mergedGroups.add(name);
 +            }
 +        }
 +        List<String> configuredTopoGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS);
 +        if (configuredTopoGroups != null) {
 +            for (String name : configuredTopoGroups) {
 +                mergedGroups.add(name);
 +            }
 +        }
 +        data.put(Config.LOGS_GROUPS, mergedGroups.toArray());
 +
 +        // Users: union of the log users and the topology users.
 +        Set<String> mergedUsers = new HashSet<>();
 +        List<String> configuredLogUsers = (List<String>) stormconf.get(Config.LOGS_USERS);
 +        if (configuredLogUsers != null) {
 +            for (String name : configuredLogUsers) {
 +                mergedUsers.add(name);
 +            }
 +        }
 +        List<String> configuredTopoUsers = (List<String>) stormconf.get(Config.TOPOLOGY_USERS);
 +        if (configuredTopoUsers != null) {
 +            for (String name : configuredTopoUsers) {
 +                mergedUsers.add(name);
 +            }
 +        }
 +        data.put(Config.LOGS_USERS, mergedUsers.toArray());
 +
 +        writeLogMetadataToYamlFile(stormId, port, data, conf);
 +    }
 +
 +    /**
 +     * Dumps {@code data} as YAML into the log metadata file of the given topology and port, creating the parent
 +     * directory first when it does not exist.
 +     * <p>
 +     * run worker as user needs the directory to have special permissions or it is insecure
 +     *
 +     * @param stormId topology id
 +     * @param port worker port
 +     * @param data metadata to serialize
 +     * @param conf supervisor configuration
 +     * @throws IOException if the directory cannot be created or the file cannot be written
 +     */
 +    public void writeLogMetadataToYamlFile(String stormId, Long port, Map data, Map conf) throws IOException {
 +        File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port.intValue());
 +
 +        if (!Utils.checkFileExists(file.getParent())) {
 +            if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +                FileUtils.forceMkdir(file.getParentFile());
 +                SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), file.getParentFile().getCanonicalPath());
 +            } else {
 +                file.getParentFile().mkdirs();
 +            }
 +        }
 +        Yaml yaml = new Yaml();
 +        // try-with-resources guarantees the writer is closed on every exit path.
 +        try (FileWriter writer = new FileWriter(file)) {
 +            yaml.dump(data, writer);
 +        }
 +
 +    }
 +
 +    /**
 +     * Create a symlink from worker directory to its port artifacts directory.
 +     * Nothing is done when the worker directory does not exist.
 +     *
 +     * @param conf supervisor configuration
 +     * @param stormId topology id
 +     * @param port worker port
 +     * @param workerId worker id
 +     * @throws IOException if the symlink cannot be created
 +     */
 +    protected void createArtifactsLink(Map conf, String stormId, Long port, String workerId) throws IOException {
 +        String workerRootDir = ConfigUtils.workerRoot(conf, workerId);
 +        String artifactsRootDir = ConfigUtils.workerArtifactsRoot(conf, stormId);
 +        if (!Utils.checkFileExists(workerRootDir)) {
 +            return;
 +        }
 +        Utils.createSymlink(workerRootDir, artifactsRootDir, "artifacts", String.valueOf(port));
 +    }
 +
 +    /**
 +     * Create symlinks in worker launch directory for all blobs, plus one for the resources directory.
 +     * A blob is linked under its configured "localname" when present, otherwise under its blobstore key.
 +     *
 +     * @param conf supervisor configuration
 +     * @param stormId topology id
 +     * @param workerId worker id
 +     * @throws IOException if the topology configuration cannot be read or a symlink cannot be created
 +     */
 +    protected void createBlobstoreLinks(Map conf, String stormId, String workerId) throws IOException {
 +        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +        String workerRoot = ConfigUtils.workerRoot(conf, workerId);
 +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +
 +        List<String> blobFileNames = new ArrayList<>();
 +        if (blobstoreMap != null) {
 +            for (Map.Entry<String, Map<String, Object>> blob : blobstoreMap.entrySet()) {
 +                Map<String, Object> blobInfo = blob.getValue();
 +                boolean hasLocalName = blobInfo != null && blobInfo.containsKey("localname");
 +                String linkName = hasLocalName ? (String) blobInfo.get("localname") : blob.getKey();
 +                blobFileNames.add(linkName);
 +            }
 +        }
 +
 +        // Only used for the log message: resources directory first, then every blob name.
 +        List<String> resourceFileNames = new ArrayList<>();
 +        resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR);
 +        resourceFileNames.addAll(blobFileNames);
 +        LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", workerId, stormId, resourceFileNames.size(), resourceFileNames);
 +
 +        Utils.createSymlink(workerRoot, stormRoot, ConfigUtils.RESOURCES_SUBDIR);
 +        for (String blobFileName : blobFileNames) {
 +            Utils.createSymlink(workerRoot, stormRoot, blobFileName, blobFileName);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/42bacde2/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
index 2de9203,0000000..29aad12
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
@@@ -1,593 -1,0 +1,636 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.commons.io.FileUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.blobstore.BlobStore;
 +import org.apache.storm.blobstore.ClientBlobStore;
 +import org.apache.storm.cluster.IStateStorage;
 +import org.apache.storm.cluster.IStormClusterState;
 +import org.apache.storm.event.EventManager;
 +import org.apache.storm.generated.*;
 +import org.apache.storm.localizer.LocalResource;
 +import org.apache.storm.localizer.LocalizedResource;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.utils.*;
 +import org.apache.thrift.transport.TTransportException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.net.JarURLConnection;
 +import java.net.URL;
 +import java.nio.file.Files;
 +import java.nio.file.StandardCopyOption;
 +import java.util.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +public class SyncSupervisorEvent implements Runnable {
 +
 +    private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class);
 +
 +    private EventManager syncSupEventManager;
 +    private EventManager syncProcessManager;
 +
 +    private IStormClusterState stormClusterState;
 +
 +    private LocalState localState;
 +
 +    private SyncProcessEvent syncProcesses;
 +    private SupervisorData supervisorData;
 +
 +    /**
 +     * Creates the supervisor sync event.
 +     *
 +     * @param supervisorData shared supervisor state; the cluster state and local state are taken from it
 +     * @param syncProcesses process sync event queued at the end of every run
 +     * @param syncSupEventManager event manager used to re-queue this event from cluster state callbacks
 +     * @param syncProcessManager event manager the process sync event is added to
 +     */
 +    public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager,
 +            EventManager syncProcessManager) {
 +
 +        this.supervisorData = supervisorData;
 +        this.stormClusterState = supervisorData.getStormClusterState();
 +        this.localState = supervisorData.getLocalState();
 +
 +        this.syncProcesses = syncProcesses;
 +        this.syncSupEventManager = syncSupEventManager;
 +        this.syncProcessManager = syncProcessManager;
 +    }
 +
 +    /**
 +     * Synchronizes this supervisor with the assignments stored in cluster state.
 +     * <p>
 +     * Reads the assignment snapshot and profiler requests, downloads code for assigned topologies that are not yet
 +     * present locally, reports dropped ports to the ISupervisor, persists the new local assignment, removes code of
 +     * downloaded topologies that no longer appear in the snapshot and finally queues the process sync event.
 +     * Any exception is logged and rethrown wrapped in a RuntimeException.
 +     */
 +    @Override
 +    public void run() {
 +        try {
 +            Map conf = supervisorData.getConf();
 +            // The callback re-queues this event on the supervisor event manager when cluster state changes.
 +            Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager);
 +            List<String> stormIds = stormClusterState.assignments(syncCallback);
 +            Map<String, Map<String, Object>> assignmentsSnapshot =
 +                    getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions(), syncCallback);
 +            Map<String, List<ProfileRequest>> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds);
 +
 +            Set<String> allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf);
 +            Map<String, String> stormcodeMap = readStormCodeLocations(assignmentsSnapshot);
 +            Map<Integer, LocalAssignment> existingAssignment = localState.getLocalAssignmentsMap();
 +            if (existingAssignment == null){
 +                existingAssignment = new HashMap<>();
 +            }
 +
 +            Map<Integer, LocalAssignment> allAssignment =
 +                    readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry());
 +
 +
 +            // Keep only the ports the ISupervisor confirms, and remember which topologies they belong to.
 +            Map<Integer, LocalAssignment> newAssignment = new HashMap<>();
 +            Set<String> assignedStormIds = new HashSet<>();
 +
 +            for (Map.Entry<Integer, LocalAssignment> entry : allAssignment.entrySet()) {
 +                if (supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) {
 +                    newAssignment.put(entry.getKey(), entry.getValue());
 +                    assignedStormIds.add(entry.getValue().get_topology_id());
 +                }
 +            }
 +
 +            // srashStormIds: assigned topologies whose local files failed verification (and were removed);
 +            // they are treated as not downloaded so the loop below fetches them again.
 +            Set<String> srashStormIds = verifyDownloadedFiles(conf, supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds);
 +            Set<String> downloadedStormIds = new HashSet<>();
 +            downloadedStormIds.addAll(allDownloadedTopologyIds);
 +            downloadedStormIds.removeAll(srashStormIds);
 +
 +            LOG.debug("Synchronizing supervisor");
 +            LOG.debug("Storm code map: {}", stormcodeMap);
 +            LOG.debug("All assignment: {}", allAssignment);
 +            LOG.debug("New assignment: {}", newAssignment);
 +            LOG.debug("Assigned Storm Ids {}", assignedStormIds);
 +            LOG.debug("All Downloaded Ids {}", allDownloadedTopologyIds);
 +            LOG.debug("Checked Downloaded Ids {}", srashStormIds);
 +            LOG.debug("Downloaded Ids {}", downloadedStormIds);
 +            LOG.debug("Storm Ids Profiler Actions {}", stormIdToProfilerActions);
 +            // download code first
 +            // This might take awhile
 +            // - should this be done separately from usual monitoring?
 +            // should we only download when topology is assigned to this supervisor?
 +            for (Map.Entry<String, String> entry : stormcodeMap.entrySet()) {
 +                String stormId = entry.getKey();
 +                if (!downloadedStormIds.contains(stormId) && assignedStormIds.contains(stormId)) {
 +                    LOG.info("Downloading code for storm id {}.", stormId);
 +                    try {
 +                        downloadStormCode(conf, stormId, entry.getValue(), supervisorData.getLocalizer());
 +                    } catch (Exception e) {
 +                        // Nimbus leader/connection problems are only logged; anything else aborts the sync.
 +                        if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
 +                            LOG.warn("Nimbus leader was not available.", e);
 +                        } else if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
 +                            LOG.warn("There was a connection problem with nimbus.", e);
 +                        } else {
 +                            throw e;
 +                        }
 +                    }
 +                    // NOTE(review): this is also logged when one of the tolerated exceptions above was swallowed.
 +                    LOG.info("Finished downloading code for storm id {}", stormId);
 +                }
 +            }
 +
 +            LOG.debug("Writing new assignment {}", newAssignment);
 +
 +            // Ports that were assigned before but are absent from the new assignment are reported as killed.
 +            Set<Integer> killWorkers = new HashSet<>();
 +            killWorkers.addAll(existingAssignment.keySet());
 +            killWorkers.removeAll(newAssignment.keySet());
 +            for (Integer port : killWorkers) {
 +                supervisorData.getiSupervisor().killedWorker(port);
 +            }
 +
++            killExistingWorkersWithChangeInComponents(supervisorData, existingAssignment, newAssignment);
++
 +            supervisorData.getiSupervisor().assigned(newAssignment.keySet());
 +            localState.setLocalAssignmentsMap(newAssignment);
 +            supervisorData.setAssignmentVersions(assignmentsSnapshot);
 +            supervisorData.setStormIdToProfileActions(stormIdToProfilerActions);
 +
 +            // The current assignment held by SupervisorData is keyed by Long ports, so convert the Integer keys.
 +            Map<Long, LocalAssignment> convertNewAssignment = new HashMap<>();
 +            for (Map.Entry<Integer, LocalAssignment> entry : newAssignment.entrySet()) {
 +                convertNewAssignment.put(entry.getKey().longValue(), entry.getValue());
 +            }
 +            supervisorData.setCurrAssignment(convertNewAssignment);
 +            // remove any downloaded code that's no longer assigned or active
 +            // important that this happens after setting the local assignment so that
 +            // synchronize-supervisor doesn't try to launch workers for which the
 +            // resources don't exist
 +            if (Utils.isOnWindows()) {
 +                shutdownDisallowedWorkers();
 +            }
 +            for (String stormId : allDownloadedTopologyIds) {
 +                if (!stormcodeMap.containsKey(stormId)) {
 +                    LOG.info("Removing code for storm id {}.", stormId);
 +                    rmTopoFiles(conf, stormId, supervisorData.getLocalizer(), true);
 +                }
 +            }
 +            // Hand over to the process sync event.
 +            syncProcessManager.add(syncProcesses);
 +        } catch (Exception e) {
 +            LOG.error("Failed to Sync Supervisor", e);
 +            throw new RuntimeException(e);
 +        }
 +
 +    }
 +
++    /**
++     * Shuts down the worker on every port that appears in both assignments but whose executor list changed
++     * (different size, or a new executor that the existing list does not contain).
++     *
++     * @param supervisorData shared supervisor state
++     * @param existingAssignment assignment recorded before this sync, keyed by port
++     * @param newAssignment assignment about to be recorded, keyed by port
++     * @throws Exception if reading worker state or shutting a worker down fails
++     */
++    private void killExistingWorkersWithChangeInComponents(SupervisorData supervisorData, Map<Integer, LocalAssignment> existingAssignment,
++            Map<Integer, LocalAssignment> newAssignment) throws Exception {
++        Map<Integer, LocalAssignment> assignedExecutors = supervisorData.getLocalState().getLocalAssignmentsMap();
++        if (assignedExecutors == null) {
++            assignedExecutors = new HashMap<>();
++        }
++        int now = Time.currentTimeSecs();
++        Map<String, StateHeartbeat> heartbeatByWorkerId = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now);
++
++        // Map each port to the id of the worker currently heartbeating in the valid state on it.
++        Map<Integer, String> validPortToWorkerId = new HashMap<>();
++        for (Map.Entry<String, StateHeartbeat> hbEntry : heartbeatByWorkerId.entrySet()) {
++            StateHeartbeat hb = hbEntry.getValue();
++            if (hb == null || hb.getState() != State.valid) {
++                continue;
++            }
++            validPortToWorkerId.put(hb.getHeartbeat().get_port(), hbEntry.getKey());
++        }
++
++        // Ports present both before and after this sync.
++        Map<Integer, LocalAssignment> sharedPorts = new HashMap<>();
++        for (Map.Entry<Integer, LocalAssignment> candidate : newAssignment.entrySet()) {
++            Integer port = candidate.getKey();
++            if (existingAssignment.containsKey(port)) {
++                sharedPorts.put(port, candidate.getValue());
++            }
++        }
++
++        for (Integer port : sharedPorts.keySet()) {
++            List<ExecutorInfo> previousExecutors = existingAssignment.get(port).get_executors();
++            List<ExecutorInfo> updatedExecutors = newAssignment.get(port).get_executors();
++
++            boolean changed = updatedExecutors.size() != previousExecutors.size();
++            if (!changed) {
++                for (ExecutorInfo executor : updatedExecutors) {
++                    if (!previousExecutors.contains(executor)) {
++                        changed = true;
++                        break;
++                    }
++                }
++            }
++            if (changed) {
++                syncProcesses.shutWorker(supervisorData, validPortToWorkerId.get(port));
++            }
++        }
++    }
 +    /**
 +     * Builds the assignment snapshot for the given topologies, reusing the locally recorded entry when its version
 +     * matches the version currently stored in cluster state and re-reading the assignment otherwise.
 +     *
 +     * @param stormClusterState cluster state to read versions and assignments from
 +     * @param stormIds topology ids to include
 +     * @param localAssignmentVersion previously recorded snapshot, keyed by topology id
 +     * @param callback callback registered with every cluster state read
 +     * @return snapshot keyed by topology id; topologies without a version in cluster state are omitted
 +     * @throws Exception if cluster state cannot be read
 +     */
 +    protected Map<String, Map<String, Object>> getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> stormIds,
 +            Map<String, Map<String, Object>> localAssignmentVersion, Runnable callback) throws Exception {
 +        Map<String, Map<String, Object>> updateAssignmentVersion = new HashMap<>();
 +        for (String stormId : stormIds) {
 +            Integer recordedVersion = -1;
 +            Integer version = stormClusterState.assignmentVersion(stormId, callback);
 +            if (localAssignmentVersion.containsKey(stormId) && localAssignmentVersion.get(stormId) != null) {
 +                recordedVersion = (Integer) localAssignmentVersion.get(stormId).get(IStateStorage.VERSION);
 +            }
 +            if (version == null) {
 +                // ignore
 +            } else if (version.equals(recordedVersion)) {
 +                // Compare by value: == on boxed Integers compares references and only happens to work for
 +                // small cached values, which would force a re-read for every larger version number.
 +                updateAssignmentVersion.put(stormId, localAssignmentVersion.get(stormId));
 +            } else {
 +                Map<String, Object> assignmentVersion = (Map<String, Object>) stormClusterState.assignmentInfoWithVersion(stormId, callback);
 +                updateAssignmentVersion.put(stormId, assignmentVersion);
 +            }
 +        }
 +        return updateAssignmentVersion;
 +    }
 +
 +    /**
 +     * Reads the profile requests of every given topology from cluster state.
 +     *
 +     * @param stormClusterState cluster state to read from
 +     * @param stormIds topology ids to look up
 +     * @return profile requests keyed by topology id
 +     * @throws Exception if cluster state cannot be read
 +     */
 +    protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception {
 +        Map<String, List<ProfileRequest>> requestsByStormId = new HashMap<>();
 +        for (String topologyId : stormIds) {
 +            requestsByStormId.put(topologyId, stormClusterState.getTopologyProfileRequests(topologyId));
 +        }
 +        return requestsByStormId;
 +    }
 +
 +    /**
 +     * Extracts the master code directory of every topology in the snapshot that carries assignment data.
 +     *
 +     * @param assignmentsSnapshot snapshot keyed by topology id
 +     * @return master code directory keyed by topology id
 +     */
 +    protected Map<String, String> readStormCodeLocations(Map<String, Map<String, Object>> assignmentsSnapshot) {
 +        Map<String, String> codeDirByStormId = new HashMap<>();
 +        for (Map.Entry<String, Map<String, Object>> snapshotEntry : assignmentsSnapshot.entrySet()) {
 +            Object data = snapshotEntry.getValue().get(IStateStorage.DATA);
 +            Assignment assignment = (Assignment) data;
 +            if (assignment == null) {
 +                continue;
 +            }
 +            codeDirByStormId.put(snapshotEntry.getKey(), assignment.get_master_code_dir());
 +        }
 +        return codeDirByStormId;
 +    }
 +
 +    /**
 +     * Remove the localizer's reference to every blob listed in the topology's blobstore map.
 +     * Does nothing when the topology declares no blobstore map.
 +     *
 +     * @param localizer localizer holding the blob references
 +     * @param stormId topology id
 +     * @param conf supervisor configuration
 +     * @throws Exception if the topology configuration cannot be read or a reference cannot be removed
 +     */
 +    protected void removeBlobReferences(Localizer localizer, String stormId, Map conf) throws Exception {
 +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
 +        if (blobstoreMap == null) {
 +            return;
 +        }
 +        for (Map.Entry<String, Map<String, Object>> blob : blobstoreMap.entrySet()) {
 +            boolean uncompress = SupervisorUtils.isShouldUncompressBlob(blob.getValue());
 +            localizer.removeBlobReference(blob.getKey(), user, topoName, uncompress);
 +        }
 +    }
 +
 +    /**
 +     * Deletes the local code directory of a topology, optionally dropping its blob references first.
 +     * Failures are logged and not propagated.
 +     *
 +     * @param conf supervisor configuration
 +     * @param stormId topology id
 +     * @param localizer localizer holding the blob references
 +     * @param isrmBlobRefs whether blob references are removed before the directory is deleted
 +     */
 +    protected void rmTopoFiles(Map conf, String stormId, Localizer localizer, boolean isrmBlobRefs) throws IOException {
 +        String path = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        try {
 +            if (isrmBlobRefs) {
 +                removeBlobReferences(localizer, stormId, conf);
 +            }
 +            boolean runAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
 +            if (!runAsUser) {
 +                Utils.forceDelete(ConfigUtils.supervisorStormDistRoot(conf, stormId));
 +            } else {
 +                SupervisorUtils.rmrAsUser(conf, stormId, path);
 +            }
 +        } catch (Exception e) {
 +            // Best effort: removal problems are only logged.
 +            LOG.info("Exception removing: {} ", stormId, e);
 +        }
 +    }
 +
 +    /**
 +     * Checks that the files of every downloaded and assigned topology exist; topologies that fail the check have
 +     * their local files removed (blob references are kept) and are reported back.
 +     *
 +     * @param conf supervisor configuration
 +     * @param localizer localizer passed on to the file removal
 +     * @param assignedStormIds topology ids assigned to this supervisor
 +     * @param allDownloadedTopologyIds topology ids that have a local download
 +     * @return ids of the topologies whose files were missing
 +     * @throws IOException declared by the file removal
 +     */
 +    protected Set<String> verifyDownloadedFiles(Map conf, Localizer localizer, Set<String> assignedStormIds, Set<String> allDownloadedTopologyIds)
 +            throws IOException {
 +        Set<String> brokenStormIds = new HashSet<>();
 +        for (String topologyId : allDownloadedTopologyIds) {
 +            boolean assigned = assignedStormIds.contains(topologyId);
 +            if (!assigned || SupervisorUtils.checkTopoFilesExist(conf, topologyId)) {
 +                continue;
 +            }
 +            LOG.debug("Files not present in topology directory");
 +            rmTopoFiles(conf, topologyId, localizer, false);
 +            brokenStormIds.add(topologyId);
 +        }
 +        return brokenStormIds;
 +    }
 +
 +    /**
 +     * Downloads the code of a topology using the strategy matching the configured cluster mode
 +     * ("distributed" or "local"); any other mode downloads nothing.
 +     *
 +     * @param conf supervisor configuration
 +     * @param stormId topology id
 +     * @param masterCodeDir code directory on the master
 +     * @param localizer localizer used for blob downloads
 +     * @throws Exception if the download fails
 +     */
 +    private void downloadStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception {
 +        String mode = ConfigUtils.clusterMode(conf);
 +
 +        if (mode.endsWith("distributed")) {
 +            downloadDistributeStormCode(conf, stormId, masterCodeDir, localizer);
 +            return;
 +        }
 +        if (mode.endsWith("local")) {
 +            downloadLocalStormCode(conf, stormId, masterCodeDir, localizer);
 +        }
 +    }
 +
 +    /**
 +     * Local-mode download: reads the topology code and configuration from the nimbus blob store into a temporary
 +     * directory, moves it to the topology's dist root and then populates the resources directory from the
 +     * resources jar or from the classpath.
 +     *
 +     * @param conf supervisor configuration
 +     * @param stormId topology id
 +     * @param masterCodeDir code directory on the master, used to open the blob store
 +     * @param localizer unused here; kept for signature parity with the distributed download
 +     * @throws Exception if reading blobs or copying files fails
 +     */
 +    private void downloadLocalStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception {
 +
 +        String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
 +        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        BlobStore blobStore = Utils.getNimbusBlobStore(conf, masterCodeDir, null);
 +        try {
 +            FileUtils.forceMkdir(new File(tmproot));
 +            String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
 +            String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
 +            String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
 +            String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
 +            // Close the output streams here so the file handles are released before the directory is moved.
 +            try (FileOutputStream codeOut = new FileOutputStream(codePath)) {
 +                blobStore.readBlobTo(stormCodeKey, codeOut, null);
 +            }
 +            try (FileOutputStream confOut = new FileOutputStream(confPath)) {
 +                blobStore.readBlobTo(stormConfKey, confOut, null);
 +            }
 +        } finally {
 +            blobStore.shutdown();
 +        }
 +
 +        FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
 +
 +        SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
 +        ClassLoader classloader = Thread.currentThread().getContextClassLoader();
 +
 +        String resourcesJar = resourcesJar();
 +
 +        URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR);
 +
 +        String targetDir = stormroot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
 +
 +        if (resourcesJar != null) {
 +            LOG.info("Extracting resources from jar at {} to {}", resourcesJar, targetDir);
 +            Utils.extractDirFromJar(resourcesJar, ConfigUtils.RESOURCES_SUBDIR, stormroot);
 +        } else if (url != null) {
 +
 +            LOG.info("Copying resources at {} to {} ", url.toString(), targetDir);
 +            // Compare by value: == on strings only tests reference identity.
 +            if ("jar".equals(url.getProtocol())) {
 +                JarURLConnection urlConnection = (JarURLConnection) url.openConnection();
 +                Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), ConfigUtils.RESOURCES_SUBDIR, stormroot);
 +            } else {
 +                FileUtils.copyDirectory(new File(url.getFile()), (new File(targetDir)));
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Distributed-mode download: fetches the topology jar, code and configuration into a temporary directory,
 +     * extracts the resources, downloads the topology's blobs and, when all blobs are present, atomically moves the
 +     * temporary directory to the topology's dist root. On blob failure the temporary directory is deleted.
 +     * <p>
 +     * Downloading to permanent location is atomic
 +     *
 +     * @param conf supervisor configuration
 +     * @param stormId topology id
 +     * @param masterCodeDir code directory on the master (not used by this method)
 +     * @param localizer localizer used to download the topology's blobs
 +     * @throws Exception if a download, extraction or move fails
 +     */
 +    private void downloadDistributeStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception {
 +
 +        String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
 +        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(conf);
 +        String jarPath = ConfigUtils.supervisorStormJarPath(tmproot);
 +        String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
 +        String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
 +        try {
 +            FileUtils.forceMkdir(new File(tmproot));
 +            if (Utils.isOnWindows()) {
 +                if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +                    throw new RuntimeException("ERROR: Windows doesn't implement setting the correct permissions");
 +                }
 +            } else {
 +                Utils.restrictPermissions(tmproot);
 +            }
 +            String stormJarKey = ConfigUtils.masterStormJarKey(stormId);
 +            String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
 +            String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
 +            Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore);
 +            Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore);
 +            Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, blobStore);
 +        } finally {
 +            // Release the blob store client even when setup or a download throws.
 +            blobStore.shutdown();
 +        }
 +        Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, tmproot);
 +        downloadBlobsForTopology(conf, confPath, localizer, tmproot);
 +        if (IsDownloadBlobsForTopologySucceed(confPath, tmproot)) {
 +            LOG.info("Successfully downloaded blob resources for storm-id {}", stormId);
 +            FileUtils.forceMkdir(new File(stormroot));
 +            Files.move(new File(tmproot).toPath(), new File(stormroot).toPath(), StandardCopyOption.ATOMIC_MOVE);
 +            SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
 +        } else {
 +            // The placeholder is required, otherwise the storm id is silently dropped from the message.
 +            LOG.info("Failed to download blob resources for storm-id {}", stormId);
 +            Utils.forceDelete(tmproot);
 +        }
 +    }
 +
 +    /**
 +     * Checks whether every blob listed in the topology configuration is present in {@code targetDir}.
 +     * A blob is looked up under its configured "localname" when present, otherwise under its blobstore key.
 +     *
 +     * @param stormconfPath path of the compressed topology configuration file
 +     * @param targetDir directory the blobs are expected in
 +     * @return true when all blobs exist (or the topology declares none), false otherwise
 +     * @throws IOException if the configuration file cannot be read
 +     */
 +    protected boolean IsDownloadBlobsForTopologySucceed(String stormconfPath, String targetDir) throws IOException {
 +        Map stormConf = Utils.fromCompressedJsonConf(FileUtils.readFileToByteArray(new File(stormconfPath)));
 +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +        List<String> blobFileNames = new ArrayList<>();
 +        if (blobstoreMap != null) {
 +            for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
 +                String key = entry.getKey();
 +                Map<String, Object> blobInfo = entry.getValue();
 +                String ret = null;
 +                if (blobInfo != null && blobInfo.containsKey("localname")) {
 +                    ret = (String) blobInfo.get("localname");
 +                } else {
 +                    ret = key;
 +                }
 +                blobFileNames.add(ret);
 +            }
 +        }
 +        for (String blobFileName : blobFileNames) {
 +            // Resolve against targetDir; a bare name would be checked relative to the process working directory.
 +            if (!Utils.checkFileExists(targetDir + Utils.FILE_PATH_SEPARATOR + blobFileName)) {
 +                return false;
 +            }
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * Download all blobs listed in the topology configuration for a given topology and link them
 +     * into the given directory.
 +     *
 +     * <p>Each localized blob is symlinked under {@code tmpRoot}. The link is named after the blob's
 +     * "localname" entry when one is configured, and after the localized file's name otherwise.
 +     * Authorization and missing-key failures are logged and not rethrown.
 +     *
 +     * @param conf supervisor configuration
 +     * @param stormconfPath path of the topology configuration to read
 +     * @param localizer localizer used to fetch the blobs and to locate the user's file cache directory
 +     * @param tmpRoot directory in which the symlinks to the localized blobs are created
 +     * @throws IOException propagated from the underlying file-system and localizer operations
 +     */
 +    protected void downloadBlobsForTopology(Map conf, String stormconfPath, Localizer localizer, String tmpRoot) throws IOException {
 +        Map stormConf = ConfigUtils.readSupervisorStormConfGivenPath(conf, stormconfPath);
 +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
 +        File userDir = localizer.getLocalUserFileCacheDir(user);
 +        List<LocalResource> localResourceList = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
 +        if (localResourceList.size() > 0) {
 +            if (!userDir.exists()) {
 +                FileUtils.forceMkdir(userDir);
 +            }
 +            try {
 +                List<LocalizedResource> localizedResources = localizer.getBlobs(localResourceList, user, topoName, userDir);
 +                setupBlobPermission(conf, user, userDir.toString());
 +                for (LocalizedResource localizedResource : localizedResources) {
 +                    File rsrcFilePath = new File(localizedResource.getFilePath());
 +                    String keyName = rsrcFilePath.getName();
 +                    String blobSymlinkTargetName = new File(localizedResource.getCurrentSymlinkPath()).getName();
 +
 +                    // Default to the key name so the link name is never null, even without a blobstore map.
 +                    String symlinkName = keyName;
 +                    Map<String, Object> blobInfo = (blobstoreMap == null) ? null : blobstoreMap.get(keyName);
 +                    if (blobInfo != null && blobInfo.containsKey("localname")) {
 +                        symlinkName = (String) blobInfo.get("localname");
 +                    }
 +                    Utils.createSymlink(tmpRoot, rsrcFilePath.getParent(), symlinkName, blobSymlinkTargetName);
 +                }
 +            } catch (AuthorizationException authExp) {
 +                // Best effort: the failure is only logged here. The throwable is passed as the last
 +                // argument (not bound to a placeholder) so the stack trace is logged.
 +                LOG.error("AuthorizationException while downloading blobs for topology {}", topoName, authExp);
 +            } catch (KeyNotFoundException knf) {
 +                LOG.error("KeyNotFoundException while downloading blobs for topology {}", topoName, knf);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Runs the worker launcher's "blob" command on the given path when the supervisor is configured
 +     * to run workers as the submitting user; otherwise does nothing.
 +     *
 +     * @param conf supervisor configuration, consulted for {@code Config.SUPERVISOR_RUN_WORKER_AS_USER}
 +     * @param user user on whose behalf the worker launcher is invoked
 +     * @param path path of the blob directory whose permissions are set up
 +     * @throws IOException propagated from the worker launcher invocation
 +     */
 +    protected void setupBlobPermission(Map conf, String user, String path) throws IOException {
 +        // Look the flag up in conf: passing the key constant itself would never read the configured value.
 +        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +            String logPrefix = "setup blob permissions for " + path;
 +            SupervisorUtils.workerLauncherAndWait(conf, user, Arrays.asList("blob", path), null, logPrefix);
 +        }
 +    }
 +
 +    /**
 +     * Finds a jar on the current classpath that contains the resources sub-directory.
 +     *
 +     * @return the first classpath entry ending in ".jar" that contains {@code ConfigUtils.RESOURCES_SUBDIR},
 +     *         or null when the classpath is unavailable or no jar matches
 +     * @throws IOException propagated from inspecting the jar files
 +     */
 +    private String resourcesJar() throws IOException {
 +        String classpath = Utils.currentClasspath();
 +        if (classpath == null) {
 +            return null;
 +        }
 +
 +        // Every jar is inspected, in classpath order; the first match is the one returned.
 +        List<String> jarsWithResources = new ArrayList<String>();
 +        for (String classpathEntry : classpath.split(File.pathSeparator)) {
 +            if (!classpathEntry.endsWith(".jar")) {
 +                continue;
 +            }
 +            if (Utils.zipDoesContainDir(classpathEntry, ConfigUtils.RESOURCES_SUBDIR)) {
 +                jarsWithResources.add(classpathEntry);
 +            }
 +        }
 +        return jarsWithResources.isEmpty() ? null : jarsWithResources.get(0);
 +    }
 +
 +    /**
 +     * Builds the port-to-assignment map for this supervisor from a snapshot of all topology assignments.
 +     *
 +     * <p>If a RuntimeException occurs while reading (including two topologies claiming the same port),
 +     * the retry counter is incremented and the previous assignment is returned. Once the counter has
 +     * exceeded 2 the exception is rethrown. A successful read resets the counter to 0.
 +     *
 +     * @param assignmentsSnapshot topology id mapped to its assignment data (under {@code IStateStorage.DATA})
 +     * @param existingAssignment assignment returned when the read fails and retries remain
 +     * @param assignmentId id of this supervisor, used to select its own executors
 +     * @param retries counter of consecutive failed reads
 +     * @return the freshly read port-to-assignment map, or existingAssignment on a tolerated failure
 +     */
 +    protected Map<Integer, LocalAssignment> readAssignments(Map<String, Map<String, Object>> assignmentsSnapshot,
 +            Map<Integer, LocalAssignment> existingAssignment, String assignmentId, AtomicInteger retries) {
 +        try {
 +            Map<Integer, LocalAssignment> assignmentsByPort = new HashMap<Integer, LocalAssignment>();
 +            for (Map.Entry<String, Map<String, Object>> snapshotEntry : assignmentsSnapshot.entrySet()) {
 +                String topologyId = snapshotEntry.getKey();
 +                Assignment topologyAssignment = (Assignment) snapshotEntry.getValue().get(IStateStorage.DATA);
 +                Map<Integer, LocalAssignment> localByPort = readMyExecutors(topologyId, assignmentId, topologyAssignment);
 +
 +                for (Map.Entry<Integer, LocalAssignment> portEntry : localByPort.entrySet()) {
 +                    Integer port = portEntry.getKey();
 +                    if (assignmentsByPort.containsKey(port)) {
 +                        throw new RuntimeException("Should not have multiple topologys assigned to one port");
 +                    }
 +                    assignmentsByPort.put(port, portEntry.getValue());
 +                }
 +            }
 +            retries.set(0);
 +            return assignmentsByPort;
 +        } catch (RuntimeException e) {
 +            if (retries.get() > 2) {
 +                throw e;
 +            }
 +            retries.incrementAndGet();
 +            LOG.warn("{} : retrying {} of 3", e.getMessage(), retries.get());
 +            return existingAssignment;
 +        }
 +    }
 +
 +    /**
 +     * Extracts, from one topology's assignment, the executors that are placed on this supervisor.
 +     *
 +     * @param stormId id of the topology the assignment belongs to
 +     * @param assignmentId id of this supervisor; only node entries equal to it are considered
 +     * @param assignment the topology's assignment
 +     * @return port mapped to the local assignment (executors and, when present, worker resources) for that port
 +     */
 +    protected Map<Integer, LocalAssignment> readMyExecutors(String stormId, String assignmentId, Assignment assignment) {
 +        Map<Integer, LocalAssignment> assignmentsByPort = new HashMap<>();
 +
 +        // First pass: remember the worker resources of every port that belongs to this supervisor.
 +        Map<Long, WorkerResources> resourcesByPort = new HashMap<>();
 +        Map<NodeInfo, WorkerResources> workerResources = assignment.get_worker_resources();
 +        if (workerResources != null) {
 +            for (Map.Entry<NodeInfo, WorkerResources> resourceEntry : workerResources.entrySet()) {
 +                NodeInfo slot = resourceEntry.getKey();
 +                if (!slot.get_node().equals(assignmentId)) {
 +                    continue;
 +                }
 +                for (Long port : slot.get_port()) {
 +                    resourcesByPort.put(port, resourceEntry.getValue());
 +                }
 +            }
 +        }
 +
 +        // Second pass: group this supervisor's executors by port.
 +        Map<List<Long>, NodeInfo> executorToSlot = assignment.get_executor_node_port();
 +        if (executorToSlot == null) {
 +            return assignmentsByPort;
 +        }
 +        for (Map.Entry<List<Long>, NodeInfo> executorEntry : executorToSlot.entrySet()) {
 +            NodeInfo slot = executorEntry.getValue();
 +            if (!slot.get_node().equals(assignmentId)) {
 +                continue;
 +            }
 +            List<Long> executorRange = executorEntry.getKey();
 +            for (Long port : slot.get_port()) {
 +                LocalAssignment local = assignmentsByPort.get(port.intValue());
 +                if (local == null) {
 +                    local = new LocalAssignment(stormId, new ArrayList<ExecutorInfo>());
 +                    if (resourcesByPort.containsKey(port)) {
 +                        local.set_resources(resourcesByPort.get(port));
 +                    }
 +                    assignmentsByPort.put(port.intValue(), local);
 +                }
 +                // An executor is identified by the first and last element of its id list.
 +                int firstTask = executorRange.get(0).intValue();
 +                int lastTask = executorRange.get(executorRange.size() - 1).intValue();
 +                local.get_executors().add(new ExecutorInfo(firstTask, lastTask));
 +            }
 +        }
 +        return assignmentsByPort;
 +    }
 +
 +    // I know it's not a good idea to use SyncProcessEvent here, but SyncProcessEvent should be the only one
 +    // responsible for starting/shutting down workers, while SyncSupervisorEvent is responsible for
 +    // downloading/removing topologies' binaries.
 +    /**
 +     * Shuts down every local worker whose heartbeat state is {@code State.disallowed}.
 +     *
 +     * @throws Exception propagated from reading the worker states or shutting a worker down
 +     */
 +    protected void shutdownDisallowedWorkers() throws Exception {
-         Map conf = supervisorData.getConf();
 +        LocalState localState = supervisorData.getLocalState();
 +        Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
 +        if (assignedExecutors == null) {
 +            assignedExecutors = new HashMap<>();
 +        }
 +        int now = Time.currentTimeSecs();
 +        Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now);
 +        LOG.debug("Allocated workers {}", assignedExecutors);
 +        for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()) {
 +            String workerId = entry.getKey();
 +            StateHeartbeat stateHeartbeat = entry.getValue();
 +            if (stateHeartbeat.getState() == State.disallowed) {
 +                syncProcesses.shutWorker(supervisorData, workerId);
 +                LOG.debug("{}'s state disallowed, so shutdown this worker", workerId);
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/42bacde2/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 9b5f1e0,42dd766..2ff21ac
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -297,52 -294,47 +297,53 @@@
  
  (deftest test-worker-launch-command
    (testing "*.worker.childopts configuration"
 -    (let [mock-port "42"
 +    (let [mock-port 42
            mock-storm-id "fake-storm-id"
            mock-worker-id "fake-worker-id"
 -          storm-log-dir (ConfigUtils/getLogDir)
            mock-cp (str Utils/FILE_PATH_SEPARATOR "base" Utils/CLASS_PATH_SEPARATOR Utils/FILE_PATH_SEPARATOR "stormjar.jar")
            mock-sensitivity "S3"
 -          mock-cp "/base:/stormjar.jar"
            exp-args-fn (fn [opts topo-opts classpath]
 -                       (concat [(supervisor/java-cmd) "-cp" classpath
 -                               (str "-Dlogfile.name=" "worker.log")
 -                               "-Dstorm.home="
 -                               (str "-Dworkers.artifacts=" "/tmp/workers-artifacts")
 -                               (str "-Dstorm.id=" mock-storm-id)
 -                               (str "-Dworker.id=" mock-worker-id)
 -                               (str "-Dworker.port=" mock-port)
 -                               (str "-Dstorm.log.dir=" storm-log-dir)
 -                               "-Dlog4j.configurationFile=/log4j2/worker.xml"
 -                               "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
 -                               "org.apache.storm.LogWriter"]
 -                               [(supervisor/java-cmd) "-server"]
 -                               opts
 -                               topo-opts
 -                               ["-Djava.library.path="
 -                                (str "-Dlogfile.name=" "worker.log")
 -                                "-Dstorm.home="
 -                                "-Dworkers.artifacts=/tmp/workers-artifacts"
 -                                "-Dstorm.conf.file="
 -                                "-Dstorm.options="
 -                                (str "-Dstorm.log.dir=" storm-log-dir)
 -                                (str "-Djava.io.tmpdir=/tmp/workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "tmp")
 -                                (str "-Dlogging.sensitivity=" mock-sensitivity)
 -                                (str "-Dlog4j.configurationFile=" Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
 -                                "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
 -                                (str "-Dstorm.id=" mock-storm-id)
 -                                (str "-Dworker.id=" mock-worker-id)
 -                                (str "-Dworker.port=" mock-port)
 -                                "-cp" classpath
 -                                "org.apache.storm.daemon.worker"
 -                                mock-storm-id
 -                                mock-port
 -                                mock-worker-id]))]
 +                        (let [file-prefix (let [os (System/getProperty "os.name")]
 +                                            (if (.startsWith os "Windows") (str "file:///")
 +                                                    (str "")))
 +                              sequences (concat [(SupervisorUtils/javaCmd "java") "-cp" classpath
 +                                                (str "-Dlogfile.name=" "worker.log")
 +                                                "-Dstorm.home="
 +                                                (str "-Dworkers.artifacts=" "/tmp/workers-artifacts")
 +                                                (str "-Dstorm.id=" mock-storm-id)
 +                                                (str "-Dworker.id=" mock-worker-id)
 +                                                (str "-Dworker.port=" mock-port)
 +                                                (str "-Dstorm.log.dir=" (ConfigUtils/getLogDir))
 +                                                (str "-Dlog4j.configurationFile=" file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
 +                                                 "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
 +                                                "org.apache.storm.LogWriter"]
 +                                         [(SupervisorUtils/javaCmd "java") "-server"]
 +                                         opts
 +                                         topo-opts
 +                                         ["-Djava.library.path="
 +                                          (str "-Dlogfile.name=" "worker.log")
 +                                          "-Dstorm.home="
 +                                          "-Dworkers.artifacts=/tmp/workers-artifacts"
 +                                          "-Dstorm.conf.file="
 +                                          "-Dstorm.options="
 +                                          (str "-Dstorm.log.dir=" (ConfigUtils/getLogDir))
++                                          (str "-Djava.io.tmpdir=/tmp/workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "tmp")
 +                                          (str "-Dlogging.sensitivity=" mock-sensitivity)
 +                                          (str "-Dlog4j.configurationFile=" file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
 +                                          "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
 +                                          (str "-Dstorm.id=" mock-storm-id)
 +                                          (str "-Dworker.id=" mock-worker-id)
 +                                          (str "-Dworker.port=" mock-port)
 +                                          "-cp" classpath
 +                                          "org.apache.storm.daemon.worker"
 +                                          mock-storm-id
 +                                          ""
 +                                          mock-port
 +                                          mock-worker-id])
 +                          ret (ArrayList.)]
 +                        (doseq [val sequences]
 +                          (.add ret (str val)))
 +                          ret))]
        (testing "testing *.worker.childopts as strings with extra spaces"
          (let [string-opts "-Dfoo=bar  -Xmx1024m"
                topo-string-opts "-Dkau=aux   -Xmx2048m"
@@@ -363,18 -355,18 +364,19 @@@
                                                         ([conf storm-id] nil))
                            (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
                            (setWorkerUserWSEImpl [conf worker-id user] nil)
 -                          (workerRootImpl [conf] "/tmp/workers")
 -                          (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
++                         (workerRootImpl [conf] "/tmp/workers")
 +                          (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
 +              process-proxy (proxy [SyncProcessEvent] []
 +                              (jlp [stormRoot conf] "")
 +                              (writeLogMetadata [stormconf user workerId stormId port conf] nil)
 +                              (createBlobstoreLinks [conf stormId workerId] nil))]
 +
            (with-open [_ (ConfigUtilsInstaller. cu-proxy)
                        _ (UtilsInstaller. utils-spy)]
 -              (stubbing [supervisor/jlp nil
 -                         supervisor/write-log-metadata! nil
 -                         supervisor/create-blobstore-links nil]
 -                (supervisor/launch-worker mock-supervisor
 -                                      mock-storm-id
 -                                      mock-port
 +                (.launchWorker process-proxy mock-supervisor nil
 +                                      "" mock-storm-id mock-port
                                        mock-worker-id
 -                                      (WorkerResources.))
 +                                      (WorkerResources.) nil nil)
                  (. (Mockito/verify utils-spy)
                     (launchProcessImpl (Matchers/eq exp-args)
                                        (Matchers/any)
@@@ -505,10 -495,10 +510,10 @@@
            exp-launch ["/bin/worker-launcher"
                        "me"
                        "worker"
 -                      (str storm-local "/workers/" mock-worker-id)
 +                      (str storm-local Utils/FILE_PATH_SEPARATOR "workers" Utils/FILE_PATH_SEPARATOR mock-worker-id)
                        worker-script]
            exp-script-fn (fn [opts topo-opts]
-                           (str "#!/bin/bash\r\n'export' 'LD_LIBRARY_PATH=';\r\n\r\nexec 'java'"
+                           (str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java'"
                                 " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
                                 " '-Dlogfile.name=" "worker.log'"
                                 " '-Dstorm.home='"
@@@ -529,9 -519,10 +534,10 @@@
                                 " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
                                 " '-Dstorm.conf.file='"
                                 " '-Dstorm.options='"
 -                               " '-Dstorm.log.dir=" storm-log-dir "'"
 +                               " '-Dstorm.log.dir=" (ConfigUtils/getLogDir) "'"
+                                " '-Djava.io.tmpdir=" (str  storm-local "/workers/" mock-worker-id "/tmp'")
                                 " '-Dlogging.sensitivity=" mock-sensitivity "'"
 -                               " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
 +                               " '-Dlog4j.configurationFile=" (str file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml'")
                                 " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
                                 " '-Dstorm.id=" mock-storm-id "'"
                                 " '-Dworker.id=" mock-worker-id "'"
@@@ -612,22 -598,19 +618,22 @@@
                              (proxy [Utils] []
                                (addToClasspathImpl [classpath paths] mock-cp)
                                (launchProcessImpl [& _] nil))
 -                            Mockito/spy)]
 +                            Mockito/spy)
 +                supervisor-utils (Mockito/mock SupervisorUtils)
 +                process-proxy (proxy [SyncProcessEvent] []
 +                                (jlp [stormRoot conf] "")
 +                                (writeLogMetadata [stormconf user workerId stormId port conf] nil))]
              (with-open [_ (ConfigUtilsInstaller. cu-proxy)
 -                        _ (UtilsInstaller. utils-spy)]
 -              (stubbing [supervisor/java-cmd "java"
 -                         supervisor/jlp nil
 -                         supervisor/write-log-metadata! nil]
 -                (supervisor/launch-worker mock-supervisor
 -                                          mock-storm-id
 +                        _ (UtilsInstaller. utils-spy)
 +                        _ (MockedSupervisorUtils. supervisor-utils)]
 +              (. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn (str "java")))
 +              (.launchWorker process-proxy mock-supervisor nil
 +                                          "" mock-storm-id
                                            mock-port
                                            mock-worker-id
 -                                          (WorkerResources.))
 +                                          (WorkerResources.) nil nil)
                  (. (Mockito/verify utils-spy)
-                  (launchProcessImpl (Matchers/any)
+                  (launchProcessImpl (Matchers/eq exp-launch)
                                      (Matchers/any)
                                      (Matchers/any)
                                      (Matchers/any)