You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:25 UTC

[02/35] storm git commit: port Supervisor to java

port Supervisor to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/08934e29
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/08934e29
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/08934e29

Branch: refs/heads/master
Commit: 08934e29982d3936c9e247a8d7bac563053f869f
Parents: 73312ad
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Feb 26 12:38:23 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Feb 26 12:38:23 2016 +0800

----------------------------------------------------------------------
 .../storm/daemon/supervisor/DaemonCommon.java   |  22 +
 .../DefaultUncaughtExceptionHandler.java        |  31 +
 .../supervisor/EventManagerPushCallback.java    |  37 +
 .../daemon/supervisor/RunProfilerActions.java   | 221 ++++++
 .../storm/daemon/supervisor/ShutdownWork.java   | 125 ++++
 .../daemon/supervisor/StandaloneSupervisor.java |  82 +++
 .../apache/storm/daemon/supervisor/State.java   |  22 +
 .../storm/daemon/supervisor/StateHeartbeat.java |  45 ++
 .../daemon/supervisor/SupervisorDaemon.java     |  28 +
 .../storm/daemon/supervisor/SupervisorData.java | 340 ++++++++++
 .../daemon/supervisor/SupervisorHeartbeat.java  |  84 +++
 .../daemon/supervisor/SupervisorManger.java     | 101 +++
 .../daemon/supervisor/SupervisorServer.java     | 212 ++++++
 .../daemon/supervisor/SupervisorUtils.java      | 173 +++++
 .../daemon/supervisor/SyncProcessEvent.java     | 674 +++++++++++++++++++
 .../daemon/supervisor/SyncSupervisorEvent.java  | 592 ++++++++++++++++
 .../storm/daemon/supervisor/UpdateBlobs.java    | 103 +++
 17 files changed, 2892 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java
new file mode 100644
index 0000000..3b7a18e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+public interface DaemonCommon { // NOTE(review): what "waiting" means is defined by implementors outside this file
+    boolean isWaiting();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java
new file mode 100644
index 0000000..8785f86
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Logs any uncaught throwable, then calls {@code Utils.exitProcess} with code 20. */
+public class DefaultUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultUncaughtExceptionHandler.class);
+    @Override public void uncaughtException(Thread thread, Throwable cause) {
+        LOG.error("Error when processing event", cause);
+        Utils.exitProcess(20, "Error when processing an event");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java
new file mode 100644
index 0000000..177bf67
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.event.EventManager;
+
+/** Runnable that hands a wrapped callback to an {@link EventManager} each time it runs. */
+public class EventManagerPushCallback implements Runnable {
+    private final Runnable cb;
+    private final EventManager eventManager;
+
+    /** Stores the callback and the event manager it will be added to. */
+    public EventManagerPushCallback(Runnable cb, EventManager eventManager) {
+        this.cb = cb;
+        this.eventManager = eventManager;
+    }
+    /** Adds the stored callback to the event manager. */
+    @Override
+    public void run() {
+        eventManager.add(cb);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java
new file mode 100644
index 0000000..209c067
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+
+public class RunProfilerActions implements Runnable {
+    private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class);
+
+    private Map conf;
+    private IStormClusterState stormClusterState;
+    private String hostName;
+    private String stormHome;
+
+    private String profileCmd;
+
+    private SupervisorData supervisorData;
+
+    /** Exit-code callback for one launched profiler action, bound to the request that triggered it. */
+    private class ActionExitCallback implements Utils.ExitCodeCallable {
+        private final String stormId;
+        private final ProfileRequest profileRequest;
+        private final String logPrefix;
+        public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) {
+            this.stormId = stormId;
+            this.profileRequest = profileRequest;
+            this.logPrefix = logPrefix;
+        }
+        @Override
+        public Object call() throws Exception {
+            return null;
+        }
+
+        /** Logs the exit code, then deletes the request from cluster state; a failed delete is logged, not thrown. */
+        @Override
+        public Object call(int exitCode) {
+            LOG.info("{} profile-action exited for code: {}", logPrefix, exitCode);
+            try {
+                stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
+            } catch (Exception e) {
+                LOG.warn("failed delete profileRequest: {}", profileRequest, e);
+            }
+            return null;
+        }
+    }
+
+    /** Keeps supervisorData and caches its conf, cluster state and host name plus the profiler command. */
+    public RunProfilerActions(SupervisorData supervisorData) {
+        this.supervisorData = supervisorData;
+        this.conf = supervisorData.getConf();
+        this.stormClusterState = supervisorData.getStormClusterState();
+        this.hostName = supervisorData.getHostName();
+        this.stormHome = System.getProperty("storm.home");
+        this.profileCmd = (String) conf.get(Config.WORKER_PROFILER_COMMAND);
+    }
+    /**
+     * Launches the profiler command for every pending profile request addressed to this host.
+     * For each request the worker pid is read from the worker's pid file and passed to the
+     * command built by mkCommand. A request whose pid file cannot be read is skipped; every
+     * failure is logged together with its cause.
+     */
+    @Override
+    public void run() {
+        Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions();
+        try {
+            for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
+                String stormId = entry.getKey();
+                List<ProfileRequest> requests = entry.getValue();
+                if (requests != null) {
+                    for (ProfileRequest profileRequest : requests) {
+                        if (profileRequest.get_nodeInfo().get_node().equals(hostName)) {
+                            // stop becomes true once the request's time stamp lies in the past
+                            boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp();
+                            // only the first port listed in the request's node info is used
+                            Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
+                            String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
+                            Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+                            // an absent submitter user stays null; an absent environment becomes an empty map
+                            String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+                            Map<String, String> env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+                            if (env == null) {
+                                env = new HashMap<String, String>();
+                            }
+
+                            String pidPath = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
+                            StringBuilder pidText = new StringBuilder();
+                            // try-with-resources closes the reader on success as well as on failure
+                            try (BufferedReader br = new BufferedReader(new FileReader(pidPath))) {
+                                int c;
+                                while ((c = br.read()) >= 0) {
+                                    // the cast matters: append(int) would add the decimal code, not the character
+                                    pidText.append((char) c);
+                                }
+                            } catch (IOException e) {
+                                // no pid means no target process for the command; skip this request
+                                LOG.warn("Unable to read worker pid file {}, skipping profile request", pidPath, e);
+                                continue;
+                            }
+                            String workerPid = pidText.toString().trim();
+                            ProfileAction profileAction = profileRequest.get_action();
+                            String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
+
+                            // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
+                            // The profiler plugin script validates if JVM is recording before starting another recording.
+                            String command = mkCommand(profileAction, stop, workerPid, targetDir);
+                            List<String> listCommand = new ArrayList<>();
+                            if (command != null) {
+                                listCommand.addAll(Arrays.asList(command.split(" ")));
+                            }
+                            try {
+                                ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix);
+                                launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix);
+                            } catch (IOException | RuntimeException e) {
+                                // passing e as the last argument makes the logger record the stack trace
+                                LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port, e);
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Error running profiler actions, will retry again later", e);
+        }
+    }
+
+    /** Launches the commands directly, or through the worker launcher when SUPERVISOR_RUN_WORKER_AS_USER is set. */
+    private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
+            final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
+        File targetFile = new File(targetDir);
+        boolean runAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+        if (!runAsUser) {
+            Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
+            return;
+        }
+        LOG.info("Running as user:{} command:{}", user, commands);
+        // delete any container/script file already present for targetDir before a new script is written
+        String staleContainer = Utils.containerFilePath(targetDir);
+        if (Utils.checkFileExists(staleContainer)) {
+            SupervisorUtils.rmrAsUser(conf, staleContainer, staleContainer);
+        }
+        String staleScript = Utils.scriptFilePath(targetDir);
+        if (Utils.checkFileExists(staleScript)) {
+            SupervisorUtils.rmrAsUser(conf, staleScript, staleScript);
+        }
+        String script = Utils.writeScript(targetDir, commands, environment);
+        List<String> launcherArgs = new ArrayList<>(Arrays.asList("profiler", targetDir, script));
+        SupervisorUtils.workerLauncher(conf, user, launcherArgs, environment, logPrefix, exitCodeCallable, targetFile);
+    }
+
+    /** Picks the command line for a profile action; returns null when the action has no command. */
+    private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
+        if (action == null) {
+            return null; // a switch would throw on null; no action means no command
+        }
+        switch (action) {
+            case JMAP_DUMP: return jmapDumpCmd(workerPid, targetDir);
+            case JSTACK_DUMP: return jstackDumpCmd(workerPid, targetDir);
+            case JPROFILE_DUMP: return jprofileDump(workerPid, targetDir);
+            case JVM_RESTART: return jprofileJvmRestart(workerPid);
+            // a JPROFILE_STOP request yields the "start" command while stop is false
+            // and the "stop" command once stop is true
+            case JPROFILE_STOP: return stop ? jprofileStop(workerPid, targetDir) : jprofileStart(workerPid);
+            default: return null;
+        }
+    }
+
+    // Each builder below returns "<profileCmd> <pid> <sub-command>[ <targetDir>]" as one space-separated string.
+    private String jmapDumpCmd(String pid, String targetDir) {
+        return profileCmd + " " + pid + " jmap " + targetDir;
+    }
+
+    private String jstackDumpCmd(String pid, String targetDir) {
+        return profileCmd + " " + pid + " jstack " + targetDir;
+    }
+
+    private String jprofileStart(String pid) {
+        return profileCmd + " " + pid + " start";
+    }
+
+    private String jprofileStop(String pid, String targetDir) {
+        return profileCmd + " " + pid + " stop " + targetDir;
+    }
+
+    private String jprofileDump(String pid, String targetDir) {
+        return profileCmd + " " + pid + " dump " + targetDir;
+    }
+    private String jprofileJvmRestart(String pid) {
+        return profileCmd + " " + pid + " kill";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
new file mode 100644
index 0000000..674454b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+public abstract class ShutdownWork implements Shutdownable {
+
+    private static Logger LOG = LoggerFactory.getLogger(ShutdownWork.class);
+
+    /** Stops every process recorded for the worker: signal 15, a pause, signal 9, then file cleanup. */
+    public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException {
+        LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), workerId);
+        Map conf = supervisorData.getConf();
+        Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
+        Integer shutdownSleepSecs = (Integer) conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS);
+        Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+        String user = ConfigUtils.getWorkerUser(conf, workerId);
+        String threadPid = supervisorData.getWorkerThreadPidsAtom().get(workerId);
+        if (StringUtils.isNotBlank(threadPid)) {
+            ProcessSimulator.killProcess(threadPid);
+        }
+
+        // first pass: signal 15 via the worker launcher when running as user, else Utils.killProcessWithSigTerm
+        for (String pid : pids) {
+            if (asUser) {
+                List<String> commands = new ArrayList<>();
+                commands.add("signal");
+                commands.add(pid);
+                commands.add("15");
+                String logPrefix = "kill - 15 " + pid;
+                SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
+            } else {
+                Utils.killProcessWithSigTerm(pid);
+            }
+        }
+        // pause only when at least one pid was signalled, so cleanup threads on the worker can run
+        if (pids.size() > 0) {
+            LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs);
+            Time.sleepSecs(shutdownSleepSecs);
+        }
+        // second pass: signal 9 (or Utils.forceKillProcess), then remove the pid file of each process
+        for (String pid : pids) {
+            if (asUser) {
+                List<String> commands = new ArrayList<>();
+                commands.add("signal");
+                commands.add(pid);
+                commands.add("9");
+                String logPrefix = "kill - 9 " + pid;
+                SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
+            } else {
+                Utils.forceKillProcess(pid);
+            }
+            String path = ConfigUtils.workerPidPath(conf, workerId, pid);
+            if (asUser) {
+                SupervisorUtils.rmrAsUser(conf, workerId, path);
+            } else {
+                try {
+                    LOG.debug("Removing path {}", path);
+                    new File(path).delete();
+                } catch (Exception e) {
+                    // on Windows, the supervisor may still hold the lock on the worker directory
+                    // ignore
+                }
+            }
+        }
+        tryCleanupWorker(conf, supervisorData, workerId);
+        LOG.info("Shut down {}:{}", supervisorData.getSupervisorId(), workerId);
+    }
+
+    /** Removes the worker's local state; IO and runtime failures are logged with their cause, never thrown. */
+    protected void tryCleanupWorker(Map conf, SupervisorData supervisorData, String workerId) {
+        try {
+            String workerRoot = ConfigUtils.workerRoot(conf, workerId);
+            if (Utils.checkFileExists(workerRoot)) {
+                if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+                    SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
+                } else {
+                    Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
+                    Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
+                    Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
+                }
+                ConfigUtils.removeWorkerUserWSE(conf, workerId);
+                supervisorData.getDeadWorkers().remove(workerId);
+            }
+            // test the flag's value, not its presence: an explicit "false" must not touch the isolation manager
+            if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
+                supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId);
+            }
+        } catch (IOException | RuntimeException e) {
+            LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+        }
+    }
+
+    @Override
+    public void shutdown() { // empty body: this abstract class adds no shutdown behaviour of its own
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
new file mode 100644
index 0000000..da54b88
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
+/** {@link ISupervisor} whose id is read from, or generated into, the local state directory. */
+public class StandaloneSupervisor implements ISupervisor {
+    private String supervisorId;
+    private Map conf;
+    /** Loads the supervisor id from local state, generating and storing a random UUID when none exists. */
+    @Override
+    public void prepare(Map stormConf, String schedulerLocalDir) {
+        String id;
+        try {
+            LocalState localState = new LocalState(schedulerLocalDir);
+            id = localState.getSupervisorId();
+            if (id == null) {
+                id = UUID.randomUUID().toString();
+                localState.setSupervisorId(id);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        this.conf = stormConf;
+        this.supervisorId = id;
+    }
+
+    @Override
+    public String getSupervisorId() {
+        return supervisorId;
+    }
+
+    /** The assignment id is the supervisor id. */
+    @Override
+    public String getAssignmentId() {
+        return supervisorId;
+    }
+
+    /** Returns the raw {@code Config.SUPERVISOR_SLOTS_PORTS} value (a vector); callers convert it to ints. */
+    @Override
+    public Object getMetadata() {
+        return conf.get(Config.SUPERVISOR_SLOTS_PORTS);
+    }
+
+    @Override
+    public boolean confirmAssigned(int port) {
+        return true;
+    }
+
+    @Override
+    public void killedWorker(int port) {
+        // no-op
+    }
+
+    @Override
+    public void assigned(Collection<Integer> ports) {
+        // no-op
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
new file mode 100644
index 0000000..1913c91
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+public enum State { // the state half of a StateHeartbeat (paired there with an LSWorkerHeartbeat)
+    valid, disallowed, notStarted, timedOut;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
new file mode 100644
index 0000000..cca3fa2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+
+/** Value holder pairing a {@link State} with an {@link LSWorkerHeartbeat}. */
+public class StateHeartbeat {
+    private final State state;
+    private final LSWorkerHeartbeat hb;
+
+    public StateHeartbeat(State state, LSWorkerHeartbeat hb) {
+        this.state = state;
+        this.hb = hb;
+    }
+
+    public State getState() {
+        return state;
+    }
+    public LSWorkerHeartbeat getHeartbeat() {
+        return hb;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java
new file mode 100644
index 0000000..115c7c6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.Map;
+
+/**
+ * Operations exposed by a supervisor daemon instance.
+ */
+public interface SupervisorDaemon {
+    /**
+     * @return the identifier of this supervisor
+     */
+    String getId();
+
+    /**
+     * @return the configuration map this supervisor is using
+     */
+    Map getConf();
+
+    /**
+     * Shuts down the workers belonging to this supervisor.
+     */
+    void shutdownAllWorkers();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
new file mode 100644
index 0000000..9eec253
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Holder for the state a supervisor shares between its timers and event handlers: the configuration,
+ * cluster-state and local-state handles, identifiers, timers, and the concurrent collections that
+ * track assignments, profile requests and dead workers.
+ */
+public class SupervisorData {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorData.class);
+
+    private Map conf;
+    private IContext sharedContext;
+    // volatile: written by setActive and read by isActive without any other synchronization
+    private volatile boolean active;
+    private ISupervisor iSupervisor;
+    private Utils.UptimeComputer upTime;
+    private String stormVersion;
+
+    private ConcurrentHashMap<String, String> workerThreadPidsAtom; // for local mode
+
+    private IStormClusterState stormClusterState;
+
+    private LocalState localState;
+
+    private String supervisorId;
+
+    private String assignmentId;
+
+    private String hostName;
+
+    // used for reporting used ports when heartbeating
+    private ConcurrentHashMap<Long, LocalAssignment> currAssignment;
+
+    private StormTimer heartbeatTimer;
+
+    private StormTimer eventTimer;
+
+    private StormTimer blobUpdateTimer;
+
+    private Localizer localizer;
+
+    private ConcurrentHashMap<String, Map<String, Object>> assignmentVersions;
+
+    private AtomicInteger syncRetry;
+
+    // Shared monitor object handed out through getDownloadLock().
+    private final Object downloadLock = new Object();
+
+    private ConcurrentHashMap<String, List<ProfileRequest>> stormIdToProfileActions;
+
+    // null unless the resource isolation plugin is enabled in the configuration
+    private CgroupManager resourceIsolationManager;
+
+    private ConcurrentHashSet<String> deadWorkers;
+
+    /**
+     * Builds the shared supervisor state: connects to the cluster state, opens the local state and
+     * localizer, resolves the host name, creates the three timers and, when enabled in {@code conf},
+     * instantiates and prepares the resource isolation plugin.
+     *
+     * @param conf the supervisor configuration
+     * @param sharedContext messaging context stored as given (may be {@code null})
+     * @param iSupervisor supplies the supervisor id and assignment id
+     * @throws RuntimeException wrapping any failure to create the cluster state, local state,
+     *         localizer, host name or resource isolation plugin
+     */
+    public SupervisorData(Map conf, IContext sharedContext, ISupervisor iSupervisor) {
+        this.conf = conf;
+        this.sharedContext = sharedContext;
+        this.iSupervisor = iSupervisor;
+        this.active = true;
+        this.upTime = Utils.makeUptimeComputer();
+        this.stormVersion = VersionInfo.getVersion();
+        this.workerThreadPidsAtom = new ConcurrentHashMap<>();
+        this.deadWorkers = new ConcurrentHashSet<>();
+
+        // ACLs are only supplied when ZooKeeper authentication is configured; otherwise null is passed on.
+        List<ACL> acls = null;
+        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
+            acls = new ArrayList<>();
+            acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
+            // Permissions are bit flags, so they are combined with bitwise OR rather than XOR.
+            acls.add(new ACL((ZooDefs.Perms.READ | ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
+        }
+        try {
+            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR));
+        } catch (Exception e) {
+            // Log the cause as well; the message alone does not say why the connection failed.
+            LOG.error("supervisor can't create stormClusterState", e);
+            throw Utils.wrapInRuntime(e);
+        }
+
+        try {
+            this.localState = ConfigUtils.supervisorState(conf);
+            this.localizer = Utils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
+        } catch (IOException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+        this.supervisorId = iSupervisor.getSupervisorId();
+        this.assignmentId = iSupervisor.getAssignmentId();
+
+        try {
+            this.hostName = Utils.hostname(conf);
+        } catch (UnknownHostException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+
+        this.currAssignment = new ConcurrentHashMap<>();
+
+        this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+
+        this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+
+        this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
+
+        this.assignmentVersions = new ConcurrentHashMap<>();
+        this.syncRetry = new AtomicInteger(0);
+        this.stormIdToProfileActions = new ConcurrentHashMap<>();
+        if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
+            try {
+                this.resourceIsolationManager = (CgroupManager) Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
+                this.resourceIsolationManager.prepare(conf);
+                LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
+            } catch (IOException e) {
+                throw Utils.wrapInRuntime(e);
+            }
+        } else {
+            this.resourceIsolationManager = null;
+        }
+    }
+
+    public ConcurrentHashMap<String, List<ProfileRequest>> getStormIdToProfileActions() {
+        return stormIdToProfileActions;
+    }
+
+    /**
+     * Replaces the contents of the existing map in place (clear, then putAll); the map instance
+     * returned by the getter stays the same. The two steps are not atomic as a pair.
+     */
+    public void setStormIdToProfileActions(Map<String, List<ProfileRequest>> stormIdToProfileActions) {
+        this.stormIdToProfileActions.clear();
+        this.stormIdToProfileActions.putAll(stormIdToProfileActions);
+    }
+
+    public Map getConf() {
+        return conf;
+    }
+
+    public void setConf(Map conf) {
+        this.conf = conf;
+    }
+
+    public IContext getSharedContext() {
+        return sharedContext;
+    }
+
+    public void setSharedContext(IContext sharedContext) {
+        this.sharedContext = sharedContext;
+    }
+
+    public boolean isActive() {
+        return active;
+    }
+
+    public void setActive(boolean active) {
+        this.active = active;
+    }
+
+    public ISupervisor getiSupervisor() {
+        return iSupervisor;
+    }
+
+    public void setiSupervisor(ISupervisor iSupervisor) {
+        this.iSupervisor = iSupervisor;
+    }
+
+    public Utils.UptimeComputer getUpTime() {
+        return upTime;
+    }
+
+    public void setUpTime(Utils.UptimeComputer upTime) {
+        this.upTime = upTime;
+    }
+
+    public String getStormVersion() {
+        return stormVersion;
+    }
+
+    public void setStormVersion(String stormVersion) {
+        this.stormVersion = stormVersion;
+    }
+
+    public ConcurrentHashMap<String, String> getWorkerThreadPidsAtom() {
+        return workerThreadPidsAtom;
+    }
+
+    public void setWorkerThreadPidsAtom(ConcurrentHashMap<String, String> workerThreadPidsAtom) {
+        this.workerThreadPidsAtom = workerThreadPidsAtom;
+    }
+
+    public IStormClusterState getStormClusterState() {
+        return stormClusterState;
+    }
+
+    public void setStormClusterState(IStormClusterState stormClusterState) {
+        this.stormClusterState = stormClusterState;
+    }
+
+    public LocalState getLocalState() {
+        return localState;
+    }
+
+    public void setLocalState(LocalState localState) {
+        this.localState = localState;
+    }
+
+    public String getSupervisorId() {
+        return supervisorId;
+    }
+
+    public void setSupervisorId(String supervisorId) {
+        this.supervisorId = supervisorId;
+    }
+
+    public String getAssignmentId() {
+        return assignmentId;
+    }
+
+    public void setAssignmentId(String assignmentId) {
+        this.assignmentId = assignmentId;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public void setHostName(String hostName) {
+        this.hostName = hostName;
+    }
+
+    public ConcurrentHashMap<Long, LocalAssignment> getCurrAssignment() {
+        return currAssignment;
+    }
+
+    /**
+     * Replaces the contents of the existing assignment map in place (clear, then putAll).
+     * NOTE(review): a reader running between the two steps sees an empty map - confirm callers tolerate that.
+     */
+    public void setCurrAssignment(Map<Long, LocalAssignment> currAssignment) {
+        this.currAssignment.clear();
+        this.currAssignment.putAll(currAssignment);
+    }
+
+    public StormTimer getHeartbeatTimer() {
+        return heartbeatTimer;
+    }
+
+    public void setHeartbeatTimer(StormTimer heartbeatTimer) {
+        this.heartbeatTimer = heartbeatTimer;
+    }
+
+    public StormTimer getEventTimer() {
+        return eventTimer;
+    }
+
+    public void setEventTimer(StormTimer eventTimer) {
+        this.eventTimer = eventTimer;
+    }
+
+    public StormTimer getBlobUpdateTimer() {
+        return blobUpdateTimer;
+    }
+
+    public void setBlobUpdateTimer(StormTimer blobUpdateTimer) {
+        this.blobUpdateTimer = blobUpdateTimer;
+    }
+
+    public Localizer getLocalizer() {
+        return localizer;
+    }
+
+    public void setLocalizer(Localizer localizer) {
+        this.localizer = localizer;
+    }
+
+    public AtomicInteger getSyncRetry() {
+        return syncRetry;
+    }
+
+    public void setSyncRetry(AtomicInteger syncRetry) {
+        this.syncRetry = syncRetry;
+    }
+
+    public ConcurrentHashMap<String, Map<String, Object>> getAssignmentVersions() {
+        return assignmentVersions;
+    }
+
+    /**
+     * Replaces the contents of the existing versions map in place (clear, then putAll).
+     */
+    public void setAssignmentVersions(Map<String, Map<String, Object>> assignmentVersions) {
+        this.assignmentVersions.clear();
+        this.assignmentVersions.putAll(assignmentVersions);
+    }
+
+    /**
+     * @return the resource isolation manager, or {@code null} when the plugin is not enabled
+     */
+    public CgroupManager getResourceIsolationManager() {
+        return resourceIsolationManager;
+    }
+
+    public void setResourceIsolationManager(CgroupManager resourceIsolationManager) {
+        this.resourceIsolationManager = resourceIsolationManager;
+    }
+
+    /**
+     * @return the single lock object created with this instance
+     */
+    public Object getDownloadLock() {
+        return downloadLock;
+    }
+
+    // The raw type is kept in the two signatures below so existing callers compile unchanged.
+    public ConcurrentHashSet getDeadWorkers() {
+        return deadWorkers;
+    }
+
+    public void setDeadWorkers(ConcurrentHashSet deadWorkers) {
+        this.deadWorkers = deadWorkers;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java
new file mode 100644
index 0000000..399dcd2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Runnable that, on every run, builds a fresh {@link SupervisorInfo} from the current
+ * {@link SupervisorData} and publishes it through the cluster state under this supervisor's id.
+ */
+public class SupervisorHeartbeat implements Runnable {
+
+    private final IStormClusterState stormClusterState;
+    private final String supervisorId;
+    private final Map conf;
+    private final SupervisorData supervisorData;
+
+    /**
+     * @param conf configuration read for scheduler metadata and resource capacities
+     * @param supervisorData shared supervisor state; its cluster state handle and supervisor id are
+     *        captured here, everything else is read on each run
+     */
+    public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
+        this.stormClusterState = supervisorData.getStormClusterState();
+        this.supervisorId = supervisorData.getSupervisorId();
+        this.supervisorData = supervisorData;
+        this.conf = conf;
+    }
+
+    /**
+     * Assembles the heartbeat payload from the current supervisor state.
+     *
+     * @return a newly created, fully populated {@link SupervisorInfo}
+     */
+    private SupervisorInfo buildSupervisorInfo() {
+        SupervisorInfo info = new SupervisorInfo();
+        info.set_time_secs(Time.currentTimeSecs());
+        info.set_hostname(supervisorData.getHostName());
+        info.set_assignment_id(supervisorData.getAssignmentId());
+
+        // Used ports are the keys of the current assignment map.
+        List<Long> usedPorts = new ArrayList<>(supervisorData.getCurrAssignment().keySet());
+        info.set_used_ports(usedPorts);
+
+        List<Long> portList = new ArrayList<>();
+        Object metas = supervisorData.getiSupervisor().getMetadata();
+        if (metas != null) {
+            // Read the entries as Number so Integer and Long ports are both accepted.
+            for (Number port : (List<? extends Number>) metas) {
+                portList.add(port.longValue());
+            }
+        }
+        info.set_meta(portList);
+        info.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
+        info.set_uptime_secs(supervisorData.getUpTime().upTime());
+        info.set_version(supervisorData.getStormVersion());
+        info.set_resources_map(mkSupervisorCapacities(conf));
+        return info;
+    }
+
+    /**
+     * Reads the memory and CPU capacities from the configuration.
+     *
+     * @param conf configuration holding both capacity settings
+     * @return map from each capacity config key to its value as a double
+     * @throws NullPointerException if either setting is absent
+     * @throws ClassCastException if either setting is not numeric
+     */
+    private Map<String, Double> mkSupervisorCapacities(Map conf) {
+        Map<String, Double> ret = new HashMap<>();
+        ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, toDouble(conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB)));
+        ret.put(Config.SUPERVISOR_CPU_CAPACITY, toDouble(conf.get(Config.SUPERVISOR_CPU_CAPACITY)));
+        return ret;
+    }
+
+    /**
+     * Converts a numeric config value to double. A direct {@code (double)} cast on the Object only
+     * works when the value is already a Double and fails for integral values such as 4096.
+     */
+    private static double toDouble(Object value) {
+        return ((Number) value).doubleValue();
+    }
+
+    /**
+     * Builds a new heartbeat and writes it to the cluster state.
+     */
+    @Override
+    public void run() {
+        SupervisorInfo supervisorInfo = buildSupervisorInfo();
+        stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
new file mode 100644
index 0000000..acc2cb8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.event.EventManager;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Handle on a started supervisor: exposes its id and configuration, can shut down its workers, and
+ * shuts the supervisor itself down. Being a {@link Runnable} whose {@link #run()} calls
+ * {@link #shutdown()}, an instance can be used directly as a shutdown task.
+ */
+public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, DaemonCommon, Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class);
+
+    private final EventManager eventManager;
+
+    private final EventManager processesEventManager;
+
+    private final SupervisorData supervisorData;
+
+    /**
+     * @param supervisorData shared supervisor state (timers, cluster state, configuration)
+     * @param eventManager first event manager, closed on shutdown
+     * @param processesEventManager second event manager, closed on shutdown
+     */
+    public SupervisorManger(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) {
+        this.eventManager = eventManager;
+        this.supervisorData = supervisorData;
+        this.processesEventManager = processesEventManager;
+    }
+
+    /**
+     * Marks the supervisor inactive, closes the three timers and both event managers, and
+     * disconnects from the cluster state.
+     *
+     * @throws RuntimeException wrapping any exception raised while closing a timer or event manager
+     */
+    @Override
+    public void shutdown() {
+        LOG.info("Shutting down supervisor {}", supervisorData.getSupervisorId());
+        supervisorData.setActive(false);
+        try {
+            supervisorData.getHeartbeatTimer().close();
+            supervisorData.getEventTimer().close();
+            supervisorData.getBlobUpdateTimer().close();
+            eventManager.close();
+            processesEventManager.close();
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        } finally {
+            // Disconnect even when a close above failed, so the cluster-state connection is not leaked.
+            supervisorData.getStormClusterState().disconnect();
+        }
+    }
+
+    /**
+     * Calls {@code shutWorker} for every worker id reported by
+     * {@link SupervisorUtils#supervisorWorkerIds}; stops at the first failure.
+     *
+     * @throws RuntimeException wrapping the first exception raised while shutting a worker down
+     */
+    @Override
+    public void shutdownAllWorkers() {
+        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
+        try {
+            for (String workerId : workerIds) {
+                shutWorker(supervisorData, workerId);
+            }
+        } catch (Exception e) {
+            // Include the exception so the failure cause reaches the log.
+            LOG.error("shutWorker failed", e);
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    /**
+     * @return the supervisor configuration held by the shared state
+     */
+    @Override
+    public Map getConf() {
+        return supervisorData.getConf();
+    }
+
+    /**
+     * @return the supervisor id held by the shared state
+     */
+    @Override
+    public String getId() {
+        return supervisorData.getSupervisorId();
+    }
+
+    /**
+     * @return {@code true} when the supervisor is inactive, or when the heartbeat timer, the event
+     *         timer and both event managers all report that they are waiting
+     */
+    @Override
+    public boolean isWaiting() {
+        if (!supervisorData.isActive()) {
+            return true;
+        }
+        return supervisorData.getHeartbeatTimer().isTimerWaiting()
+                && supervisorData.getEventTimer().isTimerWaiting()
+                && eventManager.waiting()
+                && processesEventManager.waiting();
+    }
+
+    /**
+     * Runs {@link #shutdown()}.
+     */
+    @Override
+    public void run() {
+        shutdown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
new file mode 100644
index 0000000..f1dfb8a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.command.HealthCheck;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Entry point of the supervisor daemon: wires up the shared {@link SupervisorData}, the heartbeat,
+ * the event managers and the recurring timers, then registers a shutdown hook for the result.
+ */
+public class SupervisorServer extends ShutdownWork {
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorServer.class);
+
+    // One registry shared by gauge registration and the reporters. A gauge registered on a
+    // registry that no reporter was prepared with would never be reported.
+    private final MetricRegistry metricRegistry = new MetricRegistry();
+
+    /**
+     * Creates and starts a supervisor.
+     *
+     * Prepares {@code iSupervisor}, cleans the supervisor tmp directory, sends a first heartbeat and
+     * schedules recurring ones, adds blob references for already downloaded topologies before
+     * starting the localizer cleaner, and, when {@code Config.SUPERVISOR_ENABLE} is true, schedules
+     * the recurring sync, blob update, health check and profiler tasks.
+     *
+     * @param conf the supervisor configuration
+     * @param sharedContext messaging context handed to {@link SupervisorData} (may be {@code null})
+     * @param iSupervisor the supervisor implementation to prepare and use
+     * @return the manager for the started supervisor; {@code null} only if initialization failed and
+     *         {@code Utils.exitProcess} returned
+     * @throws Exception only when the failure was caused by an interruption
+     *         ({@link InterruptedIOException} or {@link InterruptedException}); any other failure is
+     *         logged and passed to {@code Utils.exitProcess} with code 13
+     */
+    private SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
+        SupervisorManger supervisorManger = null;
+        try {
+            LOG.info("Starting Supervisor with conf {}", conf);
+            iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
+            String path = ConfigUtils.supervisorTmpDir(conf);
+            FileUtils.cleanDirectory(new File(path));
+
+            final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor);
+            Localizer localizer = supervisorData.getLocalizer();
+
+            SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData);
+            hb.run();
+            // should synchronize supervisor so it doesn't launch anything after being down (optimization)
+            Integer heartbeatFrequency = (Integer) conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS);
+            supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);
+
+            Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
+            for (String stormId : downloadedStormIds) {
+                SupervisorUtils.addBlobReferences(localizer, stormId, conf);
+            }
+            // do this after adding the references so we don't try to clean things being used
+            localizer.startCleaner();
+
+            EventManagerImp syncSupEventManager = new EventManagerImp(false);
+            EventManagerImp syncProcessManager = new EventManagerImp(false);
+            SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorData);
+            SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager);
+            UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData);
+            RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData);
+
+            if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
+                StormTimer eventTimer = supervisorData.getEventTimer();
+                // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
+                // to date even if callbacks don't all work exactly right
+                eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager));
+
+                eventTimer.scheduleRecurring(0, (Integer) conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS),
+                        new EventManagerPushCallback(syncProcessEvent, syncProcessManager));
+
+                // Blob update thread. Starts with 30 seconds delay, every 30 seconds
+                supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));
+
+                // supervisor health check: shut every worker down when the check returns non-zero
+                eventTimer.scheduleRecurring(300, 300, new Runnable() {
+                    @Override
+                    public void run() {
+                        int healthCode = HealthCheck.healthCheck(conf);
+                        if (healthCode == 0) {
+                            return;
+                        }
+                        // Only list the workers when there is something to shut down.
+                        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
+                        for (String workerId : workerIds) {
+                            try {
+                                shutWorker(supervisorData, workerId);
+                            } catch (Exception e) {
+                                throw Utils.wrapInRuntime(e);
+                            }
+                        }
+                    }
+                });
+
+                // Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
+                eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
+            }
+            supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager);
+        } catch (Throwable t) {
+            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)
+                    || Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
+                throw t;
+            }
+            // Log the throwable itself; without it the process exits with no record of the cause.
+            LOG.error("Error on initialization of server supervisor", t);
+            Utils.exitProcess(13, "Error on initialization");
+        }
+        return supervisorManger;
+    }
+
+    /**
+     * Starts a supervisor in local mode. Refuses to start when the configuration is not in local
+     * mode; any startup failure is logged and terminates the JVM with exit code 1.
+     */
+    public void localLaunch() {
+        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
+        SupervisorManger supervisorManager;
+        try {
+            Map<Object, Object> conf = Utils.readStormConfig();
+            if (!ConfigUtils.isLocalMode(conf)) {
+                throw new IllegalArgumentException("Cannot start server in distribute mode!");
+            }
+            ISupervisor iSupervisor = new StandaloneSupervisor();
+            supervisorManager = mkSupervisor(conf, null, iSupervisor);
+            if (supervisorManager != null) {
+                Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to start supervisor\n", e);
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Starts a supervisor in distributed mode, then registers the worker-count gauge and starts the
+     * metrics reporters. Refuses to start when the configuration is in local mode; any startup
+     * failure is logged and terminates the JVM with exit code 1.
+     */
+    private void distributeLaunch() {
+        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
+        SupervisorManger supervisorManager;
+        try {
+            Map<Object, Object> conf = Utils.readStormConfig();
+            if (ConfigUtils.isLocalMode(conf)) {
+                throw new IllegalArgumentException("Cannot start server in local mode!");
+            }
+            ISupervisor iSupervisor = new StandaloneSupervisor();
+            supervisorManager = mkSupervisor(conf, null, iSupervisor);
+            if (supervisorManager != null) {
+                Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
+            }
+            // The gauge counts entries under the worker root, so it carries a supervisor name
+            // rather than the DRPC request-counter name used previously.
+            registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
+            startMetricsReporters(conf);
+        } catch (Exception e) {
+            LOG.error("Failed to start supervisor\n", e);
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Registers a gauge, under {@code name}, whose value is the number of entries in the worker
+     * root directory. Any gauge previously registered under that name is removed first.
+     */
+    // To be removed
+    private void registerWorkerNumGauge(String name, final Map conf) {
+        metricRegistry.remove(name);
+        metricRegistry.register(name, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                Collection<String> pids = Utils.readDirContents(ConfigUtils.workerRoot(conf));
+                return pids.size();
+            }
+        });
+    }
+
+    /**
+     * Prepares every configured reporter with the shared registry and starts it.
+     */
+    // To be removed
+    private void startMetricsReporters(Map conf) {
+        List<PreparableReporter> preparableReporters = MetricsUtils.getPreparableReporters(conf);
+        for (PreparableReporter reporter : preparableReporters) {
+            reporter.prepare(metricRegistry, conf);
+            reporter.start();
+        }
+        LOG.info("Started statistics report plugin...");
+    }
+
+    /**
+     * Entry point of the supervisor daemon: installs the default uncaught exception handler and
+     * launches a distributed-mode supervisor.
+     *
+     * @param args command-line arguments (unused)
+     */
+    public static void main(String[] args) {
+        Utils.setupDefaultUncaughtExceptionHandler();
+        SupervisorServer instance = new SupervisorServer();
+        instance.distributeLaunch();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
new file mode 100644
index 0000000..ffdb839
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.utils.PathUtils;
+import org.apache.storm.Config;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+public class SupervisorUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
+
+    /**
+     * Starts the worker-launcher executable with the command line
+     * {@code <launcher> <user> <args...>} by delegating to {@code Utils.launchProcess}.
+     * The launcher path is the value of {@code Config.SUPERVISOR_WORKER_LAUNCHER} when that is
+     * not blank; otherwise it is the {@code storm.home} system property followed by
+     * {@code /bin/worker-launcher}.
+     *
+     * @param conf             configuration map consulted for the launcher path
+     * @param user             user name passed as the first launcher argument; must not be blank
+     * @param args             remaining launcher arguments, appended after {@code user}
+     * @param environment      forwarded unchanged to {@code Utils.launchProcess}
+     * @param logPreFix        forwarded unchanged to {@code Utils.launchProcess}
+     * @param exitCodeCallback forwarded unchanged to {@code Utils.launchProcess}
+     * @param dir              forwarded unchanged to {@code Utils.launchProcess}
+     * @return the process returned by {@code Utils.launchProcess}
+     * @throws IllegalArgumentException if {@code user} is blank
+     * @throws IOException              declared for the underlying process launch
+     */
+    public static Process workerLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix,
+            final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
+        if (StringUtils.isBlank(user)) {
+            throw new IllegalArgumentException("User cannot be blank when calling workerLauncher.");
+        }
+        String configuredLauncher = (String) conf.get(Config.SUPERVISOR_WORKER_LAUNCHER);
+        String stormHome = System.getProperty("storm.home");
+        // An explicitly configured launcher wins over the default under storm.home.
+        String launcherPath = StringUtils.isNotBlank(configuredLauncher) ? configuredLauncher : stormHome + "/bin/worker-launcher";
+
+        List<String> commandLine = new ArrayList<>();
+        commandLine.add(launcherPath);
+        commandLine.add(user);
+        commandLine.addAll(args);
+        return Utils.launchProcess(commandLine, environment, logPreFix, exitCodeCallback, dir);
+    }
+
+    /**
+     * Runs the worker-launcher through {@link #workerLauncher} (with no exit-code callback and no
+     * working directory), waits for it to finish and returns its exit code. When {@code logPreFix}
+     * is not blank, the process output stream is handed to {@code Utils.readAndLogStream} first.
+     *
+     * If the waiting thread is interrupted, the interruption is logged and the thread's interrupt
+     * status is restored so that callers can still observe it.
+     *
+     * @param conf        configuration map forwarded to {@link #workerLauncher}
+     * @param user        user name forwarded to {@link #workerLauncher}; must not be blank
+     * @param args        launcher arguments forwarded to {@link #workerLauncher}
+     * @param environment forwarded to {@link #workerLauncher}
+     * @param logPreFix   prefix used for logging; also forwarded to {@link #workerLauncher}
+     * @return the exit value of the launched process
+     * @throws IOException                 declared for the underlying process launch
+     * @throws IllegalThreadStateException from {@link Process#exitValue()} if the wait was
+     *                                     interrupted before the process terminated
+     */
+    public static int workerLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
+            throws IOException {
+        Process process = workerLauncher(conf, user, args, environment, logPreFix, null, null);
+        if (StringUtils.isNotBlank(logPreFix)) {
+            Utils.readAndLogStream(logPreFix, process.getInputStream());
+        }
+        try {
+            process.waitFor();
+        } catch (InterruptedException e) {
+            LOG.info("{} interrupted.", logPreFix);
+            // Catching InterruptedException clears the flag; set it again instead of hiding the interrupt.
+            Thread.currentThread().interrupt();
+        }
+        return process.exitValue();
+    }
+
+    public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException {
+        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            String logPrefix = "setup conf for " + dir;
+            List<String> commands = new ArrayList<>();
+            commands.add("code-dir");
+            commands.add(dir);
+            workerLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+        }
+    }
+
+    public static void rmrAsUser(Map conf, String id, String path) throws IOException {
+        String user = Utils.getFileOwner(path);
+        String logPreFix = "rmr " + id;
+        List<String> commands = new ArrayList<>();
+        commands.add("rmr");
+        commands.add(path);
+        SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPreFix);
+        if (Utils.checkFileExists(path)) {
+            throw new RuntimeException(path + " was not deleted.");
+        }
+    }
+
+    /**
+     * Given the blob information returns the value of the uncompress field, handling it either being a string or a boolean value, or if it's not specified then
+     * returns false
+     * 
+     * @param blobInfo
+     * @return
+     */
+    public static Boolean isShouldUncompressBlob(Map<String, Object> blobInfo) {
+        return new Boolean((String) blobInfo.get("uncompress"));
+    }
+
+    /**
+     * Converts a blobstore map into a list of {@link LocalResource}s: one resource per entry,
+     * built from the entry key and the result of {@link #isShouldUncompressBlob} on the entry value.
+     *
+     * @param blobstoreMap blob key mapped to that blob's settings; may be {@code null}
+     * @return the resulting list, empty when {@code blobstoreMap} is {@code null} or has no entries
+     */
+    public static List<LocalResource> blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) {
+        List<LocalResource> resources = new ArrayList<>();
+        if (blobstoreMap == null) {
+            return resources;
+        }
+        for (Map.Entry<String, Map<String, Object>> blobEntry : blobstoreMap.entrySet()) {
+            resources.add(new LocalResource(blobEntry.getKey(), isShouldUncompressBlob(blobEntry.getValue())));
+        }
+        return resources;
+    }
+
+    /**
+     * Reads the topology configuration with {@code ConfigUtils.readSupervisorStormConf} and, when it
+     * contains a {@code Config.TOPOLOGY_BLOBSTORE_MAP} entry, registers the corresponding
+     * {@link LocalResource}s with the localizer together with the submitter user and topology name.
+     * According to the original author's note, this serves to reconstruct the cache on restart.
+     *
+     * @param localizer localizer that receives the references
+     * @param stormId   id of the topology whose configuration is read
+     * @param conf      supervisor configuration map
+     * @throws IOException declared for reading the topology configuration and adding the references
+     */
+    public static void addBlobReferences(Localizer localizer, String stormId, Map conf) throws IOException {
+        Map topologyConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+        Map<String, Map<String, Object>> blobs = (Map<String, Map<String, Object>>) topologyConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        String submitter = (String) topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        String topologyName = (String) topologyConf.get(Config.TOPOLOGY_NAME);
+        List<LocalResource> resources = SupervisorUtils.blobstoreMapToLocalresources(blobs);
+        if (blobs == null) {
+            // No blobstore map configured for this topology: nothing to reference.
+            return;
+        }
+        localizer.addReferences(resources, submitter, topologyName);
+    }
+
+    /**
+     * Lists the entries of the directory returned by {@code ConfigUtils.supervisorStormDistRoot(conf)}
+     * and returns their URL-decoded names as a set of storm ids.
+     *
+     * Names are decoded explicitly as UTF-8 rather than with the platform default charset, so the
+     * result does not depend on the JVM's locale settings.
+     *
+     * @param conf supervisor configuration map
+     * @return the decoded entry names; empty when the directory listing is empty
+     * @throws IOException declared for resolving the directory; also covers the
+     *                     {@code UnsupportedEncodingException} declared by the decoder
+     */
+    public static Set<String> readDownLoadedStormIds(Map conf) throws IOException {
+        Set<String> stormIds = new HashSet<>();
+        String distRoot = ConfigUtils.supervisorStormDistRoot(conf);
+        for (String encodedId : Utils.readDirContents(distRoot)) {
+            stormIds.add(URLDecoder.decode(encodedId, "UTF-8"));
+        }
+        return stormIds;
+    }
+
+    /**
+     * Returns the entries that {@code Utils.readDirContents} finds in the directory given by
+     * {@code ConfigUtils.workerRoot(conf)}.
+     *
+     * @param conf supervisor configuration map
+     * @return the directory entry names, as returned by {@code Utils.readDirContents}
+     */
+    public static Collection<String> supervisorWorkerIds(Map conf) {
+        return Utils.readDirContents(ConfigUtils.workerRoot(conf));
+    }
+
+    public static boolean checkTopoFilesExist(Map conf, String stormId) throws IOException {
+        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+        String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
+        String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
+        String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot);
+        if (!Utils.checkFileExists(stormroot))
+            return false;
+        if (!Utils.checkFileExists(stormcodepath))
+            return false;
+        if (!Utils.checkFileExists(stormconfpath))
+            return false;
+        if (!ConfigUtils.isLocalMode(conf) && !Utils.checkFileExists(stormjarpath))
+            return false;
+        return true;
+    }
+
+}