You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:26 UTC
[03/35] storm git commit: update supervisor's structure
update supervisor's structure
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b281c735
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b281c735
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b281c735
Branch: refs/heads/master
Commit: b281c735f0089d24407af67586a1b41de45ac382
Parents: 08934e2
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Feb 26 13:15:56 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Feb 26 13:15:56 2016 +0800
----------------------------------------------------------------------
.../daemon/supervisor/RunProfilerActions.java | 221 ------------------
.../daemon/supervisor/SupervisorHeartbeat.java | 84 -------
.../daemon/supervisor/SupervisorServer.java | 23 +-
.../storm/daemon/supervisor/UpdateBlobs.java | 103 ---------
.../supervisor/timer/RunProfilerActions.java | 223 +++++++++++++++++++
.../supervisor/timer/SupervisorHealthCheck.java | 57 +++++
.../supervisor/timer/SupervisorHeartbeat.java | 85 +++++++
.../daemon/supervisor/timer/UpdateBlobs.java | 105 +++++++++
8 files changed, 476 insertions(+), 425 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java
deleted file mode 100644
index 209c067..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.storm.Config;
-import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.generated.ProfileAction;
-import org.apache.storm.generated.ProfileRequest;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.*;
-
-public class RunProfilerActions implements Runnable {
- private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class);
-
- private Map conf;
- private IStormClusterState stormClusterState;
- private String hostName;
- private String stormHome;
-
- private String profileCmd;
-
- private SupervisorData supervisorData;
-
- private class ActionExitCallback implements Utils.ExitCodeCallable {
- private String stormId;
- private ProfileRequest profileRequest;
- private String logPrefix;
-
- public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) {
- this.stormId = stormId;
- this.profileRequest = profileRequest;
- this.logPrefix = logPrefix;
- }
-
- @Override
- public Object call() throws Exception {
- return null;
- }
-
- @Override
- public Object call(int exitCode) {
- LOG.info("{} profile-action exited for {}", logPrefix, exitCode);
- try {
- stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
- } catch (Exception e) {
- LOG.warn("failed delete profileRequest: " + profileRequest);
- }
- return null;
- }
- }
-
- public RunProfilerActions(SupervisorData supervisorData) {
- this.conf = supervisorData.getConf();
- this.stormClusterState = supervisorData.getStormClusterState();
- this.hostName = supervisorData.getHostName();
- this.stormHome = System.getProperty("storm.home");
- this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND));
- this.supervisorData = supervisorData;
- }
-
- @Override
- public void run() {
- Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions();
- try {
- for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
- String stormId = entry.getKey();
- List<ProfileRequest> requests = entry.getValue();
- if (requests != null) {
- for (ProfileRequest profileRequest : requests) {
- if (profileRequest.get_nodeInfo().get_node().equals(hostName)) {
- boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false;
- Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
- String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
- Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-
- String user = null;
- if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
- user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
- }
- Map<String, String> env = null;
- if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) {
- env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
- } else {
- env = new HashMap<String, String>();
- }
-
- String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
- StringBuilder stringBuilder = new StringBuilder();
- FileReader reader = null;
- BufferedReader br = null;
- try {
- reader = new FileReader(str);
- br = new BufferedReader(reader);
- int c;
- while ((c = br.read()) >= 0) {
- stringBuilder.append(c);
- }
- } catch (IOException e) {
- if (reader != null)
- reader.close();
- if (br != null)
- br.close();
- }
- String workerPid = stringBuilder.toString().trim();
- ProfileAction profileAction = profileRequest.get_action();
- String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
-
- // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
- // The profiler plugin script validates if JVM is recording before starting another recording.
- String command = mkCommand(profileAction, stop, workerPid, targetDir);
- List<String> listCommand = new ArrayList<>();
- if (command != null) {
- listCommand.addAll(Arrays.asList(command.split(" ")));
- }
- try {
- ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix);
- launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix);
- } catch (IOException e) {
- LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
- } catch (RuntimeException e) {
- LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
- }
- }
- }
- }
- }
- } catch (Exception e) {
- LOG.error("Error running profiler actions, will retry again later");
- }
- }
-
- private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
- final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
- File targetFile = new File(targetDir);
- if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
- LOG.info("Running as user:{} command:{}", user, commands);
- String containerFile = Utils.containerFilePath(targetDir);
- if (Utils.checkFileExists(containerFile)) {
- SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
- }
- String scriptFile = Utils.scriptFilePath(targetDir);
- if (Utils.checkFileExists(scriptFile)) {
- SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
- }
- String script = Utils.writeScript(targetDir, commands, environment);
- List<String> newCommands = new ArrayList<>();
- newCommands.add("profiler");
- newCommands.add(targetDir);
- newCommands.add(script);
- SupervisorUtils.workerLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile);
- } else {
- Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
- }
- }
-
- private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
- if (action == ProfileAction.JMAP_DUMP) {
- return jmapDumpCmd(workerPid, targetDir);
- } else if (action == ProfileAction.JSTACK_DUMP) {
- return jstackDumpCmd(workerPid, targetDir);
- } else if (action == ProfileAction.JPROFILE_DUMP) {
- return jprofileDump(workerPid, targetDir);
- } else if (action == ProfileAction.JVM_RESTART) {
- return jprofileJvmRestart(workerPid);
- } else if (!stop && action == ProfileAction.JPROFILE_STOP) {
- return jprofileStart(workerPid);
- } else if (stop && action == ProfileAction.JPROFILE_STOP) {
- return jprofileStop(workerPid, targetDir);
- }
- return null;
- }
-
- private String jmapDumpCmd(String pid, String targetDir) {
- return profileCmd + " " + pid + " jmap " + targetDir;
- }
-
- private String jstackDumpCmd(String pid, String targetDir) {
- return profileCmd + " " + pid + " jstack " + targetDir;
- }
-
- private String jprofileStart(String pid) {
- return profileCmd + " " + pid + " start";
- }
-
- private String jprofileStop(String pid, String targetDir) {
- return profileCmd + " " + pid + " stop " + targetDir;
- }
-
- private String jprofileDump(String pid, String targetDir) {
- return profileCmd + " " + pid + " dump " + targetDir;
- }
-
- private String jprofileJvmRestart(String pid) {
- return profileCmd + " " + pid + " kill";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java
deleted file mode 100644
index 399dcd2..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.storm.Config;
-import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.generated.SupervisorInfo;
-import org.apache.storm.utils.Time;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class SupervisorHeartbeat implements Runnable {
-
- private IStormClusterState stormClusterState;
- private String supervisorId;
- private Map conf;
- private SupervisorInfo supervisorInfo;
-
- private SupervisorData supervisorData;
-
- public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
- this.stormClusterState = supervisorData.getStormClusterState();
- this.supervisorId = supervisorData.getSupervisorId();
- this.supervisorData = supervisorData;
- this.conf = conf;
- }
-
- private SupervisorInfo update(Map conf, SupervisorData supervisorData) {
- supervisorInfo = new SupervisorInfo();
- supervisorInfo.set_time_secs(Time.currentTimeSecs());
- supervisorInfo.set_hostname(supervisorData.getHostName());
- supervisorInfo.set_assignment_id(supervisorData.getAssignmentId());
-
- List<Long> usedPorts = new ArrayList<>();
- usedPorts.addAll(supervisorData.getCurrAssignment().keySet());
- supervisorInfo.set_used_ports(usedPorts);
- List<Long> portList = new ArrayList<>();
- Object metas = supervisorData.getiSupervisor().getMetadata();
- if (metas != null) {
- for (Integer port : (List<Integer>) metas) {
- portList.add(port.longValue());
- }
- }
- supervisorInfo.set_meta(portList);
- supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
- supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime());
- supervisorInfo.set_version(supervisorData.getStormVersion());
- supervisorInfo.set_resources_map(mkSupervisorCapacities(conf));
- return supervisorInfo;
- }
-
- private Map<String, Double> mkSupervisorCapacities(Map conf) {
- Map<String, Double> ret = new HashMap<String, Double>();
- Double mem = (double) (conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB));
- ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
- Double cpu = (double) (conf.get(Config.SUPERVISOR_CPU_CAPACITY));
- ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
- return ret;
- }
-
- @Override
- public void run() {
- SupervisorInfo supervisorInfo = update(conf, supervisorData);
- stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
index f1dfb8a..fd31631 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
@@ -25,6 +25,10 @@ import org.apache.storm.StormTimer;
import org.apache.storm.command.HealthCheck;
import org.apache.storm.daemon.metrics.MetricsUtils;
import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
import org.apache.storm.event.EventManagerImp;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.messaging.IContext;
@@ -42,7 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-public class SupervisorServer extends ShutdownWork {
+public class SupervisorServer {
private static Logger LOG = LoggerFactory.getLogger(SupervisorServer.class);
/**
@@ -98,22 +102,7 @@ public class SupervisorServer extends ShutdownWork {
supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));
// supervisor health check
- eventTimer.scheduleRecurring(300, 300, new Runnable() {
- @Override
- public void run() {
- int healthCode = HealthCheck.healthCheck(conf);
- Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
- if (healthCode != 0) {
- for (String workerId : workerIds) {
- try {
- shutWorker(supervisorData, workerId);
- } catch (Exception e) {
- throw Utils.wrapInRuntime(e);
- }
- }
- }
- }
- });
+ eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData));
// Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java
deleted file mode 100644
index 90dccae..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.storm.Config;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.LocalAssignment;
-import org.apache.storm.localizer.LocalResource;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.NimbusLeaderNotFoundException;
-import org.apache.storm.utils.Utils;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The
- * Runnable is intended to be run periodically by a timer, created elsewhere.
- */
-public class UpdateBlobs implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);
-
- private SupervisorData supervisorData;
-
- public UpdateBlobs(SupervisorData supervisorData) {
- this.supervisorData = supervisorData;
- }
-
- @Override
- public void run() {
- try {
- Map conf = supervisorData.getConf();
- Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
- ConcurrentHashMap<Long, LocalAssignment> newAssignment = supervisorData.getCurrAssignment();
- Set<String> assignedStormIds = new HashSet<>();
- for (LocalAssignment localAssignment : newAssignment.values()) {
- assignedStormIds.add(localAssignment.get_topology_id());
- }
- for (String stormId : downloadedStormIds) {
- if (assignedStormIds.contains(stormId)) {
- String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
- LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
- updateBlobsForTopology(conf, stormId, supervisorData.getLocalizer());
- }
- }
- } catch (Exception e) {
- if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
- LOG.error("Network error while updating blobs, will retry again later", e);
- } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
- LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
- } else {
- throw Utils.wrapInRuntime(e);
- }
- }
- }
-
- /**
- * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded.
- *
- * @param conf
- * @param stormId
- * @param localizer
- * @throws IOException
- */
- private void updateBlobsForTopology(Map conf, String stormId, Localizer localizer) throws IOException {
- Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
- Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
- String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
- List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
- try {
- localizer.updateBlobs(localresources, user);
- } catch (AuthorizationException authExp) {
- LOG.error("AuthorizationException error", authExp);
- } catch (KeyNotFoundException knf) {
- LOG.error("KeyNotFoundException error", knf);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
new file mode 100644
index 0000000..2d73327
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+
+/** Timer task that launches the worker profiler command for each profile request addressed to this host. */
+public class RunProfilerActions implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class);
+
+    private final Map conf;
+    private final IStormClusterState stormClusterState;
+    private final String hostName;
+    private final String profileCmd;
+    private final SupervisorData supervisorData;
+
+    /** Exit callback: logs the exit code, then deletes the handled request from cluster state. */
+    private class ActionExitCallback implements Utils.ExitCodeCallable {
+        private final String stormId;
+        private final ProfileRequest profileRequest;
+        private final String logPrefix;
+
+        public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) {
+            this.stormId = stormId;
+            this.profileRequest = profileRequest;
+            this.logPrefix = logPrefix;
+        }
+
+        @Override
+        public Object call() throws Exception {
+            return null;
+        }
+
+        @Override
+        public Object call(int exitCode) {
+            LOG.info("{} profile-action exited for {}", logPrefix, exitCode);
+            try {
+                stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
+            } catch (Exception e) {
+                LOG.warn("failed delete profileRequest: " + profileRequest, e);
+            }
+            return null;
+        }
+    }
+
+    public RunProfilerActions(SupervisorData supervisorData) {
+        this.conf = supervisorData.getConf();
+        this.stormClusterState = supervisorData.getStormClusterState();
+        this.hostName = supervisorData.getHostName();
+        this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND));
+        this.supervisorData = supervisorData;
+    }
+
+    /**
+     * Launches a profiler command for every request whose node equals this host name. Exceptions
+     * are logged together with their cause instead of being propagated to the caller.
+     */
+    @Override
+    public void run() {
+        Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions();
+        try {
+            for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
+                String stormId = entry.getKey();
+                List<ProfileRequest> requests = entry.getValue();
+                if (requests == null) {
+                    continue;
+                }
+                for (ProfileRequest profileRequest : requests) {
+                    if (!profileRequest.get_nodeInfo().get_node().equals(hostName)) {
+                        continue;
+                    }
+                    boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp();
+                    Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
+                    String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
+                    Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+                    String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+                    Map<String, String> env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+                    if (env == null) {
+                        env = new HashMap<String, String>();
+                    }
+                    String workerPid = readWorkerPid(ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue()));
+                    ProfileAction profileAction = profileRequest.get_action();
+                    String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
+
+                    // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
+                    // The profiler plugin script validates if JVM is recording before starting another recording.
+                    String command = mkCommand(profileAction, stop, workerPid, targetDir);
+                    List<String> listCommand = new ArrayList<>();
+                    if (command != null) {
+                        listCommand.addAll(Arrays.asList(command.split(" ")));
+                    }
+                    try {
+                        ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix);
+                        launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix);
+                    } catch (IOException | RuntimeException e) {
+                        LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port, e);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Error running profiler actions, will retry again later", e);
+        }
+    }
+
+    /**
+     * Reads the worker pid file and returns its trimmed content.
+     *
+     * @param pidPath path of the worker's pid file
+     * @return trimmed characters read; on an I/O error, whatever was read so far (possibly empty)
+     */
+    private String readWorkerPid(String pidPath) {
+        StringBuilder pid = new StringBuilder();
+        // try-with-resources closes both readers on success as well as on failure.
+        try (FileReader reader = new FileReader(pidPath); BufferedReader br = new BufferedReader(reader)) {
+            int c;
+            while ((c = br.read()) >= 0) {
+                // read() yields the character as an int; append(int) would add its decimal code
+                // (e.g. "49" for '1'), so cast back to char to keep the pid intact.
+                pid.append((char) c);
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to read worker pid file {}", pidPath, e);
+        }
+        return pid.toString().trim();
+    }
+
+    /** Runs the commands via SupervisorUtils.workerLauncher when SUPERVISOR_RUN_WORKER_AS_USER is set, else via Utils.launchProcess. */
+    private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
+            final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
+        File targetFile = new File(targetDir);
+        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            LOG.info("Running as user:{} command:{}", user, commands);
+            String containerFile = Utils.containerFilePath(targetDir);
+            if (Utils.checkFileExists(containerFile)) {
+                SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
+            }
+            String scriptFile = Utils.scriptFilePath(targetDir);
+            if (Utils.checkFileExists(scriptFile)) {
+                SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
+            }
+            String script = Utils.writeScript(targetDir, commands, environment);
+            List<String> newCommands = new ArrayList<>();
+            newCommands.add("profiler");
+            newCommands.add(targetDir);
+            newCommands.add(script);
+            SupervisorUtils.workerLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile);
+        } else {
+            Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
+        }
+    }
+
+    /** Maps an action to its profiler command line; returns null when the action matches none of the handled values. */
+    private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
+        if (action == ProfileAction.JMAP_DUMP) {
+            return jmapDumpCmd(workerPid, targetDir);
+        } else if (action == ProfileAction.JSTACK_DUMP) {
+            return jstackDumpCmd(workerPid, targetDir);
+        } else if (action == ProfileAction.JPROFILE_DUMP) {
+            return jprofileDump(workerPid, targetDir);
+        } else if (action == ProfileAction.JVM_RESTART) {
+            return jprofileJvmRestart(workerPid);
+        } else if (action == ProfileAction.JPROFILE_STOP) {
+            // 'stop' is true once the current time is past the request's time stamp; until then (re)start.
+            return stop ? jprofileStop(workerPid, targetDir) : jprofileStart(workerPid);
+        }
+        return null;
+    }
+
+    private String jmapDumpCmd(String pid, String targetDir) {
+        return profileCmd + " " + pid + " jmap " + targetDir;
+    }
+
+    private String jstackDumpCmd(String pid, String targetDir) {
+        return profileCmd + " " + pid + " jstack " + targetDir;
+    }
+
+    private String jprofileStart(String pid) {
+        return profileCmd + " " + pid + " start";
+    }
+
+    private String jprofileStop(String pid, String targetDir) {
+        return profileCmd + " " + pid + " stop " + targetDir;
+    }
+
+    private String jprofileDump(String pid, String targetDir) {
+        return profileCmd + " " + pid + " dump " + targetDir;
+    }
+
+    private String jprofileJvmRestart(String pid) {
+        return profileCmd + " " + pid + " kill";
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
new file mode 100644
index 0000000..36ee6b6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.command.HealthCheck;
+import org.apache.storm.daemon.supervisor.ShutdownWork;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+/** Timer task: runs the health check and, on a non-zero result, calls shutWorker for each listed worker id. */
+public class SupervisorHealthCheck extends ShutdownWork implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class);
+    private SupervisorData supervisorData;
+
+    public SupervisorHealthCheck(SupervisorData supervisorData) {
+        this.supervisorData = supervisorData;
+    }
+
+    @Override
+    public void run() {
+        final Map stormConf = supervisorData.getConf();
+        final boolean healthy = HealthCheck.healthCheck(stormConf) == 0;
+        final Collection<String> ids = SupervisorUtils.supervisorWorkerIds(stormConf);
+        if (healthy) {
+            return;
+        }
+        for (String id : ids) {
+            try {
+                shutWorker(supervisorData, id);
+            } catch (Exception e) {
+                throw Utils.wrapInRuntime(e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
new file mode 100644
index 0000000..d41ca87
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Timer task that publishes this supervisor's heartbeat. Every run builds a
+ * fresh {@link SupervisorInfo} from the current supervisor state and writes it
+ * through {@code IStormClusterState.supervisorHeartbeat}.
+ */
+public class SupervisorHeartbeat implements Runnable {
+
+    private final IStormClusterState stormClusterState;
+    private final String supervisorId;
+    private final Map conf;
+    private final SupervisorData supervisorData;
+
+    /**
+     * @param conf           configuration map; read for the scheduler meta and
+     *                       the memory/CPU capacity settings
+     * @param supervisorData shared supervisor state; supplies the cluster state
+     *                       handle, the supervisor id and the heartbeat fields
+     */
+    public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
+        this.stormClusterState = supervisorData.getStormClusterState();
+        this.supervisorId = supervisorData.getSupervisorId();
+        this.supervisorData = supervisorData;
+        this.conf = conf;
+    }
+
+    /**
+     * Builds a new heartbeat record from the current supervisor state.
+     * The record is a local value; nothing is cached between runs.
+     */
+    private SupervisorInfo buildSupervisorInfo() {
+        SupervisorInfo info = new SupervisorInfo();
+        info.set_time_secs(Time.currentTimeSecs());
+        info.set_hostname(supervisorData.getHostName());
+        info.set_assignment_id(supervisorData.getAssignmentId());
+
+        List<Long> usedPorts = new ArrayList<>();
+        usedPorts.addAll(supervisorData.getCurrAssignment().keySet());
+        info.set_used_ports(usedPorts);
+
+        List<Long> portList = new ArrayList<>();
+        Object metas = supervisorData.getiSupervisor().getMetadata();
+        if (metas != null) {
+            // Iterate as Number so that both Integer and Long ports are accepted;
+            // a List<Integer> cast would fail with ClassCastException on Long elements.
+            @SuppressWarnings("unchecked")
+            List<Number> ports = (List<Number>) metas;
+            for (Number port : ports) {
+                portList.add(port.longValue());
+            }
+        }
+        info.set_meta(portList);
+
+        @SuppressWarnings("unchecked")
+        Map<String, String> schedulerMeta = (Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META);
+        info.set_scheduler_meta(schedulerMeta);
+        info.set_uptime_secs(supervisorData.getUpTime().upTime());
+        info.set_version(supervisorData.getStormVersion());
+        info.set_resources_map(mkSupervisorCapacities(conf));
+        return info;
+    }
+
+    /**
+     * Collects the memory and CPU capacity settings into a map keyed by their
+     * configuration names.
+     */
+    private Map<String, Double> mkSupervisorCapacities(Map conf) {
+        Map<String, Double> ret = new HashMap<>();
+        ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, readCapacity(conf, Config.SUPERVISOR_MEMORY_CAPACITY_MB));
+        ret.put(Config.SUPERVISOR_CPU_CAPACITY, readCapacity(conf, Config.SUPERVISOR_CPU_CAPACITY));
+        return ret;
+    }
+
+    /**
+     * Reads a numeric setting as a double. Going through {@link Number} lets an
+     * integral value (for example {@code 4096}) be used; a direct cast of the
+     * Object to {@code double} only works when the value is already a Double.
+     *
+     * @throws NullPointerException if the setting is absent
+     * @throws ClassCastException   if the setting is not numeric
+     */
+    private static Double readCapacity(Map conf, String key) {
+        Object value = conf.get(key);
+        if (value == null) {
+            throw new NullPointerException("Missing supervisor capacity setting: " + key);
+        }
+        return ((Number) value).doubleValue();
+    }
+
+    /** Builds the current heartbeat record and writes it to the cluster state. */
+    @Override
+    public void run() {
+        SupervisorInfo supervisorInfo = buildSupervisorInfo();
+        stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
new file mode 100644
index 0000000..623afa5
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusLeaderNotFoundException;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The
+ * Runnable is intended to be run periodically by a timer, created elsewhere.
+ */
+public class UpdateBlobs implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);
+
+ private SupervisorData supervisorData;
+
+ public UpdateBlobs(SupervisorData supervisorData) {
+ this.supervisorData = supervisorData;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Map conf = supervisorData.getConf();
+ Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
+ ConcurrentHashMap<Long, LocalAssignment> newAssignment = supervisorData.getCurrAssignment();
+ Set<String> assignedStormIds = new HashSet<>();
+ for (LocalAssignment localAssignment : newAssignment.values()) {
+ assignedStormIds.add(localAssignment.get_topology_id());
+ }
+ for (String stormId : downloadedStormIds) {
+ if (assignedStormIds.contains(stormId)) {
+ String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+ LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
+ updateBlobsForTopology(conf, stormId, supervisorData.getLocalizer());
+ }
+ }
+ } catch (Exception e) {
+ if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
+ LOG.error("Network error while updating blobs, will retry again later", e);
+ } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
+ LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
+ } else {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+ }
+
+ /**
+ * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded.
+ *
+ * @param conf
+ * @param stormId
+ * @param localizer
+ * @throws IOException
+ */
+ private void updateBlobsForTopology(Map conf, String stormId, Localizer localizer) throws IOException {
+ Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+ Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+ String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+ List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+ try {
+ localizer.updateBlobs(localresources, user);
+ } catch (AuthorizationException authExp) {
+ LOG.error("AuthorizationException error", authExp);
+ } catch (KeyNotFoundException knf) {
+ LOG.error("KeyNotFoundException error", knf);
+ }
+ }
+}