You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:28 UTC

[05/35] storm git commit: update test codes about supervisor

http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
new file mode 100644
index 0000000..9df7ec1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
+import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class Supervisor {
+    private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
+
+    //TODO: to be removed after porting worker.clj. localSyncProcess is intended to start local supervisor
+    private SyncProcessEvent localSyncProcess;
+
+    public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
+        this.localSyncProcess = localSyncProcess;
+    }
+
+
+    /**
+     * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary
+     * 
+     * @param conf
+     * @param sharedContext
+     * @param iSupervisor
+     * @return
+     * @throws Exception
+     */
+    public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
+        SupervisorManger supervisorManger = null;
+        try {
+            LOG.info("Starting Supervisor with conf {}", conf);
+            iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
+            String path = ConfigUtils.supervisorTmpDir(conf);
+            FileUtils.cleanDirectory(new File(path));
+
+            final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor);
+            Localizer localizer = supervisorData.getLocalizer();
+
+            SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData);
+            hb.run();
+            // should synchronize supervisor so it doesn't launch anything after being down (optimization)
+            Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+            supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);
+
+            Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf);
+            for (String stormId : downdedStormId) {
+                SupervisorUtils.addBlobReferences(localizer, stormId, conf);
+            }
+            // do this after adding the references so we don't try to clean things being used
+            localizer.startCleaner();
+
+            EventManagerImp syncSupEventManager = new EventManagerImp(false);
+            EventManagerImp syncProcessManager = new EventManagerImp(false);
+
+            SyncProcessEvent syncProcessEvent = null;
+            if (ConfigUtils.isLocalMode(conf)){
+                localSyncProcess.init(supervisorData);
+                syncProcessEvent = localSyncProcess;
+            }else{
+                syncProcessEvent = new SyncProcessEvent(supervisorData);
+            }
+
+            SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager);
+            UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData);
+            RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData);
+
+            if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
+                StormTimer eventTimer = supervisorData.getEventTimer();
+                // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
+                // to date even if callbacks don't all work exactly right
+                eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager));
+
+                eventTimer.scheduleRecurring(0, Utils.getInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)),
+                        new EventManagerPushCallback(syncProcessEvent, syncProcessManager));
+
+                // Blob update thread. Starts with 30 seconds delay, every 30 seconds
+                supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));
+
+                // supervisor health check
+                eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData));
+
+                // Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
+                eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
+            }
+            LOG.info("Starting supervisor with id {} at host {}.", supervisorData.getSupervisorId(), supervisorData.getHostName() );
+            supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager);
+        } catch (Throwable t) {
+            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
+                throw t;
+            } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
+                throw t;
+            } else {
+                LOG.error("Error on initialization of server supervisor: {}", t);
+                Utils.exitProcess(13, "Error on initialization");
+            }
+        }
+        return supervisorManger;
+    }
+
+    /**
+     * start distribute supervisor
+     */
+    private void distributeLaunch() {
+        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
+        SupervisorManger supervisorManager;
+        try {
+            Map<Object, Object> conf = Utils.readStormConfig();
+            if (ConfigUtils.isLocalMode(conf)) {
+                throw new IllegalArgumentException("Cannot start server in local mode!");
+            }
+            ISupervisor iSupervisor = new StandaloneSupervisor();
+            supervisorManager = mkSupervisor(conf, null, iSupervisor);
+            if (supervisorManager != null)
+                Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
+            registerWorkerNumGauge("drpc:num-execute-http-requests", conf);
+            startMetricsReporters(conf);
+        } catch (Exception e) {
+            LOG.error("Failed to start supervisor\n", e);
+            System.exit(1);
+        }
+    }
+
+    // To be removed
+    private void registerWorkerNumGauge(String name, final Map conf) {
+        MetricRegistry metricRegistry = new MetricRegistry();
+        metricRegistry.remove(name);
+        metricRegistry.register(name, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                Collection<String> pids = SupervisorUtils.myWorkerIds(conf);
+                return pids.size();
+            }
+        });
+    }
+
+    // To be removed
+    private void startMetricsReporters(Map conf) {
+        List<PreparableReporter> preparableReporters = MetricsUtils.getPreparableReporters(conf);
+        for (PreparableReporter reporter : preparableReporters) {
+            reporter.prepare(new MetricRegistry(), conf);
+            reporter.start();
+        }
+        LOG.info("Started statistics report plugin...");
+    }
+
+    /**
+     * supervisor daemon enter entrance
+     *
+     * @param args
+     */
+    public static void main(String[] args) {
+        Utils.setupDefaultUncaughtExceptionHandler();
+        Supervisor instance = new Supervisor();
+        instance.distributeLaunch();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
index 9eec253..039fe30 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
@@ -105,10 +105,9 @@ public class SupervisorData {
 
         List<ACL> acls = null;
         if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
-            acls = new ArrayList<>();
-            acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
-            acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
+            acls = SupervisorUtils.supervisorZkAcls();
         }
+
         try {
             this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR));
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
deleted file mode 100644
index fd31631..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.commons.io.FileUtils;
-import org.apache.storm.Config;
-import org.apache.storm.StormTimer;
-import org.apache.storm.command.HealthCheck;
-import org.apache.storm.daemon.metrics.MetricsUtils;
-import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
-import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
-import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
-import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
-import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
-import org.apache.storm.event.EventManagerImp;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.messaging.IContext;
-import org.apache.storm.scheduler.ISupervisor;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.VersionInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.InterruptedIOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class SupervisorServer {
-    private static Logger LOG = LoggerFactory.getLogger(SupervisorServer.class);
-
-    /**
-     * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary
-     * 
-     * @param conf
-     * @param sharedContext
-     * @param iSupervisor
-     * @return
-     * @throws Exception
-     */
-    private SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
-        SupervisorManger supervisorManger = null;
-        try {
-            LOG.info("Starting Supervisor with conf {}", conf);
-            iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
-            String path = ConfigUtils.supervisorTmpDir(conf);
-            FileUtils.cleanDirectory(new File(path));
-
-            final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor);
-            Localizer localizer = supervisorData.getLocalizer();
-
-            SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData);
-            hb.run();
-            // should synchronize supervisor so it doesn't launch anything after being down (optimization)
-            Integer heartbeatFrequency = (Integer) conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS);
-            supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);
-
-            Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf);
-            for (String stormId : downdedStormId) {
-                SupervisorUtils.addBlobReferences(localizer, stormId, conf);
-            }
-            // do this after adding the references so we don't try to clean things being used
-            localizer.startCleaner();
-
-            EventManagerImp syncSupEventManager = new EventManagerImp(false);
-            EventManagerImp syncProcessManager = new EventManagerImp(false);
-            SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorData);
-            SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager);
-            UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData);
-            RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData);
-
-            if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
-                StormTimer eventTimer = supervisorData.getEventTimer();
-                // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
-                // to date even if callbacks don't all work exactly right
-                eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager));
-
-                eventTimer.scheduleRecurring(0, (Integer) conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS),
-                        new EventManagerPushCallback(syncProcessEvent, syncProcessManager));
-
-                // Blob update thread. Starts with 30 seconds delay, every 30 seconds
-                supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));
-
-                // supervisor health check
-                eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData));
-
-                // Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
-                eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
-            }
-            supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager);
-        } catch (Throwable t) {
-            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
-                throw t;
-            } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
-                throw t;
-            } else {
-                LOG.error("Error on initialization of server supervisor");
-                Utils.exitProcess(13, "Error on initialization");
-            }
-        }
-        return supervisorManger;
-    }
-
-    /**
-     * start local supervisor
-     */
-    public void localLaunch() {
-        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
-        SupervisorManger supervisorManager;
-        try {
-            Map<Object, Object> conf = Utils.readStormConfig();
-            if (!ConfigUtils.isLocalMode(conf)) {
-                throw new IllegalArgumentException("Cannot start server in distribute mode!");
-            }
-            ISupervisor iSupervisor = new StandaloneSupervisor();
-            supervisorManager = mkSupervisor(conf, null, iSupervisor);
-            if (supervisorManager != null)
-                Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
-        } catch (Exception e) {
-            LOG.error("Failed to start supervisor\n", e);
-            System.exit(1);
-        }
-    }
-
-    /**
-     * start distribute supervisor
-     */
-    private void distributeLaunch() {
-        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
-        SupervisorManger supervisorManager;
-        try {
-            Map<Object, Object> conf = Utils.readStormConfig();
-            if (ConfigUtils.isLocalMode(conf)) {
-                throw new IllegalArgumentException("Cannot start server in local mode!");
-            }
-            ISupervisor iSupervisor = new StandaloneSupervisor();
-            supervisorManager = mkSupervisor(conf, null, iSupervisor);
-            if (supervisorManager != null)
-                Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
-            registerWorkerNumGauge("drpc:num-execute-http-requests", conf);
-            startMetricsReporters(conf);
-        } catch (Exception e) {
-            LOG.error("Failed to start supervisor\n", e);
-            System.exit(1);
-        }
-    }
-
-    // To be removed
-    private void registerWorkerNumGauge(String name, final Map conf) {
-        MetricRegistry metricRegistry = new MetricRegistry();
-        metricRegistry.remove(name);
-        metricRegistry.register(name, new Gauge<Integer>() {
-            @Override
-            public Integer getValue() {
-                Collection<String> pids = Utils.readDirContents(ConfigUtils.workerRoot(conf));
-                return pids.size();
-            }
-        });
-    }
-
-    // To be removed
-    private void startMetricsReporters(Map conf) {
-        List<PreparableReporter> preparableReporters = MetricsUtils.getPreparableReporters(conf);
-        for (PreparableReporter reporter : preparableReporters) {
-            reporter.prepare(new MetricRegistry(), conf);
-            reporter.start();
-        }
-        LOG.info("Started statistics report plugin...");
-    }
-
-    /**
-     * supervisor daemon enter entrance
-     *
-     * @param args
-     */
-    public static void main(String[] args) {
-        Utils.setupDefaultUncaughtExceptionHandler();
-        SupervisorServer instance = new SupervisorServer();
-        instance.distributeLaunch();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index ffdb839..9d0b343 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -20,10 +20,14 @@ package org.apache.storm.daemon.supervisor;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.utils.PathUtils;
 import org.apache.storm.Config;
+import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.localizer.LocalResource;
 import org.apache.storm.localizer.Localizer;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,13 +40,24 @@ public class SupervisorUtils {
 
     private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
 
+    private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+    private static SupervisorUtils _instance = INSTANCE;
+
+    public static void setInstance(SupervisorUtils u) {
+        _instance = u;
+    }
+
+    public static void resetInstance() {
+        _instance = INSTANCE;
+    }
+
     public static Process workerLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix,
             final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
         if (StringUtils.isBlank(user)) {
             throw new IllegalArgumentException("User cannot be blank when calling workerLauncher.");
         }
         String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
-        String stormHome = System.getProperty("storm.home");
+        String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
         String wl;
         if (StringUtils.isNotBlank(wlinitial)) {
             wl = wlinitial;
@@ -165,9 +180,94 @@ public class SupervisorUtils {
             return false;
         if (!Utils.checkFileExists(stormconfpath))
             return false;
-        if (!ConfigUtils.isLocalMode(conf) && !Utils.checkFileExists(stormjarpath))
-            return false;
-        return true;
+        if (ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath))
+            return true;
+        return false;
+    }
+
+    public static Collection<String> myWorkerIds(Map conf){
+        return  Utils.readDirContents(ConfigUtils.workerRoot(conf));
+    }
+
+    /**
+     * Returns map from worr id to heartbeat
+     *
+     * @param conf
+     * @return
+     * @throws Exception
+     */
+    public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception {
+        return _instance.readWorkerHeartbeatsImpl(conf);
+    }
+
+    public  Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map conf) throws Exception {
+        Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
+
+        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
+
+        for (String workerId : workerIds) {
+            LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId);
+            // ATTENTION: whb can be null
+            workerHeartbeats.put(workerId, whb);
+        }
+        return workerHeartbeats;
+    }
+
+
+    /**
+     * get worker heartbeat by workerId
+     *
+     * @param conf
+     * @param workerId
+     * @return
+     * @throws IOException
+     */
+    public static LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) {
+        return _instance.readWorkerHeartbeatImpl(conf, workerId);
+    }
+
+    public  LSWorkerHeartbeat readWorkerHeartbeatImpl(Map conf, String workerId) {
+        try {
+            LocalState localState = ConfigUtils.workerState(conf, workerId);
+            return localState.getWorkerHeartBeat();
+        } catch (Exception e) {
+            LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e);
+            return null;
+        }
+    }
+
+    public static boolean  isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map conf) {
+        return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
+    }
+
+    public  boolean  isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map conf) {
+        boolean result = false;
+        if ((now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
+            result = true;
+        }
+        return result;
+    }
+
+    public static String javaCmd(String cmd) {
+        return _instance.javaCmdImpl(cmd);
+    }
+
+    public String javaCmdImpl(String cmd) {
+        String ret = null;
+        String javaHome = System.getenv().get("JAVA_HOME");
+        if (StringUtils.isNotBlank(javaHome)) {
+            ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
+        } else {
+            ret = cmd;
+        }
+        return ret;
+    }
+    
+    public static List<ACL> supervisorZkAcls() {
+        List<ACL> acls = new ArrayList<>();
+        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
+        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
+        return acls;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
index af454b9..4ef6d1c 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@ -17,14 +17,10 @@
  */
 package org.apache.storm.daemon.supervisor;
 
-import clojure.lang.IFn;
-import clojure.lang.RT;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.storm.Config;
-import org.apache.storm.ProcessSimulator;
-import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.container.cgroup.CgroupManager;
 import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LocalAssignment;
@@ -33,6 +29,7 @@ import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -52,9 +49,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
 
     private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
 
-    private final LocalState localState;
-
-    private IStormClusterState stormClusterState;
+    private  LocalState localState;
 
     private SupervisorData supervisorData;
 
@@ -80,15 +75,21 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         }
     }
 
+    public SyncProcessEvent(){
+
+    }
+
     public SyncProcessEvent(SupervisorData supervisorData) {
+        init(supervisorData);
+    }
 
+    //TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java
+    public void init(SupervisorData supervisorData){
         this.supervisorData = supervisorData;
-
         this.localState = supervisorData.getLocalState();
-
-        this.stormClusterState = supervisorData.getStormClusterState();
     }
 
+
     /**
      * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file -
      * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new
@@ -101,12 +102,13 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         try {
             Map conf = supervisorData.getConf();
             Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
+
             if (assignedExecutors == null) {
                 assignedExecutors = new HashMap<>();
             }
             int now = Time.currentTimeSecs();
 
-            Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(assignedExecutors, now);
+            Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now);
 
             Set<String> keeperWorkerIds = new HashSet<>();
             Set<Integer> keepPorts = new HashSet<>();
@@ -171,16 +173,17 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         }
     }
 
-    Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
+    protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
         Map<Integer, LocalAssignment> reassignExecutors = new HashMap<>();
+        reassignExecutors.putAll(assignExecutors);
         for (Integer port : keepPorts) {
-            if (assignExecutors.containsKey(port)) {
-                reassignExecutors.put(port, assignExecutors.get(port));
-            }
+            reassignExecutors.remove(port);
         }
         return reassignExecutors;
     }
 
+
+
     /**
      * Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead
      * 
@@ -188,11 +191,11 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
      * @return
      * @throws Exception
      */
-    public Map<String, StateHeartbeat> getLocalWorkerStats(Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception {
+    public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception {
         Map<String, StateHeartbeat> workerIdHbstate = new HashMap<>();
         Map conf = supervisorData.getConf();
         LocalState localState = supervisorData.getLocalState();
-        Map<String, LSWorkerHeartbeat> idToHeartbeat = readWorkerHeartbeats(conf);
+        Map<String, LSWorkerHeartbeat> idToHeartbeat = SupervisorUtils.readWorkerHeartbeats(conf);
         Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
         Set<String> approvedIds = new HashSet<>();
         if (approvedWorkers != null) {
@@ -209,12 +212,12 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
             } else if (supervisorData.getDeadWorkers().contains(workerId)) {
                 LOG.info("Worker Process {}as died", workerId);
                 state = State.timedOut;
-            } else if ((now - whb.get_time_secs()) > (Integer) (conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
+            } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) {
                 state = State.timedOut;
             } else {
                 state = State.valid;
             }
-            LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb.toString(), now);
+            LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now);
             workerIdHbstate.put(workerId, new StateHeartbeat(state, whb));
         }
         return workerIdHbstate;
@@ -222,7 +225,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
 
     protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) {
         LocalAssignment localAssignment = assignedExecutors.get(whb.get_port());
-        if (localAssignment == null || localAssignment.get_topology_id() != whb.get_topology_id()) {
+        if (localAssignment == null || !localAssignment.get_topology_id().equals(whb.get_topology_id())) {
             return false;
         }
         List<ExecutorInfo> executorInfos = new ArrayList<>();
@@ -230,61 +233,34 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         // remove SYSTEM_EXECUTOR_ID
         executorInfos.remove(new ExecutorInfo(-1, -1));
         List<ExecutorInfo> localExecuorInfos = localAssignment.get_executors();
-        if (executorInfos != localExecuorInfos)
-            return false;
-        return true;
-    }
-
-    /**
-     * Returns map from worr id to heartbeat
-     * 
-     * @param conf
-     * @return
-     * @throws Exception
-     */
-    protected Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception {
-        Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
 
-        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
-
-        for (String workerId : workerIds) {
-            LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId);
-            // ATTENTION: whb can be null
-            workerHeartbeats.put(workerId, whb);
-        }
-        return workerHeartbeats;
-    }
+        if (localExecuorInfos.size() != executorInfos.size())
+            return false;
 
-    /**
-     * get worker heartbeat by workerId
-     * 
-     * @param conf
-     * @param workerId
-     * @return
-     * @throws IOException
-     */
-    protected LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) {
-        try {
-            LocalState localState = ConfigUtils.workerState(conf, workerId);
-            return localState.getWorkerHeartBeat();
-        } catch (Exception e) {
-            LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e);
-            return null;
+        for (ExecutorInfo executorInfo : localExecuorInfos){
+            if (!localExecuorInfos.contains(executorInfo))
+                return false;
         }
+        return true;
     }
 
     /**
      * launch a worker in local mode. But it may exist question???
      */
-    protected void launchLocalWorker(String stormId, Integer port, String workerId, WorkerResources resources) throws IOException {
+    protected void launchLocalWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException {
         // port this function after porting worker to java
     }
 
     protected String getWorkerClassPath(String stormJar, Map stormConf) {
         List<String> topoClasspath = new ArrayList<>();
         Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
-        if (object != null) {
+
+        if (object instanceof List) {
             topoClasspath.addAll((List<String>) object);
+        } else if (object instanceof String){
+            topoClasspath.add((String)object);
+        }else {
+            //ignore
         }
         String classPath = Utils.workerClasspath();
         String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
@@ -300,54 +276,46 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
      * @param port
      * @param memOnheap
      */
-    public List<String> substituteChildopts(Object value, String workerId, String stormId, Integer port, int memOnheap) {
+    public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
         List<String> rets = new ArrayList<>();
         if (value instanceof String) {
             String string = (String) value;
-            string.replace("%ID%", String.valueOf(port));
-            string.replace("%WORKER-ID%", workerId);
-            string.replace("%TOPOLOGY-ID%", stormId);
-            string.replace("%WORKER-PORT%", String.valueOf(port));
-            string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
+            string = string.replace("%ID%", String.valueOf(port));
+            string = string.replace("%WORKER-ID%", workerId);
+            string = string.replace("%TOPOLOGY-ID%", stormId);
+            string = string.replace("%WORKER-PORT%", String.valueOf(port));
+            string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
             String[] strings = string.split("\\s+");
             rets.addAll(Arrays.asList(strings));
         } else if (value instanceof List) {
-            List<String> strings = (List<String>) value;
-            for (String str : strings) {
-                str.replace("%ID%", String.valueOf(port));
-                str.replace("%WORKER-ID%", workerId);
-                str.replace("%TOPOLOGY-ID%", stormId);
-                str.replace("%WORKER-PORT%", String.valueOf(port));
-                str.replace("%HEAP-MEM%", String.valueOf(memOnheap));
+            List<Object> objects = (List<Object>) value;
+            for (Object object : objects) {
+                String str = (String)object;
+                str = str.replace("%ID%", String.valueOf(port));
+                str = str.replace("%WORKER-ID%", workerId);
+                str = str.replace("%TOPOLOGY-ID%", stormId);
+                str = str.replace("%WORKER-PORT%", String.valueOf(port));
+                str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap));
                 rets.add(str);
             }
         }
         return rets;
     }
 
-    private String jvmCmd(String cmd) {
-        String ret = null;
-        String javaHome = System.getProperty("JAVA_HOME");
-        if (StringUtils.isNotBlank(javaHome)) {
-            ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
-        } else {
-            ret = cmd;
-        }
-        return ret;
-    }
+
 
     /**
      * launch a worker in distributed mode
-     *
+     * supervisorId for testing
      * @throws IOException
      */
-    protected void launchDistributeWorker(String stormId, Integer port, String workerId, WorkerResources resources) throws IOException {
+    protected void launchDistributeWorker(Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId,
+            WorkerResources resources, CgroupManager cgroupManager, ConcurrentHashSet deadWorkers) throws IOException {
 
-        Map conf = supervisorData.getConf();
         Boolean runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
-        String stormHome = System.getProperty("storm.home");
-        String stormOptions = System.getProperty("storm.options");
-        String stormConfFile = System.getProperty("storm.conf.file");
+        String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+        String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
+        String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
         String stormLogDir = ConfigUtils.getLogDir();
         String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
 
@@ -384,7 +352,8 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         if (resources.get_mem_on_heap() > 0) {
             memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
         } else {
-            memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB));
+            //set the default heap memory size for supervisor-test
+            memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
         }
 
         int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
@@ -425,16 +394,16 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         List<String> workerProfilerChildopts = null;
         if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
             workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
+        }else {
+            workerProfilerChildopts = new ArrayList<>();
         }
 
-        Map<String, String> environment = new HashMap<String, String>();
-        Map<String, String> topEnvironment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
-        if (topEnvironment != null) {
-            environment.putAll(topEnvironment);
-            environment.put("LD_LIBRARY_PATH", jlp);
-        } else {
-            environment.put("LD_LIBRARY_PATH", jlp);
+        Map<String, String> topEnvironment = new HashMap<String, String>();
+        Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+        if (environment != null) {
+            topEnvironment.putAll(environment);
         }
+        topEnvironment.put("LD_LIBRARY_PATH", jlp);
 
         String log4jConfigurationFile = null;
         if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
@@ -444,10 +413,8 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         }
         log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
 
-        StringBuilder commandSB = new StringBuilder();
-
         List<String> commandList = new ArrayList<>();
-        commandList.add(jvmCmd("java"));
+        commandList.add(SupervisorUtils.javaCmd("java"));
         commandList.add("-cp");
         commandList.add(workerClassPath);
         commandList.addAll(topoWorkerLogwriterChildopts);
@@ -462,7 +429,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
         commandList.add("org.apache.storm.LogWriter");
 
-        commandList.add(jvmCmd("java"));
+        commandList.add(SupervisorUtils.javaCmd("java"));
         commandList.add("-server");
         commandList.addAll(workerChildopts);
         commandList.addAll(topWorkerChildopts);
@@ -476,7 +443,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         commandList.add("-Dstorm.options=" + stormOptions);
         commandList.add("-Dstorm.log.dir=" + stormLogDir);
         commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
-        commandList.add(" -Dlog4j.configurationFile=" + log4jConfigurationFile);
+        commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
         commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
         commandList.add("-Dstorm.id=" + stormId);
         commandList.add("-Dworker.id=" + workerId);
@@ -485,7 +452,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         commandList.add(workerClassPath);
         commandList.add("org.apache.storm.daemon.worker");
         commandList.add(stormId);
-        commandList.add(supervisorData.getAssignmentId());
+        commandList.add(assignmentId);
         commandList.add(String.valueOf(port));
         commandList.add(workerId);
 
@@ -497,27 +464,29 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
             Map<String, Number> map = new HashMap<>();
             map.put("cpu", cpuValue);
             map.put("memory", memoryValue);
-            supervisorData.getResourceIsolationManager().reserveResourcesForWorker(workerId, map);
-            commandList = supervisorData.getResourceIsolationManager().getLaunchCommand(workerId, commandList);
+            cgroupManager.reserveResourcesForWorker(workerId, map);
+            commandList = cgroupManager.getLaunchCommand(workerId, commandList);
         }
 
-        LOG.info("Launching worker with command: ", Utils.shellCmd(commandList));
+        LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
         writeLogMetadata(stormConf, user, workerId, stormId, port, conf);
         ConfigUtils.setWorkerUserWSE(conf, workerId, user);
         createArtifactsLink(conf, stormId, port, workerId);
 
         String logPrefix = "Worker Process " + workerId;
         String workerDir = ConfigUtils.workerRoot(conf, workerId);
-        supervisorData.getDeadWorkers().remove(workerId);
+
+        if (deadWorkers != null)
+            deadWorkers.remove(workerId);
         createBlobstoreLinks(conf, stormId, workerId);
 
         ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix, workerId);
         if (runWorkerAsUser) {
-            List<String> stringList = new ArrayList<>();
-            stringList.add("worker");
-            stringList.add(workerDir);
-            stringList.add(Utils.writeScript(workerDir, commandList, topEnvironment));
-            SupervisorUtils.workerLauncher(conf, user, stringList, null, logPrefix, processExitCallback, new File(workerDir));
+            List<String> args = new ArrayList<>();
+            args.add("worker");
+            args.add(workerDir);
+            args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
+            SupervisorUtils.workerLauncher(conf, user, args, null, logPrefix, processExitCallback, new File(workerDir));
         } else {
             Utils.launchProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
         }
@@ -536,6 +505,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
 
         Map<String, Integer> newValidWorkerIds = new HashMap<>();
         Map conf = supervisorData.getConf();
+        String supervisorId = supervisorData.getSupervisorId();
         String clusterMode = ConfigUtils.clusterMode(conf);
 
         for (Map.Entry<Integer, LocalAssignment> entry : reassignExecutors.entrySet()) {
@@ -550,17 +520,20 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
                 String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId);
                 String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId);
 
+                LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port,
+                        workerId);
+
                 FileUtils.forceMkdir(new File(pidsPath));
                 FileUtils.forceMkdir(new File(hbPath));
 
                 if (clusterMode.endsWith("distributed")) {
-                    launchDistributeWorker(stormId, port, workerId, resources);
+                    launchDistributeWorker(conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources,
+                            supervisorData.getResourceIsolationManager(), supervisorData.getDeadWorkers());
                 } else if (clusterMode.endsWith("local")) {
-                    launchLocalWorker(stormId, port, workerId, resources);
+                    launchLocalWorker(supervisorData, stormId, port.longValue(), workerId, resources);
                 }
                 newValidWorkerIds.put(workerId, port);
-                LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port,
-                        workerId);
+
             } else {
                 LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", assignment,
                         supervisorData.getSupervisorId(), port, workerId);
@@ -570,26 +543,39 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
         return newValidWorkerIds;
     }
 
-    protected void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, int port, Map conf) throws IOException {
+    public void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, Long port, Map conf) throws IOException {
         Map data = new HashMap();
         data.put(Config.TOPOLOGY_SUBMITTER_USER, user);
         data.put("worker-id", workerId);
 
         Set<String> logsGroups = new HashSet<>();
+        //for supervisor-test
         if (stormconf.get(Config.LOGS_GROUPS) != null) {
-            logsGroups.addAll((List<String>) stormconf.get(Config.LOGS_GROUPS));
+            List<String> groups = (List<String>) stormconf.get(Config.LOGS_GROUPS);
+            for (String group : groups){
+                logsGroups.add(group);
+            }
         }
         if (stormconf.get(Config.TOPOLOGY_GROUPS) != null) {
-            logsGroups.addAll((List<String>) stormconf.get(Config.TOPOLOGY_GROUPS));
+            List<String> topGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS);
+            for (String group : topGroups){
+                logsGroups.add(group);
+            }
         }
         data.put(Config.LOGS_GROUPS, logsGroups.toArray());
 
         Set<String> logsUsers = new HashSet<>();
         if (stormconf.get(Config.LOGS_USERS) != null) {
-            logsUsers.addAll((List<String>) stormconf.get(Config.LOGS_USERS));
+            List<String> logUsers = (List<String>) stormconf.get(Config.LOGS_USERS);
+            for (String logUser : logUsers){
+                logsUsers.add(logUser);
+            }
         }
         if (stormconf.get(Config.TOPOLOGY_USERS) != null) {
-            logsUsers.addAll((List<String>) stormconf.get(Config.TOPOLOGY_USERS));
+            List<String> topUsers = (List<String>) stormconf.get(Config.TOPOLOGY_USERS);
+            for (String logUser : topUsers){
+                logsUsers.add(logUser);
+            }
         }
         data.put(Config.LOGS_USERS, logsUsers.toArray());
         writeLogMetadataToYamlFile(stormId, port, data, conf);
@@ -604,19 +590,25 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
      * @param conf
      * @throws IOException
      */
-    protected void writeLogMetadataToYamlFile(String stormId, int port, Map data, Map conf) throws IOException {
-        File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port);
+    public void writeLogMetadataToYamlFile(String stormId, Long port, Map data, Map conf) throws IOException {
+        File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port.intValue());
+
         if (!Utils.checkFileExists(file.getParent())) {
             if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
                 FileUtils.forceMkdir(file.getParentFile());
                 SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), file.getParentFile().getCanonicalPath());
             } else {
-                file.getParentFile().mkdir();
+                file.getParentFile().mkdirs();
             }
         }
         FileWriter writer = new FileWriter(file);
         Yaml yaml = new Yaml();
-        yaml.dump(data, writer);
+        try {
+            yaml.dump(data, writer);
+        }finally {
+            writer.close();
+        }
+
     }
 
     /**
@@ -627,7 +619,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
      * @param port
      * @param workerId
      */
-    protected void createArtifactsLink(Map conf, String stormId, int port, String workerId) throws IOException {
+    protected void createArtifactsLink(Map conf, String stormId, Long port, String workerId) throws IOException {
         String workerDir = ConfigUtils.workerRoot(conf, workerId);
         String topoDir = ConfigUtils.workerArtifactsRoot(conf, stormId);
         if (Utils.checkFileExists(workerDir)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
index d6dc45e..2de9203 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
@@ -88,6 +88,7 @@ public class SyncSupervisorEvent implements Runnable {
             Map<Integer, LocalAssignment> allAssignment =
                     readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry());
 
+
             Map<Integer, LocalAssignment> newAssignment = new HashMap<>();
             Set<String> assignedStormIds = new HashSet<>();
 
@@ -97,6 +98,7 @@ public class SyncSupervisorEvent implements Runnable {
                     assignedStormIds.add(entry.getValue().get_topology_id());
                 }
             }
+
             Set<String> srashStormIds = verifyDownloadedFiles(conf, supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds);
             Set<String> downloadedStormIds = new HashSet<>();
             downloadedStormIds.addAll(allDownloadedTopologyIds);
@@ -312,6 +314,7 @@ public class SyncSupervisorEvent implements Runnable {
         }
 
         FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+
         SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
         ClassLoader classloader = Thread.currentThread().getContextClassLoader();
 
@@ -350,7 +353,7 @@ public class SyncSupervisorEvent implements Runnable {
         String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
         String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
         ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(conf);
-
+        FileUtils.forceMkdir(new File(tmproot));
         if (Utils.isOnWindows()) {
             if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
                 throw new RuntimeException("ERROR: Windows doesn't implement setting the correct permissions");
@@ -358,7 +361,6 @@ public class SyncSupervisorEvent implements Runnable {
         } else {
             Utils.restrictPermissions(tmproot);
         }
-        FileUtils.forceMkdir(new File(tmproot));
         String stormJarKey = ConfigUtils.masterStormJarKey(stormId);
         String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
         String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
@@ -549,7 +551,7 @@ public class SyncSupervisorEvent implements Runnable {
             for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) {
                 if (entry.getValue().get_node().equals(assignmentId)) {
                     for (Long port : entry.getValue().get_port()) {
-                        LocalAssignment localAssignment = portTasks.get(port);
+                        LocalAssignment localAssignment = portTasks.get(port.intValue());
                         if (localAssignment == null) {
                             List<ExecutorInfo> executors = new ArrayList<ExecutorInfo>();
                             localAssignment = new LocalAssignment(stormId, executors);
@@ -577,8 +579,7 @@ public class SyncSupervisorEvent implements Runnable {
             assignedExecutors = new HashMap<>();
         }
         int now = Time.currentTimeSecs();
-        SyncProcessEvent syncProcesses = new SyncProcessEvent(supervisorData);
-        Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(assignedExecutors, now);
+        Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now);
         LOG.debug("Allocated workers ", assignedExecutors);
         for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()){
             String workerId = entry.getKey();

http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
index 2d73327..91044cc 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@ -41,7 +41,6 @@ public class RunProfilerActions implements Runnable {
     private Map conf;
     private IStormClusterState stormClusterState;
     private String hostName;
-    private String stormHome;
 
     private String profileCmd;
 
@@ -79,7 +78,6 @@ public class RunProfilerActions implements Runnable {
         this.conf = supervisorData.getConf();
         this.stormClusterState = supervisorData.getStormClusterState();
         this.hostName = supervisorData.getHostName();
-        this.stormHome = System.getProperty("storm.home");
         this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND));
         this.supervisorData = supervisorData;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index d41ca87..e158dbc 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -22,6 +22,7 @@ import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.supervisor.SupervisorData;
 import org.apache.storm.generated.SupervisorInfo;
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -53,13 +54,16 @@ public class SupervisorHeartbeat implements Runnable {
         List<Long> usedPorts = new ArrayList<>();
         usedPorts.addAll(supervisorData.getCurrAssignment().keySet());
         supervisorInfo.set_used_ports(usedPorts);
+        List metaDatas = (List)supervisorData.getiSupervisor().getMetadata();
         List<Long> portList = new ArrayList<>();
-        Object metas = supervisorData.getiSupervisor().getMetadata();
-        if (metas != null) {
-            for (Integer port : (List<Integer>) metas) {
-                portList.add(port.longValue());
+        if (metaDatas != null){
+            for (Object data : metaDatas){
+                Integer port = Utils.getInt(data);
+                if (port != null)
+                    portList.add(port.longValue());
             }
         }
+
         supervisorInfo.set_meta(portList);
         supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
         supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime());

http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java
new file mode 100644
index 0000000..d33dc9c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.testing.staticmocking;
+
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+
+public class MockedSupervisorUtils implements AutoCloseable {
+
+    public MockedSupervisorUtils(SupervisorUtils inst) {
+        SupervisorUtils.setInstance(inst);
+    }
+
+    @Override
+    public void close() throws Exception {
+        SupervisorUtils.resetInstance();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 1ba3de7..4e3dbb4 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -215,7 +215,7 @@ public class Utils {
         try {
             T ret = (T) c.newInstance();
             TDeserializer des = getDes();
-            des.deserialize((TBase)ret, b, offset, length);
+            des.deserialize((TBase) ret, b, offset, length);
             return ret;
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -1700,7 +1700,7 @@ public class Utils {
         if(map == null) {
             return null;
         }
-        return findOne(pred, (Set<T>)map.entrySet());
+        return findOne(pred, (Set<T>) map.entrySet());
     }
 
     public static String localHostname () throws UnknownHostException {

http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/test/clj/org/apache/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/logviewer_test.clj b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
index 4889c8e..d06c11c 100644
--- a/storm-core/test/clj/org/apache/storm/logviewer_test.clj
+++ b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
@@ -15,8 +15,7 @@
 ;; limitations under the License.
 (ns org.apache.storm.logviewer-test
   (:use [org.apache.storm config util])
-  (:require [org.apache.storm.daemon [logviewer :as logviewer]
-                                   [supervisor :as supervisor]])
+  (:require [org.apache.storm.daemon [logviewer :as logviewer]])
   (:require [conjure.core])
   (:use [clojure test])
   (:use [conjure core])
@@ -24,7 +23,10 @@
         [org.apache.storm.ui helpers])
   (:import [org.apache.storm.daemon DirectoryCleaner]
            [org.apache.storm.utils Utils Time]
-           [org.apache.storm.utils.staticmocking UtilsInstaller])
+           [org.apache.storm.utils.staticmocking UtilsInstaller]
+           [org.apache.storm.daemon.supervisor SupervisorUtils]
+           [org.apache.storm.testing.staticmocking MockedSupervisorUtils]
+           [org.apache.storm.generated LSWorkerHeartbeat])
   (:import [java.nio.file Files Path DirectoryStream])
   (:import [java.nio.file Files])
   (:import [java.nio.file.attribute FileAttribute])
@@ -236,25 +238,33 @@
           mock-metaFile (mk-mock-File {:name "worker.yaml"
                                        :type :file})
           exp-id "id12345"
-          expected {exp-id port1-dir}]
-      (stubbing [supervisor/read-worker-heartbeats nil
-                 logviewer/get-metadata-file-for-wroker-logdir mock-metaFile
-                 logviewer/get-worker-id-from-metadata-file exp-id]
-        (is (= expected (logviewer/identify-worker-log-dirs [port1-dir])))))))
+          expected {exp-id port1-dir}
+          supervisor-util (Mockito/mock SupervisorUtils)]
+      (with-open [_ (MockedSupervisorUtils. supervisor-util)]
+        (stubbing [logviewer/get-metadata-file-for-wroker-logdir mock-metaFile
+                   logviewer/get-worker-id-from-metadata-file exp-id]
+          (. (Mockito/when (.readWorkerHeartbeatsImpl supervisor-util (Mockito/any))) (thenReturn nil))
+          (is (= expected (logviewer/identify-worker-log-dirs [port1-dir]))))))))
+
+
 
 (deftest test-get-dead-worker-dirs
   (testing "removes any files of workers that are still alive"
     (let [conf {SUPERVISOR-WORKER-TIMEOUT-SECS 5}
-          id->hb {"42" {:time-secs 1}}
+          hb (let[lwb (LSWorkerHeartbeat.)]
+                   (.set_time_secs lwb (int 1)) lwb)
+          id->hb {"42" hb}
           now-secs 2
           unexpected-dir (mk-mock-File {:name "dir1" :type :directory})
           expected-dir (mk-mock-File {:name "dir2" :type :directory})
-          log-dirs #{unexpected-dir expected-dir}]
+          log-dirs #{unexpected-dir expected-dir}
+          supervisor-util (Mockito/mock SupervisorUtils)]
+      (with-open [_ (MockedSupervisorUtils. supervisor-util)]
       (stubbing [logviewer/identify-worker-log-dirs {"42" unexpected-dir,
-                                                     "007" expected-dir}
-                 supervisor/read-worker-heartbeats id->hb]
+                                                     "007" expected-dir}]
+        (. (Mockito/when (.readWorkerHeartbeatsImpl supervisor-util (Mockito/any))) (thenReturn id->hb))
         (is (= #{expected-dir}
-              (logviewer/get-dead-worker-dirs conf now-secs log-dirs)))))))
+              (logviewer/get-dead-worker-dirs conf now-secs log-dirs))))))))
 
 (deftest test-cleanup-fn
   (testing "cleanup function forceDeletes files of dead workers"