Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:30 UTC

[07/35] storm git commit: Merge branch 'master' into supervisor

Merge branch 'master' into supervisor


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b09b4129
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b09b4129
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b09b4129

Branch: refs/heads/master
Commit: b09b4129d845aff6be285ea1748b842499c40e0b
Parents: 19fcafb 672c895
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Mar 4 12:14:41 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Mar 4 13:33:09 2016 +0800

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 CHANGELOG.md                                    |   9 ++
 README.markdown                                 |   4 +
 bin/storm.cmd                                   |   2 +-
 bin/storm.py                                    |   4 +-
 .../storm/starter/ThroughputVsLatency.java      |   2 +-
 .../apache/storm/sql/compiler/CompilerUtil.java |   7 +-
 .../apache/storm/sql/compiler/ExprCompiler.java |  32 ++++-
 .../backends/standalone/RelNodeCompiler.java    |   6 +-
 .../apache/storm/sql/parser/StormParser.java    |   5 +
 .../test/org/apache/storm/sql/TestStormSql.java |  64 +++++++++-
 .../storm/sql/compiler/TestCompilerUtils.java   |  62 ++++++++-
 .../storm/sql/compiler/TestExprSemantic.java    |  18 +++
 .../backends/standalone/TestPlanCompiler.java   |  20 +++
 .../backends/trident/TestPlanCompiler.java      |   4 +-
 .../test/org/apache/storm/sql/TestUtils.java    |  32 ++++-
 pom.xml                                         |  23 ++++
 .../apache/storm/command/upload_credentials.clj |  35 -----
 .../src/clj/org/apache/storm/daemon/acker.clj   | 108 ----------------
 .../src/clj/org/apache/storm/daemon/common.clj  |  17 ++-
 .../apache/storm/daemon/local_supervisor.clj    |   2 +-
 storm-core/src/clj/org/apache/storm/testing.clj |   7 +-
 .../storm/blobstore/LocalFsBlobStore.java       |   2 +-
 .../src/jvm/org/apache/storm/command/List.java  |  50 --------
 .../apache/storm/command/ListTopologies.java    |  52 ++++++++
 .../apache/storm/command/UploadCredentials.java |  61 +++++++++
 .../src/jvm/org/apache/storm/daemon/Acker.java  | 128 +++++++++++++++++++
 .../storm/daemon/supervisor/Supervisor.java     |   4 +-
 .../daemon/supervisor/SyncProcessEvent.java     |  11 +-
 .../apache/storm/security/auth/AuthUtils.java   |  40 ++++++
 .../storm/security/auth/kerberos/AutoTGT.java   |  64 ++++------
 .../auth/kerberos/AutoTGTKrb5LoginModule.java   |   8 +-
 .../apache/storm/topology/TopologyBuilder.java  |  13 +-
 .../jvm/org/apache/storm/utils/ConfigUtils.java |   8 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  13 +-
 .../security/auth/auto_login_module_test.clj    |  24 +++-
 .../clj/org/apache/storm/supervisor_test.clj    |  52 ++++----
 .../storm/topology/TopologyBuilderTest.java     |  65 ++++++++++
 38 files changed, 734 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b09b4129/bin/storm.cmd
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/b09b4129/bin/storm.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/b09b4129/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
index 65cf907,0000000..3dfed6f
mode 100644,000000..100644
--- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
@@@ -1,61 -1,0 +1,61 @@@
 +;; Licensed to the Apache Software Foundation (ASF) under one
 +;; or more contributor license agreements.  See the NOTICE file
 +;; distributed with this work for additional information
 +;; regarding copyright ownership.  The ASF licenses this file
 +;; to you under the Apache License, Version 2.0 (the
 +;; "License"); you may not use this file except in compliance
 +;; with the License.  You may obtain a copy of the License at
 +;;
 +;; http://www.apache.org/licenses/LICENSE-2.0
 +;;
 +;; Unless required by applicable law or agreed to in writing, software
 +;; distributed under the License is distributed on an "AS IS" BASIS,
 +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +;; See the License for the specific language governing permissions and
 +;; limitations under the License.
 +(ns org.apache.storm.daemon.local-supervisor
 +  (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData ShutdownWork Supervisor]
 +           [org.apache.storm.utils Utils ConfigUtils]
 +           [org.apache.storm ProcessSimulator])
 +  (:use [org.apache.storm.daemon common]
 +        [org.apache.storm log])
 +  (:require [org.apache.storm.daemon [worker :as worker] ])
 +  (:require [clojure.string :as str])
 +  (:gen-class))
 +
 +(defn launch-local-worker [supervisorData stormId port workerId resources]
 +  (let [conf (.getConf supervisorData)
 +         pid (Utils/uuid)
 +        worker (worker/mk-worker conf
 +                 (.getSharedContext supervisorData)
 +                 stormId
 +                 (.getAssignmentId supervisorData)
 +                 (int port)
 +                 workerId)]
 +    (ConfigUtils/setWorkerUserWSE conf workerId "")
 +    (ProcessSimulator/registerProcess pid worker)
 +    (.put (.getWorkerThreadPidsAtom supervisorData) workerId pid)
 +    ))
 +
 +(defn shutdown-local-worker [supervisorData workerId]
 +  (let [shut-workers (ShutdownWork.)]
 +    (log-message "shutdown-local-worker")
 +    (.shutWorker shut-workers supervisorData workerId)))
 +
 +(defn local-process []
 +  "Create a local process event"
 +  (proxy [SyncProcessEvent] []
-     (launchLocalWorker [supervisorData stormId port workerId resources]
++    (launchWorker [supervisorData stormId port workerId resources]
 +      (launch-local-worker supervisorData stormId port workerId resources))
 +    (shutWorker [supervisorData workerId] (shutdown-local-worker supervisorData workerId))))
 +
 +
 +(defserverfn mk-local-supervisor [conf shared-context isupervisor]
 +  (log-message "Starting local Supervisor with conf " conf)
 +  (if (not (ConfigUtils/isLocalMode conf))
 +    (throw
 +      (IllegalArgumentException. "Cannot start server in distributed mode!")))
 +  (let [local-process (local-process)
 +        supervisor-server (Supervisor.)]
 +    (.setLocalSyncProcess supervisor-server local-process)
 +    (.mkSupervisor supervisor-server conf shared-context isupervisor)))

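The rename shown in the proxy above (launchLocalWorker -> launchWorker) means the local
supervisor now overrides the same hook that SyncProcessEvent invokes for local-mode
launches (see SyncProcessEvent.java later in this diff). For readers following the Java
side, a rough Java analogue of that Clojure proxy is sketched below; the class name
LocalSyncProcessEvent and its body are illustrative only and are not part of this commit.

    package org.apache.storm.daemon.supervisor;

    import java.io.IOException;

    import org.apache.storm.generated.WorkerResources;

    // Illustrative sketch (not part of this commit), assuming the protected
    // launchWorker(SupervisorData, String, Long, String, WorkerResources) hook
    // shown in SyncProcessEvent.java further down in this diff.
    public class LocalSyncProcessEvent extends SyncProcessEvent {
        @Override
        protected void launchWorker(SupervisorData supervisorData, String stormId, Long port,
                                    String workerId, WorkerResources resources) throws IOException {
            // A real local-mode override would build the worker in-process and register it
            // with ProcessSimulator, as launch-local-worker does in the Clojure code above.
        }
    }
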
http://git-wip-us.apache.org/repos/asf/storm/blob/b09b4129/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/b09b4129/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index 9df7ec1,0000000..2c7810d
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@@ -1,196 -1,0 +1,196 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import com.codahale.metrics.Gauge;
 +import com.codahale.metrics.MetricRegistry;
 +import org.apache.commons.io.FileUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.StormTimer;
 +import org.apache.storm.daemon.metrics.MetricsUtils;
 +import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 +import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
 +import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
 +import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
 +import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
 +import org.apache.storm.event.EventManagerImp;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.messaging.IContext;
 +import org.apache.storm.scheduler.ISupervisor;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.Utils;
 +import org.apache.storm.utils.VersionInfo;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.InterruptedIOException;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +public class Supervisor {
 +    private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
 +
 +    //TODO: to be removed after porting worker.clj. localSyncProcess is intended for starting the local supervisor.
 +    private SyncProcessEvent localSyncProcess;
 +
 +    public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
 +        this.localSyncProcess = localSyncProcess;
 +    }
 +
 +
 +    /**
 +     * In local state, the supervisor stores its current assignments; another thread launches events to restart any dead processes if necessary.
 +     * 
 +     * @param conf
 +     * @param sharedContext
 +     * @param iSupervisor
 +     * @return
 +     * @throws Exception
 +     */
 +    public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
 +        SupervisorManger supervisorManger = null;
 +        try {
 +            LOG.info("Starting Supervisor with conf {}", conf);
 +            iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
 +            String path = ConfigUtils.supervisorTmpDir(conf);
 +            FileUtils.cleanDirectory(new File(path));
 +
 +            final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor);
 +            Localizer localizer = supervisorData.getLocalizer();
 +
 +            SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData);
 +            hb.run();
 +            // should synchronize supervisor so it doesn't launch anything after being down (optimization)
 +            Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
 +            supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);
 +
 +            Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf);
 +            for (String stormId : downdedStormId) {
 +                SupervisorUtils.addBlobReferences(localizer, stormId, conf);
 +            }
 +            // do this after adding the references so we don't try to clean things being used
 +            localizer.startCleaner();
 +
 +            EventManagerImp syncSupEventManager = new EventManagerImp(false);
 +            EventManagerImp syncProcessManager = new EventManagerImp(false);
 +
 +            SyncProcessEvent syncProcessEvent = null;
 +            if (ConfigUtils.isLocalMode(conf)){
 +                localSyncProcess.init(supervisorData);
 +                syncProcessEvent = localSyncProcess;
 +            }else{
 +                syncProcessEvent = new SyncProcessEvent(supervisorData);
 +            }
 +
 +            SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager);
 +            UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData);
 +            RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData);
 +
 +            if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
 +                StormTimer eventTimer = supervisorData.getEventTimer();
 +                // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
 +                // to date even if callbacks don't all work exactly right
 +                eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager));
 +
 +                eventTimer.scheduleRecurring(0, Utils.getInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)),
 +                        new EventManagerPushCallback(syncProcessEvent, syncProcessManager));
 +
 +                // Blob update thread. Starts with 30 seconds delay, every 30 seconds
 +                supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));
 +
 +                // supervisor health check
 +                eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData));
 +
 +                // Launch a thread that runs profiler commands. Starts with 30 seconds delay, every 30 seconds
 +                eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
 +            }
 +            LOG.info("Starting supervisor with id {} at host {}.", supervisorData.getSupervisorId(), supervisorData.getHostName() );
 +            supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager);
 +        } catch (Throwable t) {
 +            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
 +                throw t;
 +            } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
 +                throw t;
 +            } else {
 +                LOG.error("Error on initialization of server supervisor: {}", t);
 +                Utils.exitProcess(13, "Error on initialization");
 +            }
 +        }
 +        return supervisorManger;
 +    }
 +
 +    /**
 +     * Start the supervisor in distributed mode.
 +     */
-     private void distributeLaunch() {
++    private void launch() {
 +        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
 +        SupervisorManger supervisorManager;
 +        try {
 +            Map<Object, Object> conf = Utils.readStormConfig();
 +            if (ConfigUtils.isLocalMode(conf)) {
 +                throw new IllegalArgumentException("Cannot start server in local mode!");
 +            }
 +            ISupervisor iSupervisor = new StandaloneSupervisor();
 +            supervisorManager = mkSupervisor(conf, null, iSupervisor);
 +            if (supervisorManager != null)
 +                Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
 +            registerWorkerNumGauge("drpc:num-execute-http-requests", conf);
 +            startMetricsReporters(conf);
 +        } catch (Exception e) {
 +            LOG.error("Failed to start supervisor\n", e);
 +            System.exit(1);
 +        }
 +    }
 +
 +    // To be removed
 +    private void registerWorkerNumGauge(String name, final Map conf) {
 +        MetricRegistry metricRegistry = new MetricRegistry();
 +        metricRegistry.remove(name);
 +        metricRegistry.register(name, new Gauge<Integer>() {
 +            @Override
 +            public Integer getValue() {
 +                Collection<String> pids = SupervisorUtils.myWorkerIds(conf);
 +                return pids.size();
 +            }
 +        });
 +    }
 +
 +    // To be removed
 +    private void startMetricsReporters(Map conf) {
 +        List<PreparableReporter> preparableReporters = MetricsUtils.getPreparableReporters(conf);
 +        for (PreparableReporter reporter : preparableReporters) {
 +            reporter.prepare(new MetricRegistry(), conf);
 +            reporter.start();
 +        }
 +        LOG.info("Started statistics report plugin...");
 +    }
 +
 +    /**
 +     * Supervisor daemon entry point.
 +     *
 +     * @param args
 +     */
 +    public static void main(String[] args) {
 +        Utils.setupDefaultUncaughtExceptionHandler();
 +        Supervisor instance = new Supervisor();
-         instance.distributeLaunch();
++        instance.launch();
 +    }
 +}

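For context, the local-mode entry point in local_supervisor.clj above now drives this
class through setLocalSyncProcess and mkSupervisor, while main()/launch() remains the
distributed entry point. A minimal usage sketch for the local path, using only the entry
points visible in this diff, follows; the LocalSyncProcessEvent subclass is the
hypothetical one sketched earlier, StandaloneSupervisor stands in for whatever
ISupervisor the caller supplies, and error handling is omitted.

    package org.apache.storm.daemon.supervisor;

    import java.util.Map;

    import org.apache.storm.scheduler.ISupervisor;
    import org.apache.storm.utils.ConfigUtils;
    import org.apache.storm.utils.Utils;

    // Sketch only (not part of this commit): starting a local-mode supervisor,
    // mirroring mk-local-supervisor in local_supervisor.clj above.
    public class LocalSupervisorLauncher {
        public static void main(String[] args) throws Exception {
            Map conf = Utils.readStormConfig();
            if (!ConfigUtils.isLocalMode(conf)) {
                throw new IllegalArgumentException("This sketch only covers local mode");
            }
            Supervisor supervisor = new Supervisor();
            // Any SyncProcessEvent subclass overriding launchWorker/shutWorker for local mode
            // would do here; LocalSyncProcessEvent is the hypothetical sketch shown earlier.
            supervisor.setLocalSyncProcess(new LocalSyncProcessEvent());
            ISupervisor isupervisor = new StandaloneSupervisor();
            // Builds the timers, event managers and sync threads shown in mkSupervisor above.
            SupervisorManger manager = supervisor.mkSupervisor(conf, null, isupervisor);
            if (manager != null) {
                Utils.addShutdownHookWithForceKillIn1Sec(manager);
            }
        }
    }
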
http://git-wip-us.apache.org/repos/asf/storm/blob/b09b4129/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
index 4ef6d1c,0000000..4e0b8a0
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@@ -1,666 -1,0 +1,665 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.commons.io.FileUtils;
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.container.cgroup.CgroupManager;
 +import org.apache.storm.generated.ExecutorInfo;
 +import org.apache.storm.generated.LSWorkerHeartbeat;
 +import org.apache.storm.generated.LocalAssignment;
 +import org.apache.storm.generated.WorkerResources;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.LocalState;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.utils.Utils;
 +import org.eclipse.jetty.util.ConcurrentHashSet;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.yaml.snakeyaml.Yaml;
 +
 +import java.io.File;
 +import java.io.FileWriter;
 +import java.io.IOException;
 +import java.util.*;
 +
 +/**
 + * 1. To kill are those in allocated that are dead or disallowed.
 + * 2. Kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log).
 + * 3. Of the rest, figure out what assignments aren't yet satisfied.
 + * 4. Generate new worker ids, write new "approved workers" to LS.
 + * 5. Create local dir for worker id.
 + * 6. Launch new workers (give worker-id, port, and supervisor-id).
 + * 7. Wait for workers to launch.
 + */
 +public class SyncProcessEvent extends ShutdownWork implements Runnable {
 +
 +    private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
 +
 +    private  LocalState localState;
 +
 +    private SupervisorData supervisorData;
 +
 +    private class ProcessExitCallback implements Utils.ExitCodeCallable {
 +        private final String logPrefix;
 +        private final String workerId;
 +
 +        public ProcessExitCallback(String logPrefix, String workerId) {
 +            this.logPrefix = logPrefix;
 +            this.workerId = workerId;
 +        }
 +
 +        @Override
 +        public Object call() throws Exception {
 +            return null;
 +        }
 +
 +        @Override
 +        public Object call(int exitCode) {
 +            LOG.info("{} exited with code: {}", logPrefix, exitCode);
 +            supervisorData.getDeadWorkers().add(workerId);
 +            return null;
 +        }
 +    }
 +
 +    public SyncProcessEvent(){
 +
 +    }
- 
 +    public SyncProcessEvent(SupervisorData supervisorData) {
 +        init(supervisorData);
 +    }
 +
 +    //TODO: init is intended for the local supervisor, so we will remove it after porting worker.clj to java
 +    public void init(SupervisorData supervisorData){
 +        this.supervisorData = supervisorData;
 +        this.localState = supervisorData.getLocalState();
 +    }
 +
 +
 +    /**
 +     * 1. To kill are those in allocated that are dead or disallowed.
 +     * 2. Kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log).
 +     * 3. Of the rest, figure out what assignments aren't yet satisfied.
 +     * 4. Generate new worker ids, write new "approved workers" to LS.
 +     * 5. Create local dir for worker id.
 +     * 6. Launch new workers (give worker-id, port, and supervisor-id).
 +     * 7. Wait for workers to launch.
 +     */
 +    @Override
 +    public void run() {
 +        LOG.debug("Syncing processes");
 +        try {
 +            Map conf = supervisorData.getConf();
 +            Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
 +
 +            if (assignedExecutors == null) {
 +                assignedExecutors = new HashMap<>();
 +            }
 +            int now = Time.currentTimeSecs();
 +
 +            Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now);
 +
 +            Set<String> keeperWorkerIds = new HashSet<>();
 +            Set<Integer> keepPorts = new HashSet<>();
 +            for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
 +                StateHeartbeat stateHeartbeat = entry.getValue();
 +                if (stateHeartbeat.getState() == State.valid) {
 +                    keeperWorkerIds.add(entry.getKey());
 +                    keepPorts.add(stateHeartbeat.getHeartbeat().get_port());
 +                }
 +            }
 +            Map<Integer, LocalAssignment> reassignExecutors = getReassignExecutors(assignedExecutors, keepPorts);
 +            Map<Integer, String> newWorkerIds = new HashMap<>();
 +            for (Integer port : reassignExecutors.keySet()) {
 +                newWorkerIds.put(port, Utils.uuid());
 +            }
 +            LOG.debug("Syncing processes");
 +            LOG.debug("Assigned executors: {}", assignedExecutors);
 +            LOG.debug("Allocated: {}", localWorkerStats);
 +
 +            for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
 +                StateHeartbeat stateHeartbeat = entry.getValue();
 +                if (stateHeartbeat.getState() != State.valid) {
 +                    LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now,
 +                            stateHeartbeat.getState(), stateHeartbeat.getHeartbeat());
 +                    shutWorker(supervisorData, entry.getKey());
 +                }
 +            }
 +            // start new workers
 +            Map<String, Integer> newWorkerPortToIds = startNewWorkers(newWorkerIds, reassignExecutors);
 +
 +            Map<String, Integer> allWorkerPortToIds = new HashMap<>();
 +            Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
 +            for (String keeper : keeperWorkerIds) {
 +                allWorkerPortToIds.put(keeper, approvedWorkers.get(keeper));
 +            }
 +            allWorkerPortToIds.putAll(newWorkerPortToIds);
 +            localState.setApprovedWorkers(allWorkerPortToIds);
 +            waitForWorkersLaunch(conf, newWorkerPortToIds.keySet());
 +
 +        } catch (Exception e) {
 +            LOG.error("Failed Sync Process", e);
 +            throw Utils.wrapInRuntime(e);
 +        }
 +
 +    }
 +
 +    protected void waitForWorkersLaunch(Map conf, Set<String> workerIds) throws Exception {
 +        int startTime = Time.currentTimeSecs();
 +        int timeOut = (int) conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS);
 +        for (String workerId : workerIds) {
 +            LocalState localState = ConfigUtils.workerState(conf, workerId);
 +            while (true) {
 +                LSWorkerHeartbeat hb = localState.getWorkerHeartBeat();
 +                if (hb != null || (Time.currentTimeSecs() - startTime) > timeOut)
 +                    break;
 +                LOG.info("{} still hasn't started", workerId);
 +                Time.sleep(500);
 +            }
 +            if (localState.getWorkerHeartBeat() == null) {
 +                LOG.info("Worker {} failed to start", workerId);
 +            }
 +        }
 +    }
 +
 +    protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
 +        Map<Integer, LocalAssignment> reassignExecutors = new HashMap<>();
 +        reassignExecutors.putAll(assignExecutors);
 +        for (Integer port : keepPorts) {
 +            reassignExecutors.remove(port);
 +        }
 +        return reassignExecutors;
 +    }
 +
 +
 +
 +    /**
 +     * Returns map from worker id to worker heartbeat. If the heartbeat is null, then the worker is dead.
 +     * 
 +     * @param assignedExecutors
 +     * @return
 +     * @throws Exception
 +     */
 +    public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception {
 +        Map<String, StateHeartbeat> workerIdHbstate = new HashMap<>();
 +        Map conf = supervisorData.getConf();
 +        LocalState localState = supervisorData.getLocalState();
 +        Map<String, LSWorkerHeartbeat> idToHeartbeat = SupervisorUtils.readWorkerHeartbeats(conf);
 +        Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
 +        Set<String> approvedIds = new HashSet<>();
 +        if (approvedWorkers != null) {
 +            approvedIds.addAll(approvedWorkers.keySet());
 +        }
 +        for (Map.Entry<String, LSWorkerHeartbeat> entry : idToHeartbeat.entrySet()) {
 +            String workerId = entry.getKey();
 +            LSWorkerHeartbeat whb = entry.getValue();
 +            State state;
 +            if (whb == null) {
 +                state = State.notStarted;
 +            } else if (!approvedIds.contains(workerId) || !matchesAssignment(whb, assignedExecutors)) {
 +                state = State.disallowed;
 +            } else if (supervisorData.getDeadWorkers().contains(workerId)) {
 +                LOG.info("Worker Process {} has died", workerId);
 +                state = State.timedOut;
 +            } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) {
 +                state = State.timedOut;
 +            } else {
 +                state = State.valid;
 +            }
 +            LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now);
 +            workerIdHbstate.put(workerId, new StateHeartbeat(state, whb));
 +        }
 +        return workerIdHbstate;
 +    }
 +
 +    protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) {
 +        LocalAssignment localAssignment = assignedExecutors.get(whb.get_port());
 +        if (localAssignment == null || !localAssignment.get_topology_id().equals(whb.get_topology_id())) {
 +            return false;
 +        }
 +        List<ExecutorInfo> executorInfos = new ArrayList<>();
 +        executorInfos.addAll(whb.get_executors());
 +        // remove SYSTEM_EXECUTOR_ID
 +        executorInfos.remove(new ExecutorInfo(-1, -1));
 +        List<ExecutorInfo> localExecuorInfos = localAssignment.get_executors();
 +
 +        if (localExecuorInfos.size() != executorInfos.size())
 +            return false;
 +
 +        for (ExecutorInfo executorInfo : localExecuorInfos){
 +            if (!localExecuorInfos.contains(executorInfo))
 +                return false;
 +        }
 +        return true;
 +    }
 +
 +    /**
-      * launch a worker in local mode. But it may exist question???
++     * launch a worker in local mode.
 +     */
-     protected void launchLocalWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException {
++    protected void launchWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException {
 +        // port this function after porting worker to java
 +    }
 +
 +    protected String getWorkerClassPath(String stormJar, Map stormConf) {
 +        List<String> topoClasspath = new ArrayList<>();
 +        Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
 +
 +        if (object instanceof List) {
 +            topoClasspath.addAll((List<String>) object);
 +        } else if (object instanceof String){
 +            topoClasspath.add((String)object);
 +        }else {
 +            //ignore
 +        }
 +        String classPath = Utils.workerClasspath();
 +        String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
 +        return Utils.addToClasspath(classAddPath, topoClasspath);
 +    }
 +
 +    /**
 +     * Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap.
 +     * 
 +     * @param value
 +     * @param workerId
 +     * @param stormId
 +     * @param port
 +     * @param memOnheap
 +     */
 +    public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
 +        List<String> rets = new ArrayList<>();
 +        if (value instanceof String) {
 +            String string = (String) value;
 +            string = string.replace("%ID%", String.valueOf(port));
 +            string = string.replace("%WORKER-ID%", workerId);
 +            string = string.replace("%TOPOLOGY-ID%", stormId);
 +            string = string.replace("%WORKER-PORT%", String.valueOf(port));
 +            string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
 +            String[] strings = string.split("\\s+");
 +            rets.addAll(Arrays.asList(strings));
 +        } else if (value instanceof List) {
 +            List<Object> objects = (List<Object>) value;
 +            for (Object object : objects) {
 +                String str = (String)object;
 +                str = str.replace("%ID%", String.valueOf(port));
 +                str = str.replace("%WORKER-ID%", workerId);
 +                str = str.replace("%TOPOLOGY-ID%", stormId);
 +                str = str.replace("%WORKER-PORT%", String.valueOf(port));
 +                str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap));
 +                rets.add(str);
 +            }
 +        }
 +        return rets;
 +    }
 +
 +
 +
 +    /**
 +     * launch a worker in distributed mode
 +     * (the supervisorId parameter is passed explicitly for testing)
 +     * @throws IOException
 +     */
-     protected void launchDistributeWorker(Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId,
++    protected void launchWorker(Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId,
 +            WorkerResources resources, CgroupManager cgroupManager, ConcurrentHashSet deadWorkers) throws IOException {
 +
 +        Boolean runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
 +        String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
 +        String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
 +        String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
 +        String stormLogDir = ConfigUtils.getLogDir();
 +        String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
 +
 +        String stormLog4j2ConfDir;
 +        if (StringUtils.isNotBlank(stormLogConfDir)) {
 +            if (Utils.isAbsolutePath(stormLogConfDir)) {
 +                stormLog4j2ConfDir = stormLogConfDir;
 +            } else {
 +                stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir;
 +            }
 +        } else {
 +            stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
 +        }
 +
 +        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +
 +        String jlp = jlp(stormRoot, conf);
 +
 +        String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
 +
 +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +
 +        String workerClassPath = getWorkerClassPath(stormJar, stormConf);
 +
 +        Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
 +        List<String> topGcOpts = new ArrayList<>();
 +        if (topGcOptsObject instanceof String) {
 +            topGcOpts.add((String) topGcOptsObject);
 +        } else if (topGcOptsObject instanceof List) {
 +            topGcOpts.addAll((List<String>) topGcOptsObject);
 +        }
 +
 +        int memOnheap = 0;
 +        if (resources.get_mem_on_heap() > 0) {
 +            memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
 +        } else {
 +            //set the default heap memory size for supervisor-test
 +            memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
 +        }
 +
 +        int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
 +
 +        int cpu = (int) Math.ceil(resources.get_cpu());
 +
 +        List<String> gcOpts = null;
 +
 +        if (topGcOpts != null) {
 +            gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap);
 +        } else {
 +            gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap);
 +        }
 +
 +        Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
 +        List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
 +        if (topoWorkerLogwriterObject instanceof String) {
 +            topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject);
 +        } else if (topoWorkerLogwriterObject instanceof List) {
 +            topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject);
 +        }
 +
 +        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +
 +        String logfileName = "worker.log";
 +
 +        String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
 +
 +        String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
 +        if (loggingSensitivity == null) {
 +            loggingSensitivity = "S3";
 +        }
 +
 +        List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
 +
 +        List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
 +
 +        List<String> workerProfilerChildopts = null;
 +        if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
 +            workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
 +        }else {
 +            workerProfilerChildopts = new ArrayList<>();
 +        }
 +
 +        Map<String, String> topEnvironment = new HashMap<String, String>();
 +        Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
 +        if (environment != null) {
 +            topEnvironment.putAll(environment);
 +        }
 +        topEnvironment.put("LD_LIBRARY_PATH", jlp);
 +
 +        String log4jConfigurationFile = null;
 +        if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
 +            log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
 +        } else {
 +            log4jConfigurationFile = stormLog4j2ConfDir;
 +        }
 +        log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
 +
 +        List<String> commandList = new ArrayList<>();
 +        commandList.add(SupervisorUtils.javaCmd("java"));
 +        commandList.add("-cp");
 +        commandList.add(workerClassPath);
 +        commandList.addAll(topoWorkerLogwriterChildopts);
 +        commandList.add("-Dlogfile.name=" + logfileName);
 +        commandList.add("-Dstorm.home=" + stormHome);
 +        commandList.add("-Dworkers.artifacts=" + workersArtifacets);
 +        commandList.add("-Dstorm.id=" + stormId);
 +        commandList.add("-Dworker.id=" + workerId);
 +        commandList.add("-Dworker.port=" + port);
 +        commandList.add("-Dstorm.log.dir=" + stormLogDir);
 +        commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
 +        commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
 +        commandList.add("org.apache.storm.LogWriter");
 +
 +        commandList.add(SupervisorUtils.javaCmd("java"));
 +        commandList.add("-server");
 +        commandList.addAll(workerChildopts);
 +        commandList.addAll(topWorkerChildopts);
 +        commandList.addAll(gcOpts);
 +        commandList.addAll(workerProfilerChildopts);
 +        commandList.add("-Djava.library.path=" + jlp);
 +        commandList.add("-Dlogfile.name=" + logfileName);
 +        commandList.add("-Dstorm.home=" + stormHome);
 +        commandList.add("-Dworkers.artifacts=" + workersArtifacets);
 +        commandList.add("-Dstorm.conf.file=" + stormConfFile);
 +        commandList.add("-Dstorm.options=" + stormOptions);
 +        commandList.add("-Dstorm.log.dir=" + stormLogDir);
 +        commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
 +        commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
 +        commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
 +        commandList.add("-Dstorm.id=" + stormId);
 +        commandList.add("-Dworker.id=" + workerId);
 +        commandList.add("-Dworker.port=" + port);
 +        commandList.add("-cp");
 +        commandList.add(workerClassPath);
 +        commandList.add("org.apache.storm.daemon.worker");
 +        commandList.add(stormId);
 +        commandList.add(assignmentId);
 +        commandList.add(String.valueOf(port));
 +        commandList.add(workerId);
 +
 +        // Resources reserved for the worker's cgroup: cpu, and memory = memOnheap + memoffheap + STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB
 +        if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
 +            int cgRoupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)));
 +            int memoryValue = memoffheap + memOnheap + cgRoupMem;
 +            int cpuValue = cpu;
 +            Map<String, Number> map = new HashMap<>();
 +            map.put("cpu", cpuValue);
 +            map.put("memory", memoryValue);
 +            cgroupManager.reserveResourcesForWorker(workerId, map);
 +            commandList = cgroupManager.getLaunchCommand(workerId, commandList);
 +        }
 +
 +        LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
 +        writeLogMetadata(stormConf, user, workerId, stormId, port, conf);
 +        ConfigUtils.setWorkerUserWSE(conf, workerId, user);
 +        createArtifactsLink(conf, stormId, port, workerId);
 +
 +        String logPrefix = "Worker Process " + workerId;
 +        String workerDir = ConfigUtils.workerRoot(conf, workerId);
 +
 +        if (deadWorkers != null)
 +            deadWorkers.remove(workerId);
 +        createBlobstoreLinks(conf, stormId, workerId);
 +
 +        ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix, workerId);
 +        if (runWorkerAsUser) {
 +            List<String> args = new ArrayList<>();
 +            args.add("worker");
 +            args.add(workerDir);
 +            args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
 +            SupervisorUtils.workerLauncher(conf, user, args, null, logPrefix, processExitCallback, new File(workerDir));
 +        } else {
 +            Utils.launchProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
 +        }
 +    }
 +
 +    protected String jlp(String stormRoot, Map conf) {
 +        String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
 +        String os = System.getProperty("os.name").replaceAll("\\s+", "_");
 +        String arch = System.getProperty("os.arch");
 +        String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
 +        String ret = archResourceRoot + Utils.FILE_PATH_SEPARATOR + resourceRoot + Utils.FILE_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH);
 +        return ret;
 +    }
 +
 +    protected Map<String, Integer> startNewWorkers(Map<Integer, String> newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws IOException {
 +
 +        Map<String, Integer> newValidWorkerIds = new HashMap<>();
 +        Map conf = supervisorData.getConf();
 +        String supervisorId = supervisorData.getSupervisorId();
 +        String clusterMode = ConfigUtils.clusterMode(conf);
 +
 +        for (Map.Entry<Integer, LocalAssignment> entry : reassignExecutors.entrySet()) {
 +            Integer port = entry.getKey();
 +            LocalAssignment assignment = entry.getValue();
 +            String workerId = newWorkerIds.get(port);
 +            String stormId = assignment.get_topology_id();
 +            WorkerResources resources = assignment.get_resources();
 +
 +            // This condition checks that the required files exist before launching the worker
 +            if (SupervisorUtils.checkTopoFilesExist(conf, stormId)) {
 +                String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId);
 +                String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId);
 +
 +                LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port,
 +                        workerId);
 +
 +                FileUtils.forceMkdir(new File(pidsPath));
 +                FileUtils.forceMkdir(new File(hbPath));
 +
 +                if (clusterMode.endsWith("distributed")) {
-                     launchDistributeWorker(conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources,
++                    launchWorker(conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources,
 +                            supervisorData.getResourceIsolationManager(), supervisorData.getDeadWorkers());
 +                } else if (clusterMode.endsWith("local")) {
-                     launchLocalWorker(supervisorData, stormId, port.longValue(), workerId, resources);
++                    launchWorker(supervisorData, stormId, port.longValue(), workerId, resources);
 +                }
 +                newValidWorkerIds.put(workerId, port);
 +
 +            } else {
 +                LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", assignment,
 +                        supervisorData.getSupervisorId(), port, workerId);
 +            }
 +
 +        }
 +        return newValidWorkerIds;
 +    }
 +
 +    public void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, Long port, Map conf) throws IOException {
 +        Map data = new HashMap();
 +        data.put(Config.TOPOLOGY_SUBMITTER_USER, user);
 +        data.put("worker-id", workerId);
 +
 +        Set<String> logsGroups = new HashSet<>();
 +        //for supervisor-test
 +        if (stormconf.get(Config.LOGS_GROUPS) != null) {
 +            List<String> groups = (List<String>) stormconf.get(Config.LOGS_GROUPS);
 +            for (String group : groups){
 +                logsGroups.add(group);
 +            }
 +        }
 +        if (stormconf.get(Config.TOPOLOGY_GROUPS) != null) {
 +            List<String> topGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS);
 +            for (String group : topGroups){
 +                logsGroups.add(group);
 +            }
 +        }
 +        data.put(Config.LOGS_GROUPS, logsGroups.toArray());
 +
 +        Set<String> logsUsers = new HashSet<>();
 +        if (stormconf.get(Config.LOGS_USERS) != null) {
 +            List<String> logUsers = (List<String>) stormconf.get(Config.LOGS_USERS);
 +            for (String logUser : logUsers){
 +                logsUsers.add(logUser);
 +            }
 +        }
 +        if (stormconf.get(Config.TOPOLOGY_USERS) != null) {
 +            List<String> topUsers = (List<String>) stormconf.get(Config.TOPOLOGY_USERS);
 +            for (String logUser : topUsers){
 +                logsUsers.add(logUser);
 +            }
 +        }
 +        data.put(Config.LOGS_USERS, logsUsers.toArray());
 +        writeLogMetadataToYamlFile(stormId, port, data, conf);
 +    }
 +
 +    /**
 +     * Running the worker as a user requires the directory to have special permissions; otherwise it is insecure.
 +     * 
 +     * @param stormId
 +     * @param port
 +     * @param data
 +     * @param conf
 +     * @throws IOException
 +     */
 +    public void writeLogMetadataToYamlFile(String stormId, Long port, Map data, Map conf) throws IOException {
 +        File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port.intValue());
 +
 +        if (!Utils.checkFileExists(file.getParent())) {
 +            if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +                FileUtils.forceMkdir(file.getParentFile());
 +                SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), file.getParentFile().getCanonicalPath());
 +            } else {
 +                file.getParentFile().mkdirs();
 +            }
 +        }
 +        FileWriter writer = new FileWriter(file);
 +        Yaml yaml = new Yaml();
 +        try {
 +            yaml.dump(data, writer);
 +        }finally {
 +            writer.close();
 +        }
 +
 +    }
 +
 +    /**
 +     * Create a symlink from worker directory to its port artifacts directory
 +     * 
 +     * @param conf
 +     * @param stormId
 +     * @param port
 +     * @param workerId
 +     */
 +    protected void createArtifactsLink(Map conf, String stormId, Long port, String workerId) throws IOException {
 +        String workerDir = ConfigUtils.workerRoot(conf, workerId);
 +        String topoDir = ConfigUtils.workerArtifactsRoot(conf, stormId);
 +        if (Utils.checkFileExists(workerDir)) {
 +            Utils.createSymlink(workerDir, topoDir, "artifacts", String.valueOf(port));
 +        }
 +    }
 +
 +    /**
 +     * Create symlinks in worker launch directory for all blobs
 +     * 
 +     * @param conf
 +     * @param stormId
 +     * @param workerId
 +     * @throws IOException
 +     */
 +    protected void createBlobstoreLinks(Map conf, String stormId, String workerId) throws IOException {
 +        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +        String workerRoot = ConfigUtils.workerRoot(conf, workerId);
 +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +        List<String> blobFileNames = new ArrayList<>();
 +        if (blobstoreMap != null) {
 +            for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
 +                String key = entry.getKey();
 +                Map<String, Object> blobInfo = entry.getValue();
 +                String ret = null;
 +                if (blobInfo != null && blobInfo.containsKey("localname")) {
 +                    ret = (String) blobInfo.get("localname");
 +                } else {
 +                    ret = key;
 +                }
 +                blobFileNames.add(ret);
 +            }
 +        }
 +        List<String> resourceFileNames = new ArrayList<>();
 +        resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR);
 +        resourceFileNames.addAll(blobFileNames);
 +        LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", workerId, stormId, resourceFileNames.size(), resourceFileNames);
 +        Utils.createSymlink(workerRoot, stormRoot, ConfigUtils.RESOURCES_SUBDIR);
 +        for (String fileName : blobFileNames) {
 +            Utils.createSymlink(workerRoot, stormRoot, fileName, fileName);
 +        }
 +    }
 +}

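As a concrete illustration of the substituteChildopts contract documented above (each
%ID%, %WORKER-ID%, %TOPOLOGY-ID%, %WORKER-PORT% and %HEAP-MEM% placeholder is replaced,
and string values are then split on whitespace), here is a small sketch with made-up
inputs; the values and the class name ChildoptsExample are illustrative only and not
part of this commit.

    import java.util.List;

    import org.apache.storm.daemon.supervisor.SyncProcessEvent;

    // Sketch only (not part of this commit): expected behaviour of
    // SyncProcessEvent#substituteChildopts as defined in the diff above.
    public class ChildoptsExample {
        public static void main(String[] args) {
            SyncProcessEvent spe = new SyncProcessEvent();
            // A childopts template with placeholders, as it might appear in configuration.
            String template = "-Xmx%HEAP-MEM%m -Dworker.id=%WORKER-ID% -Dworker.port=%WORKER-PORT%";
            List<String> opts = spe.substituteChildopts(template, "w-1", "topo-1", 6700L, 768);
            // After substitution and whitespace splitting:
            // ["-Xmx768m", "-Dworker.id=w-1", "-Dworker.port=6700"]
            System.out.println(opts);
        }
    }
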
http://git-wip-us.apache.org/repos/asf/storm/blob/b09b4129/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/b09b4129/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index b367fce,415a56d..9b5f1e0
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -297,53 -294,46 +297,52 @@@
  
  (deftest test-worker-launch-command
    (testing "*.worker.childopts configuration"
 -    (let [mock-port "42"
 +    (let [mock-port 42
            mock-storm-id "fake-storm-id"
            mock-worker-id "fake-worker-id"
 -          storm-log-dir (ConfigUtils/getLogDir)
            mock-cp (str Utils/FILE_PATH_SEPARATOR "base" Utils/CLASS_PATH_SEPARATOR Utils/FILE_PATH_SEPARATOR "stormjar.jar")
            mock-sensitivity "S3"
--          mock-cp "/base:/stormjar.jar"
            exp-args-fn (fn [opts topo-opts classpath]
 -                       (concat [(supervisor/java-cmd) "-cp" classpath
 -                               (str "-Dlogfile.name=" "worker.log")
 -                               "-Dstorm.home="
 -                               (str "-Dworkers.artifacts=" "/tmp/workers-artifacts")
 -                               (str "-Dstorm.id=" mock-storm-id)
 -                               (str "-Dworker.id=" mock-worker-id)
 -                               (str "-Dworker.port=" mock-port)
 -                               (str "-Dstorm.log.dir=" storm-log-dir)
 -                               "-Dlog4j.configurationFile=/log4j2/worker.xml"
 -                               "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
 -                               "org.apache.storm.LogWriter"]
 -                               [(supervisor/java-cmd) "-server"]
 -                               opts
 -                               topo-opts
 -                               ["-Djava.library.path="
 -                                (str "-Dlogfile.name=" "worker.log")
 -                                "-Dstorm.home="
 -                                "-Dworkers.artifacts=/tmp/workers-artifacts"
 -                                "-Dstorm.conf.file="
 -                                "-Dstorm.options="
 -                                (str "-Dstorm.log.dir=" storm-log-dir)
 -                                (str "-Dlogging.sensitivity=" mock-sensitivity)
 -                                (str "-Dlog4j.configurationFile=" Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
 -                                "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
 -                                (str "-Dstorm.id=" mock-storm-id)
 -                                (str "-Dworker.id=" mock-worker-id)
 -                                (str "-Dworker.port=" mock-port)
 -                                "-cp" classpath
 -                                "org.apache.storm.daemon.worker"
 -                                mock-storm-id
 -                                mock-port
 -                                mock-worker-id]))]
 +                        (let [file-prefix (let [os (System/getProperty "os.name")]
 +                                            (if (.startsWith os "Windows") (str "file:///")
 +                                                    (str "")))
 +                              sequences (concat [(SupervisorUtils/javaCmd "java") "-cp" classpath
 +                                                (str "-Dlogfile.name=" "worker.log")
 +                                                "-Dstorm.home="
 +                                                (str "-Dworkers.artifacts=" "/tmp/workers-artifacts")
 +                                                (str "-Dstorm.id=" mock-storm-id)
 +                                                (str "-Dworker.id=" mock-worker-id)
 +                                                (str "-Dworker.port=" mock-port)
 +                                                (str "-Dstorm.log.dir=" (ConfigUtils/getLogDir))
 +                                                (str "-Dlog4j.configurationFile=" file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
 +                                                 "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
 +                                                "org.apache.storm.LogWriter"]
 +                                         [(SupervisorUtils/javaCmd "java") "-server"]
 +                                         opts
 +                                         topo-opts
 +                                         ["-Djava.library.path="
 +                                          (str "-Dlogfile.name=" "worker.log")
 +                                          "-Dstorm.home="
 +                                          "-Dworkers.artifacts=/tmp/workers-artifacts"
 +                                          "-Dstorm.conf.file="
 +                                          "-Dstorm.options="
 +                                          (str "-Dstorm.log.dir=" (ConfigUtils/getLogDir))
 +                                          (str "-Dlogging.sensitivity=" mock-sensitivity)
 +                                          (str "-Dlog4j.configurationFile=" file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
 +                                          "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
 +                                          (str "-Dstorm.id=" mock-storm-id)
 +                                          (str "-Dworker.id=" mock-worker-id)
 +                                          (str "-Dworker.port=" mock-port)
 +                                          "-cp" classpath
 +                                          "org.apache.storm.daemon.worker"
 +                                          mock-storm-id
 +                                          ""
 +                                          mock-port
 +                                          mock-worker-id])
 +                          ret (ArrayList.)]
 +                        (doseq [val sequences]
 +                          (.add ret (str val)))
 +                          ret))]
        (testing "testing *.worker.childopts as strings with extra spaces"
          (let [string-opts "-Dfoo=bar  -Xmx1024m"
                topo-string-opts "-Dkau=aux   -Xmx2048m"
@@@ -364,22 -354,19 +363,20 @@@
                                                         ([conf storm-id] nil))
                            (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
                            (setWorkerUserWSEImpl [conf worker-id user] nil)
 -                          (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
 +                          (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
 +              process-proxy (proxy [SyncProcessEvent] []
 +                              (jlp [stormRoot conf] "")
 +                              (writeLogMetadata [stormconf user workerId stormId port conf] nil)
 +                              (createBlobstoreLinks [conf stormId workerId] nil))]
 +
            (with-open [_ (ConfigUtilsInstaller. cu-proxy)
                        _ (UtilsInstaller. utils-spy)]
-                 (.launchDistributeWorker process-proxy mock-supervisor nil
 -              (stubbing [supervisor/jlp nil
 -                         supervisor/write-log-metadata! nil
 -                         supervisor/create-blobstore-links nil]
 -                (supervisor/launch-worker mock-supervisor
 -                                      mock-storm-id
 -                                      mock-port
++                (.launchWorker process-proxy mock-supervisor nil
 +                                      "" mock-storm-id mock-port
                                        mock-worker-id
 -                                      (WorkerResources.))
 +                                      (WorkerResources.) nil nil)
-             ;I updated "(Matchers/eq exp-args)" to "(Matchers/any)" because exp-args differed from the first argument.
-             ;But they have the same values in supervisor-test.xml, so I don't know what happened here.
                  (. (Mockito/verify utils-spy)
-                    (launchProcessImpl (Matchers/any)
+                    (launchProcessImpl (Matchers/eq exp-args)
                                        (Matchers/any)
                                        (Matchers/any)
                                        (Matchers/any)
@@@ -403,20 -390,19 +400,20 @@@
                            (proxy [Utils] []
                              (addToClasspathImpl [classpath paths] mock-cp)
                              (launchProcessImpl [& _] nil))
 -                          Mockito/spy)]
 +                          Mockito/spy)
 +              process-proxy (proxy [SyncProcessEvent] []
 +                              (jlp [stormRoot conf] "")
 +                              (writeLogMetadata [stormconf user workerId stormId port conf] nil)
 +                              (createBlobstoreLinks [conf stormId workerId] nil))]
              (with-open [_ (ConfigUtilsInstaller. cu-proxy)
                          _ (UtilsInstaller. utils-spy)]
-                   (.launchDistributeWorker process-proxy mock-supervisor nil
 -                (stubbing [supervisor/jlp nil
 -                           supervisor/write-log-metadata! nil
 -                           supervisor/create-blobstore-links nil]
 -                  (supervisor/launch-worker mock-supervisor
 -                                            mock-storm-id
++                  (.launchWorker process-proxy mock-supervisor nil
 +                                            "" mock-storm-id
                                              mock-port
                                              mock-worker-id
 -                                            (WorkerResources.))
 +                                            (WorkerResources.) nil nil)
                    (. (Mockito/verify utils-spy)
-                      (launchProcessImpl (Matchers/any)
+                      (launchProcessImpl (Matchers/eq exp-args)
                                          (Matchers/any)
                                          (Matchers/any)
                                          (Matchers/any)
@@@ -438,20 -424,19 +435,20 @@@
                              (currentClasspathImpl []
                                (str Utils/FILE_PATH_SEPARATOR "base"))
                              (launchProcessImpl [& _] nil))
 -                          Mockito/spy)]
 +                          Mockito/spy)
 +              process-proxy (proxy [SyncProcessEvent] []
 +                              (jlp [stormRoot conf] "")
 +                              (writeLogMetadata [stormconf user workerId stormId port conf] nil)
 +                              (createBlobstoreLinks [conf stormId workerId] nil))]
            (with-open [_ (ConfigUtilsInstaller. cu-proxy)
                        _ (UtilsInstaller. utils-spy)]
-                   (.launchDistributeWorker process-proxy mock-supervisor nil
 -                (stubbing [supervisor/jlp nil
 -                     supervisor/write-log-metadata! nil
 -                     supervisor/create-blobstore-links nil]
 -                  (supervisor/launch-worker mock-supervisor
 -                                              mock-storm-id
++                  (.launchWorker process-proxy mock-supervisor nil
 +                                               "" mock-storm-id
                                                mock-port
                                                mock-worker-id
 -                                              (WorkerResources.))
 +                                              (WorkerResources.) nil nil)
                    (. (Mockito/verify utils-spy)
-                      (launchProcessImpl (Matchers/any)
+                      (launchProcessImpl (Matchers/eq exp-args)
                                          (Matchers/any)
                                          (Matchers/any)
                                          (Matchers/any)
@@@ -473,18 -458,17 +470,18 @@@
                              (currentClasspathImpl []
                                (str Utils/FILE_PATH_SEPARATOR "base"))
                              (launchProcessImpl [& _] nil))
 -                          Mockito/spy)]
 +                          Mockito/spy)
 +              process-proxy (proxy [SyncProcessEvent] []
 +                              (jlp [stormRoot conf] nil)
 +                              (writeLogMetadata [stormconf user workerId stormId port conf] nil)
 +                              (createBlobstoreLinks [conf stormId workerId] nil))]
            (with-open [_ (ConfigUtilsInstaller. cu-proxy)
                        _ (UtilsInstaller. utils-spy)]
-             (.launchDistributeWorker process-proxy mock-supervisor nil
 -            (stubbing [supervisor/jlp nil
 -                       supervisor/write-log-metadata! nil
 -                       supervisor/create-blobstore-links nil]
 -              (supervisor/launch-worker mock-supervisor
 -                                        mock-storm-id
++            (.launchWorker process-proxy mock-supervisor nil
 +                                        "" mock-storm-id
                                          mock-port
                                          mock-worker-id
 -                                        (WorkerResources.))
 +                                        (WorkerResources.) nil nil)
                (. (Mockito/verify utils-spy)
                   (launchProcessImpl (Matchers/any)
                                      (Matchers/eq full-env)
@@@ -494,21 -478,22 +491,24 @@@
  
  (deftest test-worker-launch-command-run-as-user
    (testing "*.worker.childopts configuration"
-     (let [mock-port 42
 -    (let [mock-port "42"
++    (let [file-prefix (let [os (System/getProperty "os.name")]
++                        (if (.startsWith os "Windows") (str "file:///")
++                          (str "")))
++          mock-port 42
            mock-storm-id "fake-storm-id"
            mock-worker-id "fake-worker-id"
            mock-sensitivity "S3"
            mock-cp "mock-classpath'quote-on-purpose"
            attrs (make-array FileAttribute 0)
            storm-local (.getCanonicalPath (.toFile (Files/createTempDirectory "storm-local" attrs)))
 -          storm-log-dir (ConfigUtils/getLogDir)
--          worker-script (str storm-local "/workers/" mock-worker-id "/storm-worker-script.sh")
++          worker-script (str storm-local Utils/FILE_PATH_SEPARATOR "workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "storm-worker-script.sh")
            exp-launch ["/bin/worker-launcher"
                        "me"
                        "worker"
--                      (str storm-local "/workers/" mock-worker-id)
++                      (str storm-local Utils/FILE_PATH_SEPARATOR "workers" Utils/FILE_PATH_SEPARATOR mock-worker-id)
                        worker-script]
            exp-script-fn (fn [opts topo-opts]
--                          (str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java'"
++                          (str "#!/bin/bash\r\n'export' 'LD_LIBRARY_PATH=';\r\n\r\nexec 'java'"
                                 " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
                                 " '-Dlogfile.name=" "worker.log'"
                                 " '-Dstorm.home='"
@@@ -516,8 -501,8 +516,8 @@@
                                 " '-Dstorm.id=" mock-storm-id "'"
                                 " '-Dworker.id=" mock-worker-id "'"
                                 " '-Dworker.port=" mock-port "'"
-                                " '-Dstorm.log.dir=/logs'"
 -                               " '-Dstorm.log.dir=" storm-log-dir "'"
--                               " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
++                               " '-Dstorm.log.dir=" (ConfigUtils/getLogDir) "'"
++                               " '-Dlog4j.configurationFile=" (str file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml'")
                                 " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
                                 " 'org.apache.storm.LogWriter'"
                                 " 'java' '-server'"
@@@ -529,9 -514,9 +529,9 @@@
                                 " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
                                 " '-Dstorm.conf.file='"
                                 " '-Dstorm.options='"
-                                " '-Dstorm.log.dir=/logs'"
 -                               " '-Dstorm.log.dir=" storm-log-dir "'"
++                               " '-Dstorm.log.dir=" (ConfigUtils/getLogDir) "'"
                                 " '-Dlogging.sensitivity=" mock-sensitivity "'"
--                               " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
++                               " '-Dlog4j.configurationFile=" (str file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml'")
                                 " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
                                 " '-Dstorm.id=" mock-storm-id "'"
                                 " '-Dworker.id=" mock-worker-id "'"
@@@ -539,6 -524,6 +539,7 @@@
                                 " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
                                 " 'org.apache.storm.daemon.worker'"
                                 " '" mock-storm-id "'"
++                               " '""'"
                                 " '" mock-port "'"
                                 " '" mock-worker-id "';"))]
        (try
@@@ -565,29 -550,24 +566,28 @@@
                              (proxy [Utils] []
                                (addToClasspathImpl [classpath paths] mock-cp)
                                (launchProcessImpl [& _] nil))
 -                            Mockito/spy)]
 +                            Mockito/spy)
 +                supervisor-utils (Mockito/mock SupervisorUtils)
 +                process-proxy (proxy [SyncProcessEvent] []
 +                                (jlp [stormRoot conf] "")
 +                                (writeLogMetadata [stormconf user workerId stormId port conf] nil))]
              (with-open [_ (ConfigUtilsInstaller. cu-proxy)
 -                        _ (UtilsInstaller. utils-spy)]
 -              (stubbing [supervisor/java-cmd "java"
 -                         supervisor/jlp nil
 -                         supervisor/write-log-metadata! nil]
 -                (supervisor/launch-worker mock-supervisor
 -                                          mock-storm-id
 +                        _ (UtilsInstaller. utils-spy)
 +                        _ (MockedSupervisorUtils. supervisor-utils)]
-               (.launchDistributeWorker process-proxy mock-supervisor nil
++              (. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn (str "java")))
++              (.launchWorker process-proxy mock-supervisor nil
 +                                          "" mock-storm-id
                                            mock-port
                                            mock-worker-id
 -                                          (WorkerResources.))
 +                                          (WorkerResources.) nil nil)
-                 (. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn "java"))
                  (. (Mockito/verify utils-spy)
-                    (launchProcessImpl (Matchers/any)
+                    (launchProcessImpl (Matchers/eq exp-launch)
                                        (Matchers/any)
                                        (Matchers/any)
                                        (Matchers/any)
 -                                      (Matchers/any)))))
 -            (is (= (slurp worker-script) exp-script))))
 +                                      (Matchers/any))))
-            ;this assertion can't pass here
-            ; (is (= (slurp worker-script) exp-script))
++            (is (= (slurp worker-script) exp-script))
 +            ))
          (finally (Utils/forceDelete storm-local)))
        (.mkdirs (io/file storm-local "workers" mock-worker-id))
        (try
@@@ -612,28 -592,24 +612,28 @@@
                              (proxy [Utils] []
                                (addToClasspathImpl [classpath paths] mock-cp)
                                (launchProcessImpl [& _] nil))
 -                            Mockito/spy)]
 +                            Mockito/spy)
 +                supervisor-utils (Mockito/mock SupervisorUtils)
 +                process-proxy (proxy [SyncProcessEvent] []
 +                                (jlp [stormRoot conf] "")
 +                                (writeLogMetadata [stormconf user workerId stormId port conf] nil))]
              (with-open [_ (ConfigUtilsInstaller. cu-proxy)
 -                        _ (UtilsInstaller. utils-spy)]
 -              (stubbing [supervisor/java-cmd "java"
 -                         supervisor/jlp nil
 -                         supervisor/write-log-metadata! nil]
 -                (supervisor/launch-worker mock-supervisor
 -                                          mock-storm-id
 +                        _ (UtilsInstaller. utils-spy)
 +                        _ (MockedSupervisorUtils. supervisor-utils)]
-               (.launchDistributeWorker process-proxy mock-supervisor nil
++              (. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn (str "java")))
++              (.launchWorker process-proxy mock-supervisor nil
 +                                          "" mock-storm-id
                                            mock-port
                                            mock-worker-id
 -                                          (WorkerResources.))
 +                                          (WorkerResources.) nil nil)
-                 (. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn "java"))
                  (. (Mockito/verify utils-spy)
 -                 (launchProcessImpl (Matchers/eq exp-launch)
 +                 (launchProcessImpl (Matchers/any)
                                      (Matchers/any)
                                      (Matchers/any)
                                      (Matchers/any)
 -                                    (Matchers/any)))))
 -            (is (= (slurp worker-script) exp-script))))
 +                                    (Matchers/any))))
-            ; (is (= (slurp worker-script) exp-script))
++            (is (= (slurp worker-script) exp-script))
 +            ))
          (finally (Utils/forceDelete storm-local))))))
  
  (deftest test-workers-go-bananas