You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:26 UTC

[03/35] storm git commit: update supervisor's structure

update supervisor's structure


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b281c735
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b281c735
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b281c735

Branch: refs/heads/master
Commit: b281c735f0089d24407af67586a1b41de45ac382
Parents: 08934e2
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Feb 26 13:15:56 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Feb 26 13:15:56 2016 +0800

----------------------------------------------------------------------
 .../daemon/supervisor/RunProfilerActions.java   | 221 ------------------
 .../daemon/supervisor/SupervisorHeartbeat.java  |  84 -------
 .../daemon/supervisor/SupervisorServer.java     |  23 +-
 .../storm/daemon/supervisor/UpdateBlobs.java    | 103 ---------
 .../supervisor/timer/RunProfilerActions.java    | 223 +++++++++++++++++++
 .../supervisor/timer/SupervisorHealthCheck.java |  57 +++++
 .../supervisor/timer/SupervisorHeartbeat.java   |  85 +++++++
 .../daemon/supervisor/timer/UpdateBlobs.java    | 105 +++++++++
 8 files changed, 476 insertions(+), 425 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java
deleted file mode 100644
index 209c067..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.storm.Config;
-import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.generated.ProfileAction;
-import org.apache.storm.generated.ProfileRequest;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.*;
-
-public class RunProfilerActions implements Runnable {
-    private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class);
-
-    private Map conf;
-    private IStormClusterState stormClusterState;
-    private String hostName;
-    private String stormHome;
-
-    private String profileCmd;
-
-    private SupervisorData supervisorData;
-
-    private class ActionExitCallback implements Utils.ExitCodeCallable {
-        private String stormId;
-        private ProfileRequest profileRequest;
-        private String logPrefix;
-
-        public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) {
-            this.stormId = stormId;
-            this.profileRequest = profileRequest;
-            this.logPrefix = logPrefix;
-        }
-
-        @Override
-        public Object call() throws Exception {
-            return null;
-        }
-
-        @Override
-        public Object call(int exitCode) {
-            LOG.info("{} profile-action exited for {}", logPrefix, exitCode);
-            try {
-                stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
-            } catch (Exception e) {
-                LOG.warn("failed delete profileRequest: " + profileRequest);
-            }
-            return null;
-        }
-    }
-
-    public RunProfilerActions(SupervisorData supervisorData) {
-        this.conf = supervisorData.getConf();
-        this.stormClusterState = supervisorData.getStormClusterState();
-        this.hostName = supervisorData.getHostName();
-        this.stormHome = System.getProperty("storm.home");
-        this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND));
-        this.supervisorData = supervisorData;
-    }
-
-    @Override
-    public void run() {
-        Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions();
-        try {
-            for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
-                String stormId = entry.getKey();
-                List<ProfileRequest> requests = entry.getValue();
-                if (requests != null) {
-                    for (ProfileRequest profileRequest : requests) {
-                        if (profileRequest.get_nodeInfo().get_node().equals(hostName)) {
-                            boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false;
-                            Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
-                            String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
-                            Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-
-                            String user = null;
-                            if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
-                                user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
-                            }
-                            Map<String, String> env = null;
-                            if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) {
-                                env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
-                            } else {
-                                env = new HashMap<String, String>();
-                            }
-
-                            String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
-                            StringBuilder stringBuilder = new StringBuilder();
-                            FileReader reader = null;
-                            BufferedReader br = null;
-                            try {
-                                reader = new FileReader(str);
-                                br = new BufferedReader(reader);
-                                int c;
-                                while ((c = br.read()) >= 0) {
-                                    stringBuilder.append(c);
-                                }
-                            } catch (IOException e) {
-                                if (reader != null)
-                                    reader.close();
-                                if (br != null)
-                                    br.close();
-                            }
-                            String workerPid = stringBuilder.toString().trim();
-                            ProfileAction profileAction = profileRequest.get_action();
-                            String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
-
-                            // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
-                            // The profiler plugin script validates if JVM is recording before starting another recording.
-                            String command = mkCommand(profileAction, stop, workerPid, targetDir);
-                            List<String> listCommand = new ArrayList<>();
-                            if (command != null) {
-                                listCommand.addAll(Arrays.asList(command.split(" ")));
-                            }
-                            try {
-                                ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix);
-                                launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix);
-                            } catch (IOException e) {
-                                LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
-                            } catch (RuntimeException e) {
-                                LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
-                            }
-                        }
-                    }
-                }
-            }
-        } catch (Exception e) {
-            LOG.error("Error running profiler actions, will retry again later");
-        }
-    }
-
-    private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
-            final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
-        File targetFile = new File(targetDir);
-        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
-            LOG.info("Running as user:{} command:{}", user, commands);
-            String containerFile = Utils.containerFilePath(targetDir);
-            if (Utils.checkFileExists(containerFile)) {
-                SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
-            }
-            String scriptFile = Utils.scriptFilePath(targetDir);
-            if (Utils.checkFileExists(scriptFile)) {
-                SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
-            }
-            String script = Utils.writeScript(targetDir, commands, environment);
-            List<String> newCommands = new ArrayList<>();
-            newCommands.add("profiler");
-            newCommands.add(targetDir);
-            newCommands.add(script);
-            SupervisorUtils.workerLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile);
-        } else {
-            Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
-        }
-    }
-
-    private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
-        if (action == ProfileAction.JMAP_DUMP) {
-            return jmapDumpCmd(workerPid, targetDir);
-        } else if (action == ProfileAction.JSTACK_DUMP) {
-            return jstackDumpCmd(workerPid, targetDir);
-        } else if (action == ProfileAction.JPROFILE_DUMP) {
-            return jprofileDump(workerPid, targetDir);
-        } else if (action == ProfileAction.JVM_RESTART) {
-            return jprofileJvmRestart(workerPid);
-        } else if (!stop && action == ProfileAction.JPROFILE_STOP) {
-            return jprofileStart(workerPid);
-        } else if (stop && action == ProfileAction.JPROFILE_STOP) {
-            return jprofileStop(workerPid, targetDir);
-        }
-        return null;
-    }
-
-    private String jmapDumpCmd(String pid, String targetDir) {
-        return profileCmd + " " + pid + " jmap " + targetDir;
-    }
-
-    private String jstackDumpCmd(String pid, String targetDir) {
-        return profileCmd + " " + pid + " jstack " + targetDir;
-    }
-
-    private String jprofileStart(String pid) {
-        return profileCmd + " " + pid + " start";
-    }
-
-    private String jprofileStop(String pid, String targetDir) {
-        return profileCmd + " " + pid + " stop " + targetDir;
-    }
-
-    private String jprofileDump(String pid, String targetDir) {
-        return profileCmd + " " + pid + " dump " + targetDir;
-    }
-
-    private String jprofileJvmRestart(String pid) {
-        return profileCmd + " " + pid + " kill";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java
deleted file mode 100644
index 399dcd2..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.storm.Config;
-import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.generated.SupervisorInfo;
-import org.apache.storm.utils.Time;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class SupervisorHeartbeat implements Runnable {
-
-    private IStormClusterState stormClusterState;
-    private String supervisorId;
-    private Map conf;
-    private SupervisorInfo supervisorInfo;
-
-    private SupervisorData supervisorData;
-
-    public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
-        this.stormClusterState = supervisorData.getStormClusterState();
-        this.supervisorId = supervisorData.getSupervisorId();
-        this.supervisorData = supervisorData;
-        this.conf = conf;
-    }
-
-    private SupervisorInfo update(Map conf, SupervisorData supervisorData) {
-        supervisorInfo = new SupervisorInfo();
-        supervisorInfo.set_time_secs(Time.currentTimeSecs());
-        supervisorInfo.set_hostname(supervisorData.getHostName());
-        supervisorInfo.set_assignment_id(supervisorData.getAssignmentId());
-
-        List<Long> usedPorts = new ArrayList<>();
-        usedPorts.addAll(supervisorData.getCurrAssignment().keySet());
-        supervisorInfo.set_used_ports(usedPorts);
-        List<Long> portList = new ArrayList<>();
-        Object metas = supervisorData.getiSupervisor().getMetadata();
-        if (metas != null) {
-            for (Integer port : (List<Integer>) metas) {
-                portList.add(port.longValue());
-            }
-        }
-        supervisorInfo.set_meta(portList);
-        supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
-        supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime());
-        supervisorInfo.set_version(supervisorData.getStormVersion());
-        supervisorInfo.set_resources_map(mkSupervisorCapacities(conf));
-        return supervisorInfo;
-    }
-
-    private Map<String, Double> mkSupervisorCapacities(Map conf) {
-        Map<String, Double> ret = new HashMap<String, Double>();
-        Double mem = (double) (conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB));
-        ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
-        Double cpu = (double) (conf.get(Config.SUPERVISOR_CPU_CAPACITY));
-        ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
-        return ret;
-    }
-
-    @Override
-    public void run() {
-        SupervisorInfo supervisorInfo = update(conf, supervisorData);
-        stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
index f1dfb8a..fd31631 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
@@ -25,6 +25,10 @@ import org.apache.storm.StormTimer;
 import org.apache.storm.command.HealthCheck;
 import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
 import org.apache.storm.event.EventManagerImp;
 import org.apache.storm.localizer.Localizer;
 import org.apache.storm.messaging.IContext;
@@ -42,7 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class SupervisorServer extends ShutdownWork {
+public class SupervisorServer {
     private static Logger LOG = LoggerFactory.getLogger(SupervisorServer.class);
 
     /**
@@ -98,22 +102,7 @@ public class SupervisorServer extends ShutdownWork {
                 supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));
 
                 // supervisor health check
-                eventTimer.scheduleRecurring(300, 300, new Runnable() {
-                    @Override
-                    public void run() {
-                        int healthCode = HealthCheck.healthCheck(conf);
-                        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
-                        if (healthCode != 0) {
-                            for (String workerId : workerIds) {
-                                try {
-                                    shutWorker(supervisorData, workerId);
-                                } catch (Exception e) {
-                                    throw Utils.wrapInRuntime(e);
-                                }
-                            }
-                        }
-                    }
-                });
+                eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData));
 
                 // Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
                 eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));

http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java
deleted file mode 100644
index 90dccae..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.storm.Config;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.LocalAssignment;
-import org.apache.storm.localizer.LocalResource;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.NimbusLeaderNotFoundException;
-import org.apache.storm.utils.Utils;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The
- * Runnable is intended to be run periodically by a timer, created elsewhere.
- */
-public class UpdateBlobs implements Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);
-
-    private SupervisorData supervisorData;
-
-    public UpdateBlobs(SupervisorData supervisorData) {
-        this.supervisorData = supervisorData;
-    }
-
-    @Override
-    public void run() {
-        try {
-            Map conf = supervisorData.getConf();
-            Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
-            ConcurrentHashMap<Long, LocalAssignment> newAssignment = supervisorData.getCurrAssignment();
-            Set<String> assignedStormIds = new HashSet<>();
-            for (LocalAssignment localAssignment : newAssignment.values()) {
-                assignedStormIds.add(localAssignment.get_topology_id());
-            }
-            for (String stormId : downloadedStormIds) {
-                if (assignedStormIds.contains(stormId)) {
-                    String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
-                    LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
-                    updateBlobsForTopology(conf, stormId, supervisorData.getLocalizer());
-                }
-            }
-        } catch (Exception e) {
-            if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
-                LOG.error("Network error while updating blobs, will retry again later", e);
-            } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
-                LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
-            } else {
-                throw Utils.wrapInRuntime(e);
-            }
-        }
-    }
-
-    /**
-     * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded.
-     * 
-     * @param conf
-     * @param stormId
-     * @param localizer
-     * @throws IOException
-     */
-    private void updateBlobsForTopology(Map conf, String stormId, Localizer localizer) throws IOException {
-        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
-        try {
-            localizer.updateBlobs(localresources, user);
-        } catch (AuthorizationException authExp) {
-            LOG.error("AuthorizationException error", authExp);
-        } catch (KeyNotFoundException knf) {
-            LOG.error("KeyNotFoundException error", knf);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
new file mode 100644
index 0000000..2d73327
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+
+public class RunProfilerActions implements Runnable {
+    private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class);
+
+    private Map conf;
+    private IStormClusterState stormClusterState;
+    private String hostName;
+    private String stormHome;
+
+    private String profileCmd;
+
+    private SupervisorData supervisorData;
+
+    private class ActionExitCallback implements Utils.ExitCodeCallable {
+        private String stormId;
+        private ProfileRequest profileRequest;
+        private String logPrefix;
+
+        public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) {
+            this.stormId = stormId;
+            this.profileRequest = profileRequest;
+            this.logPrefix = logPrefix;
+        }
+
+        @Override
+        public Object call() throws Exception {
+            return null;
+        }
+
+        @Override
+        public Object call(int exitCode) {
+            LOG.info("{} profile-action exited for {}", logPrefix, exitCode);
+            try {
+                stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
+            } catch (Exception e) {
+                LOG.warn("failed delete profileRequest: " + profileRequest);
+            }
+            return null;
+        }
+    }
+
+    public RunProfilerActions(SupervisorData supervisorData) {
+        this.conf = supervisorData.getConf();
+        this.stormClusterState = supervisorData.getStormClusterState();
+        this.hostName = supervisorData.getHostName();
+        this.stormHome = System.getProperty("storm.home");
+        this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND));
+        this.supervisorData = supervisorData;
+    }
+
+    @Override
+    public void run() {
+        Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions();
+        try {
+            for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
+                String stormId = entry.getKey();
+                List<ProfileRequest> requests = entry.getValue();
+                if (requests != null) {
+                    for (ProfileRequest profileRequest : requests) {
+                        if (profileRequest.get_nodeInfo().get_node().equals(hostName)) {
+                            boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false;
+                            Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
+                            String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
+                            Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+                            String user = null;
+                            if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
+                                user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
+                            }
+                            Map<String, String> env = null;
+                            if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) {
+                                env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+                            } else {
+                                env = new HashMap<String, String>();
+                            }
+
+                            String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
+                            StringBuilder stringBuilder = new StringBuilder();
+                            FileReader reader = null;
+                            BufferedReader br = null;
+                            try {
+                                reader = new FileReader(str);
+                                br = new BufferedReader(reader);
+                                int c;
+                                while ((c = br.read()) >= 0) {
+                                    stringBuilder.append(c);
+                                }
+                            } catch (IOException e) {
+                                if (reader != null)
+                                    reader.close();
+                                if (br != null)
+                                    br.close();
+                            }
+                            String workerPid = stringBuilder.toString().trim();
+                            ProfileAction profileAction = profileRequest.get_action();
+                            String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
+
+                            // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
+                            // The profiler plugin script validates if JVM is recording before starting another recording.
+                            String command = mkCommand(profileAction, stop, workerPid, targetDir);
+                            List<String> listCommand = new ArrayList<>();
+                            if (command != null) {
+                                listCommand.addAll(Arrays.asList(command.split(" ")));
+                            }
+                            try {
+                                ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix);
+                                launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix);
+                            } catch (IOException e) {
+                                LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
+                            } catch (RuntimeException e) {
+                                LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Error running profiler actions, will retry again later");
+        }
+    }
+
+    private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
+            final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
+        File targetFile = new File(targetDir);
+        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            LOG.info("Running as user:{} command:{}", user, commands);
+            String containerFile = Utils.containerFilePath(targetDir);
+            if (Utils.checkFileExists(containerFile)) {
+                SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
+            }
+            String scriptFile = Utils.scriptFilePath(targetDir);
+            if (Utils.checkFileExists(scriptFile)) {
+                SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
+            }
+            String script = Utils.writeScript(targetDir, commands, environment);
+            List<String> newCommands = new ArrayList<>();
+            newCommands.add("profiler");
+            newCommands.add(targetDir);
+            newCommands.add(script);
+            SupervisorUtils.workerLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile);
+        } else {
+            Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
+        }
+    }
+
+    private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
+        if (action == ProfileAction.JMAP_DUMP) {
+            return jmapDumpCmd(workerPid, targetDir);
+        } else if (action == ProfileAction.JSTACK_DUMP) {
+            return jstackDumpCmd(workerPid, targetDir);
+        } else if (action == ProfileAction.JPROFILE_DUMP) {
+            return jprofileDump(workerPid, targetDir);
+        } else if (action == ProfileAction.JVM_RESTART) {
+            return jprofileJvmRestart(workerPid);
+        } else if (!stop && action == ProfileAction.JPROFILE_STOP) {
+            return jprofileStart(workerPid);
+        } else if (stop && action == ProfileAction.JPROFILE_STOP) {
+            return jprofileStop(workerPid, targetDir);
+        }
+        return null;
+    }
+
+    private String jmapDumpCmd(String pid, String targetDir) {
+        return profileCmd + " " + pid + " jmap " + targetDir;
+    }
+
+    private String jstackDumpCmd(String pid, String targetDir) {
+        return profileCmd + " " + pid + " jstack " + targetDir;
+    }
+
+    private String jprofileStart(String pid) {
+        return profileCmd + " " + pid + " start";
+    }
+
+    private String jprofileStop(String pid, String targetDir) {
+        return profileCmd + " " + pid + " stop " + targetDir;
+    }
+
+    private String jprofileDump(String pid, String targetDir) {
+        return profileCmd + " " + pid + " dump " + targetDir;
+    }
+
+    private String jprofileJvmRestart(String pid) {
+        return profileCmd + " " + pid + " kill";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
new file mode 100644
index 0000000..36ee6b6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.command.HealthCheck;
+import org.apache.storm.daemon.supervisor.ShutdownWork;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class SupervisorHealthCheck extends ShutdownWork implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class);
+
+    private SupervisorData supervisorData;
+
+    public SupervisorHealthCheck(SupervisorData supervisorData) {
+        this.supervisorData = supervisorData;
+    }
+
+    @Override
+    public void run() {
+        Map conf = supervisorData.getConf();
+        int healthCode = HealthCheck.healthCheck(conf);
+        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
+        if (healthCode != 0) {
+            for (String workerId : workerIds) {
+                try {
+                    shutWorker(supervisorData, workerId);
+                } catch (Exception e) {
+                    throw Utils.wrapInRuntime(e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
new file mode 100644
index 0000000..d41ca87
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SupervisorHeartbeat implements Runnable {
+
+    private IStormClusterState stormClusterState;
+    private String supervisorId;
+    private Map conf;
+    private SupervisorInfo supervisorInfo;
+
+    private SupervisorData supervisorData;
+
+    public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
+        this.stormClusterState = supervisorData.getStormClusterState();
+        this.supervisorId = supervisorData.getSupervisorId();
+        this.supervisorData = supervisorData;
+        this.conf = conf;
+    }
+
+    private SupervisorInfo update(Map conf, SupervisorData supervisorData) {
+        supervisorInfo = new SupervisorInfo();
+        supervisorInfo.set_time_secs(Time.currentTimeSecs());
+        supervisorInfo.set_hostname(supervisorData.getHostName());
+        supervisorInfo.set_assignment_id(supervisorData.getAssignmentId());
+
+        List<Long> usedPorts = new ArrayList<>();
+        usedPorts.addAll(supervisorData.getCurrAssignment().keySet());
+        supervisorInfo.set_used_ports(usedPorts);
+        List<Long> portList = new ArrayList<>();
+        Object metas = supervisorData.getiSupervisor().getMetadata();
+        if (metas != null) {
+            for (Integer port : (List<Integer>) metas) {
+                portList.add(port.longValue());
+            }
+        }
+        supervisorInfo.set_meta(portList);
+        supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
+        supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime());
+        supervisorInfo.set_version(supervisorData.getStormVersion());
+        supervisorInfo.set_resources_map(mkSupervisorCapacities(conf));
+        return supervisorInfo;
+    }
+
+    private Map<String, Double> mkSupervisorCapacities(Map conf) {
+        Map<String, Double> ret = new HashMap<String, Double>();
+        Double mem = (double) (conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB));
+        ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
+        Double cpu = (double) (conf.get(Config.SUPERVISOR_CPU_CAPACITY));
+        ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
+        return ret;
+    }
+
+    @Override
+    public void run() {
+        SupervisorInfo supervisorInfo = update(conf, supervisorData);
+        stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
new file mode 100644
index 0000000..623afa5
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusLeaderNotFoundException;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The
+ * Runnable is intended to be run periodically by a timer, created elsewhere.
+ */
+public class UpdateBlobs implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);
+
+    private SupervisorData supervisorData;
+
+    public UpdateBlobs(SupervisorData supervisorData) {
+        this.supervisorData = supervisorData;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Map conf = supervisorData.getConf();
+            Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
+            ConcurrentHashMap<Long, LocalAssignment> newAssignment = supervisorData.getCurrAssignment();
+            Set<String> assignedStormIds = new HashSet<>();
+            for (LocalAssignment localAssignment : newAssignment.values()) {
+                assignedStormIds.add(localAssignment.get_topology_id());
+            }
+            for (String stormId : downloadedStormIds) {
+                if (assignedStormIds.contains(stormId)) {
+                    String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+                    LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
+                    updateBlobsForTopology(conf, stormId, supervisorData.getLocalizer());
+                }
+            }
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
+                LOG.error("Network error while updating blobs, will retry again later", e);
+            } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
+                LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
+            } else {
+                throw Utils.wrapInRuntime(e);
+            }
+        }
+    }
+
+    /**
+     * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded.
+     * 
+     * @param conf
+     * @param stormId
+     * @param localizer
+     * @throws IOException
+     */
+    private void updateBlobsForTopology(Map conf, String stormId, Localizer localizer) throws IOException {
+        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+        try {
+            localizer.updateBlobs(localresources, user);
+        } catch (AuthorizationException authExp) {
+            LOG.error("AuthorizationException error", authExp);
+        } catch (KeyNotFoundException knf) {
+            LOG.error("KeyNotFoundException error", knf);
+        }
+    }
+}