You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:53 UTC
[30/35] storm git commit: update supervisor based on revans2 and longdafeng
http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
index 6b294f2,0000000..04467c2
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@@ -1,221 -1,0 +1,214 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
++import com.google.common.collect.Lists;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+
++/**
++ * Supervisor timer task that runs the profiler requests (jmap/jstack dumps, profiler
++ * start/stop/dump, JVM restart) addressed to workers on this host.
++ */
+public class RunProfilerActions implements Runnable {
+ private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class);
+
+ private Map conf;
+ private IStormClusterState stormClusterState;
+ private String hostName;
+
+ private String profileCmd;
+
+ private SupervisorData supervisorData;
+
++ /**
++ * Exit callback for a launched profiler command: logs the exit code and, when the request
++ * has expired ({@code stop}), deletes it from cluster state.
++ */
+ private class ActionExitCallback implements Utils.ExitCodeCallable {
+ private String stormId;
+ private ProfileRequest profileRequest;
+ private String logPrefix;
++ private boolean stop;
+
- public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) {
++ public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix, boolean stop) {
+ this.stormId = stormId;
+ this.profileRequest = profileRequest;
+ this.logPrefix = logPrefix;
++ this.stop = stop;
+ }
+
+ @Override
+ public Object call() throws Exception {
+ return null;
+ }
+
+ @Override
+ public Object call(int exitCode) {
+ LOG.info("{} profile-action exited for {}", logPrefix, exitCode);
+ try {
- stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
++ // Only expired requests are deleted; unexpired ones are left in cluster state.
++ if (stop) {
++ stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
++ }
+ } catch (Exception e) {
- LOG.warn("failed delete profileRequest: " + profileRequest);
++ LOG.warn("failed delete profileRequest: {}", profileRequest, e);
+ }
+ return null;
+ }
+ }
+
+ public RunProfilerActions(SupervisorData supervisorData) {
+ this.conf = supervisorData.getConf();
+ this.stormClusterState = supervisorData.getStormClusterState();
+ this.hostName = supervisorData.getHostName();
+ this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND));
+ this.supervisorData = supervisorData;
+ }
+
++ /**
++ * Launches the command for every profile request whose node is this host. A request whose
++ * timestamp has passed is treated as expired ({@code stop}). Failures are logged per request
++ * so one bad request does not prevent the remaining ones from running.
++ */
+ @Override
+ public void run() {
- Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions().get();
++ Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfilerActions().get();
+ try {
+ for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
+ String stormId = entry.getKey();
+ List<ProfileRequest> requests = entry.getValue();
+ if (requests != null) {
+ for (ProfileRequest profileRequest : requests) {
+ if (profileRequest.get_nodeInfo().get_node().equals(hostName)) {
- boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false;
++ boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp();
+ Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
+ String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
+ Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+ String user = null;
+ if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
+ user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
+ }
+ Map<String, String> env = null;
+ if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) {
+ env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+ } else {
+ env = new HashMap<String, String>();
+ }
+
+ String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
+ StringBuilder stringBuilder = new StringBuilder();
- FileReader reader = null;
- BufferedReader br = null;
- try {
- reader = new FileReader(str);
- br = new BufferedReader(reader);
++
++ try (FileReader reader = new FileReader(str);
++ BufferedReader br = new BufferedReader(reader)) {
+ int c;
+ while ((c = br.read()) >= 0) {
- stringBuilder.append(c);
++ // read() returns the character as an int; append(int) would add its decimal code instead.
++ stringBuilder.append((char) c);
+ }
+ } catch (IOException e) {
- if (reader != null)
- reader.close();
- if (br != null)
- br.close();
++ // Skip only this request; the others in this cycle can still run.
++ LOG.error("Failed to read worker pid file {} for {}:{}, will retry later", str, stormId, port, e);
++ continue;
+ }
+ String workerPid = stringBuilder.toString().trim();
+ ProfileAction profileAction = profileRequest.get_action();
+ String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
+
- // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
++ // Until a JPROFILE_STOP request expires, keep launching profiler start in case the worker restarted.
+ // The profiler plugin script validates if JVM is recording before starting another recording.
- String command = mkCommand(profileAction, stop, workerPid, targetDir);
- List<String> listCommand = new ArrayList<>();
- if (command != null) {
- listCommand.addAll(Arrays.asList(command.split(" ")));
- }
++ List<String> command = mkCommand(profileAction, stop, workerPid, targetDir);
+ try {
- ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix);
- launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix);
- } catch (IOException e) {
- LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
- } catch (RuntimeException e) {
- LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
++ ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix, stop);
++ launchProfilerActionForWorker(user, targetDir, command, env, actionExitCallback, logPrefix);
++ } catch (IOException | RuntimeException e) {
++ LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port, e);
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
- LOG.error("Error running profiler actions, will retry again later");
++ LOG.error("Error running profiler actions, will retry again later", e);
+ }
+ }
+
++ /**
++ * Launches a profiler command in {@code targetDir}. When SUPERVISOR_RUN_WORKER_AS_USER is set,
++ * any existing container/script file is removed, the command is written to a script and that
++ * script is started via SupervisorUtils.processLauncher as {@code user}; otherwise the command
++ * is started directly with Utils.launchProcess.
++ */
+ private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
+ final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
+ File targetFile = new File(targetDir);
+ if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ LOG.info("Running as user:{} command:{}", user, commands);
+ String containerFile = Utils.containerFilePath(targetDir);
+ if (Utils.checkFileExists(containerFile)) {
+ SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
+ }
+ String scriptFile = Utils.scriptFilePath(targetDir);
+ if (Utils.checkFileExists(scriptFile)) {
+ SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
+ }
+ String script = Utils.writeScript(targetDir, commands, environment);
+ List<String> args = new ArrayList<>();
+ args.add("profiler");
+ args.add(targetDir);
+ args.add(script);
+ SupervisorUtils.processLauncher(conf, user, null, args, environment, logPrefix, exitCodeCallable, targetFile);
+ } else {
+ Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
+ }
+ }
+
++ /**
++ * Builds the profiler command line for {@code action}. For JPROFILE_STOP the command starts
++ * profiling while the request is unexpired and stops it once {@code stop} is true.
++ * Returns an empty list for any action not listed here.
++ */
- private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
++ private List<String> mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
+ if (action == ProfileAction.JMAP_DUMP) {
+ return jmapDumpCmd(workerPid, targetDir);
+ } else if (action == ProfileAction.JSTACK_DUMP) {
+ return jstackDumpCmd(workerPid, targetDir);
+ } else if (action == ProfileAction.JPROFILE_DUMP) {
+ return jprofileDump(workerPid, targetDir);
+ } else if (action == ProfileAction.JVM_RESTART) {
+ return jprofileJvmRestart(workerPid);
+ } else if (!stop && action == ProfileAction.JPROFILE_STOP) {
+ return jprofileStart(workerPid);
+ } else if (stop && action == ProfileAction.JPROFILE_STOP) {
+ return jprofileStop(workerPid, targetDir);
+ }
- return null;
++ return Lists.newArrayList();
+ }
+
- private String jmapDumpCmd(String pid, String targetDir) {
- return profileCmd + " " + pid + " jmap " + targetDir;
++ private List<String> jmapDumpCmd(String pid, String targetDir) {
++ return Lists.newArrayList(profileCmd, pid, "jmap", targetDir);
+ }
+
- private String jstackDumpCmd(String pid, String targetDir) {
- return profileCmd + " " + pid + " jstack " + targetDir;
++ private List<String> jstackDumpCmd(String pid, String targetDir) {
++ return Lists.newArrayList(profileCmd, pid, "jstack", targetDir);
+ }
+
- private String jprofileStart(String pid) {
- return profileCmd + " " + pid + " start";
++ private List<String> jprofileStart(String pid) {
++ return Lists.newArrayList(profileCmd, pid, "start");
+ }
+
- private String jprofileStop(String pid, String targetDir) {
- return profileCmd + " " + pid + " stop " + targetDir;
++ private List<String> jprofileStop(String pid, String targetDir) {
++ return Lists.newArrayList(profileCmd, pid, "stop", targetDir);
+ }
+
- private String jprofileDump(String pid, String targetDir) {
- return profileCmd + " " + pid + " dump " + targetDir;
++ private List<String> jprofileDump(String pid, String targetDir) {
++ return Lists.newArrayList(profileCmd, pid, "dump", targetDir);
+ }
+
- private String jprofileJvmRestart(String pid) {
- return profileCmd + " " + pid + " kill";
++ private List<String> jprofileJvmRestart(String pid) {
++ return Lists.newArrayList(profileCmd, pid, "kill");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index 5e7b6d3,0000000..3ce8f5d
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@@ -1,62 -1,0 +1,52 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.command.HealthCheck;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
++/**
++ * Supervisor timer task: calls HealthCheck.healthCheck(conf) and, when it returns a non-zero
++ * code, delegates to SupervisorUtils.shutdownAllWorkers for this supervisor's workers.
++ */
+public class SupervisorHealthCheck implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class);
+
+ private SupervisorData supervisorData;
+
+ public SupervisorHealthCheck(SupervisorData supervisorData) {
+ this.supervisorData = supervisorData;
+ }
+
++ /**
++ * Runs the health check once; any non-zero code triggers shutdown of all workers.
++ */
+ @Override
+ public void run() {
+ Map conf = supervisorData.getConf();
+ IWorkerManager workerManager = supervisorData.getWorkerManager();
+ int healthCode = HealthCheck.healthCheck(conf);
- Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
+ if (healthCode != 0) {
- for (String workerId : workerIds) {
- try {
- workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids());
- boolean success = workerManager.cleanupWorker(workerId);
- if (success){
- supervisorData.getDeadWorkers().remove(workerId);
- }
- } catch (Exception e) {
- throw Utils.wrapInRuntime(e);
- }
- }
++ // Passes the dead-worker set too, presumably so successfully cleaned workers are removed from it
++ // (as the replaced loop did) - confirm in SupervisorUtils.shutdownAllWorkers.
++ SupervisorUtils.shutdownAllWorkers(conf, supervisorData.getSupervisorId(), supervisorData.getWorkerThreadPids(), supervisorData.getDeadWorkers(),
++ workerManager);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
index 9529b1a,0000000..05ed82b
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
@@@ -1,408 -1,0 +1,401 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.workermanager;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
++/**
++ * Default IWorkerManager. Each worker is launched as a LogWriter JVM wrapping the worker JVM,
++ * optionally inside the configured resource-isolation (cgroup) plugin and/or as the topology
++ * submitter user. This class also shuts workers down and cleans up their local directories.
++ */
+public class DefaultWorkerManager implements IWorkerManager {
+
+ private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class);
+
+ private Map conf;
+ private CgroupManager resourceIsolationManager;
+ private boolean runWorkerAsUser;
+
++ /**
++ * Stores the config, instantiates the resource isolation plugin if it is enabled, and reads
++ * SUPERVISOR_RUN_WORKER_AS_USER. The localizer argument is not used by this implementation.
++ */
+ @Override
+ public void prepareWorker(Map conf, Localizer localizer) {
+ this.conf = conf;
+ if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
+ try {
+ this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
+ this.resourceIsolationManager.prepare(conf);
+ LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
+ } catch (IOException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ } else {
+ this.resourceIsolationManager = null;
+ }
+ this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+ }
+
++ /**
++ * Builds the worker command line (LogWriter JVM followed by the worker JVM, with childopts
++ * substituted). It reserves isolation resources when a plugin is configured, then launches the
++ * process, either via a run-as-user script or directly. An IOException is rethrown through
++ * Utils.wrapInRuntime.
++ */
+ @Override
- public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
++ public void launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
+ Utils.ExitCodeCallable workerExitCallback) {
+ try {
+
+ String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+ String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
+ String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
+ String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId);
+
+ String stormLogDir = ConfigUtils.getLogDir();
+ String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
+
+ String stormLog4j2ConfDir;
+ if (StringUtils.isNotBlank(stormLogConfDir)) {
+ if (Utils.isAbsolutePath(stormLogConfDir)) {
+ stormLog4j2ConfDir = stormLogConfDir;
+ } else {
+ stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir;
+ }
+ } else {
+ stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
+ }
+
+ String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+
+ String jlp = jlp(stormRoot, conf);
+
+ String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
+
+ Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+ String workerClassPath = getWorkerClassPath(stormJar, stormConf);
+
+ Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
+ List<String> topGcOpts = new ArrayList<>();
+ if (topGcOptsObject instanceof String) {
+ topGcOpts.add((String) topGcOptsObject);
+ } else if (topGcOptsObject instanceof List) {
+ topGcOpts.addAll((List<String>) topGcOptsObject);
+ }
+
+ int memOnheap = 0;
+ if (resources.get_mem_on_heap() > 0) {
+ memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
+ } else {
+ // set the default heap memory size for supervisor-test
+ memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
+ }
+
+ int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
+
+ int cpu = (int) Math.ceil(resources.get_cpu());
+
+ List<String> gcOpts = null;
+
+ if (topGcOpts.size() > 0) {
+ gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap);
+ } else {
+ gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap);
+ }
+
+ Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
+ List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
+ if (topoWorkerLogwriterObject instanceof String) {
+ topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject);
+ } else if (topoWorkerLogwriterObject instanceof List) {
+ topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject);
+ }
+
+ String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+
+ String logfileName = "worker.log";
+
+ String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
+
+ String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
+ if (loggingSensitivity == null) {
+ loggingSensitivity = "S3";
+ }
+
+ List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
+
+ List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
+
+ List<String> workerProfilerChildopts = null;
+ if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
+ workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
+ } else {
+ workerProfilerChildopts = new ArrayList<>();
+ }
+
+ Map<String, String> topEnvironment = new HashMap<String, String>();
+ Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+ if (environment != null) {
+ topEnvironment.putAll(environment);
+ }
+ topEnvironment.put("LD_LIBRARY_PATH", jlp);
+
+ String log4jConfigurationFile = null;
+ if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
+ log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
+ } else {
+ log4jConfigurationFile = stormLog4j2ConfDir;
+ }
+ log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
+
+ List<String> commandList = new ArrayList<>();
+ commandList.add(SupervisorUtils.javaCmd("java"));
+ commandList.add("-cp");
+ commandList.add(workerClassPath);
+ commandList.addAll(topoWorkerLogwriterChildopts);
+ commandList.add("-Dlogfile.name=" + logfileName);
+ commandList.add("-Dstorm.home=" + stormHome);
+ commandList.add("-Dworkers.artifacts=" + workersArtifacets);
+ commandList.add("-Dstorm.id=" + stormId);
+ commandList.add("-Dworker.id=" + workerId);
+ commandList.add("-Dworker.port=" + port);
+ commandList.add("-Dstorm.log.dir=" + stormLogDir);
+ commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
+ commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+ commandList.add("org.apache.storm.LogWriter");
+
+ commandList.add(SupervisorUtils.javaCmd("java"));
+ commandList.add("-server");
+ commandList.addAll(workerChildopts);
+ commandList.addAll(topWorkerChildopts);
+ commandList.addAll(gcOpts);
+ commandList.addAll(workerProfilerChildopts);
+ commandList.add("-Djava.library.path=" + jlp);
+ commandList.add("-Dlogfile.name=" + logfileName);
+ commandList.add("-Dstorm.home=" + stormHome);
+ commandList.add("-Dworkers.artifacts=" + workersArtifacets);
+ commandList.add("-Dstorm.conf.file=" + stormConfFile);
+ commandList.add("-Dstorm.options=" + stormOptions);
+ commandList.add("-Dstorm.log.dir=" + stormLogDir);
+ commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
+ commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
+ commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
+ commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+ commandList.add("-Dstorm.id=" + stormId);
+ commandList.add("-Dworker.id=" + workerId);
+ commandList.add("-Dworker.port=" + port);
+ commandList.add("-cp");
+ commandList.add(workerClassPath);
+ commandList.add("org.apache.storm.daemon.worker");
+ commandList.add(stormId);
+ commandList.add(assignmentId);
+ commandList.add(String.valueOf(port));
+ commandList.add(workerId);
+
+ // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
+ if (resourceIsolationManager != null) {
- int cGroupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)));
++ // The config value may deserialize as an Integer or a Double; a (double) cast of an
++ // Integer object throws ClassCastException, so read it through Number.
++ int cGroupMem = (int) Math.ceil(((Number) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)).doubleValue());
+ int memoryValue = memoffheap + memOnheap + cGroupMem;
+ int cpuValue = cpu;
+ Map<String, Number> map = new HashMap<>();
+ map.put("cpu", cpuValue);
+ map.put("memory", memoryValue);
+ resourceIsolationManager.reserveResourcesForWorker(workerId, map);
+ commandList = resourceIsolationManager.getLaunchCommand(workerId, commandList);
+ }
+
+ LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
+
+ String logPrefix = "Worker Process " + workerId;
+ String workerDir = ConfigUtils.workerRoot(conf, workerId);
+
+ if (runWorkerAsUser) {
+ List<String> args = new ArrayList<>();
+ args.add("worker");
+ args.add(workerDir);
+ args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
+ List<String> commandPrefix = null;
+ if (resourceIsolationManager != null)
+ commandPrefix = resourceIsolationManager.getLaunchCommandPrefix(workerId);
+ SupervisorUtils.processLauncher(conf, user, commandPrefix, args, null, logPrefix, workerExitCallback, new File(workerDir));
+ } else {
+ Utils.launchProcess(commandList, topEnvironment, logPrefix, workerExitCallback, new File(workerDir));
+ }
+ } catch (IOException e) {
+ throw Utils.wrapInRuntime(e);
+ }
- return null;
+ }
+
++ /**
++ * Kills the in-process worker registered in {@code workerThreadPids} (via ProcessSimulator), if any.
++ * Then it sends signal 15 to every recorded worker pid, sleeps SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS
++ * when there were pids, sends signal 9 and removes each pid file. Failures are rethrown through
++ * Utils.wrapInRuntime.
++ */
+ @Override
- public IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) {
++ public void shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) {
+ try {
+ LOG.info("Shutting down {}:{}", supervisorId, workerId);
+ Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
+ Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
+ String user = ConfigUtils.getWorkerUser(conf, workerId);
+ String threadPid = workerThreadPids.get(workerId);
+ if (StringUtils.isNotBlank(threadPid)) {
+ ProcessSimulator.killProcess(threadPid);
+ }
+
+ for (String pid : pids) {
+ if (runWorkerAsUser) {
+ List<String> commands = new ArrayList<>();
+ commands.add("signal");
+ commands.add(pid);
+ commands.add("15");
+ String logPrefix = "kill -15 " + pid;
+ SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
+ } else {
+ Utils.killProcessWithSigTerm(pid);
+ }
+ }
+
+ if (pids.size() > 0) {
+ LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs);
+ Time.sleepSecs(shutdownSleepSecs);
+ }
+
+ for (String pid : pids) {
+ if (runWorkerAsUser) {
+ List<String> commands = new ArrayList<>();
+ commands.add("signal");
+ commands.add(pid);
+ commands.add("9");
+ String logPrefix = "kill -9 " + pid;
+ SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
+ } else {
+ Utils.forceKillProcess(pid);
+ }
+ String path = ConfigUtils.workerPidPath(conf, workerId, pid);
+ if (runWorkerAsUser) {
+ SupervisorUtils.rmrAsUser(conf, workerId, path);
+ } else {
+ try {
+ LOG.debug("Removing path {}", path);
+ new File(path).delete();
+ } catch (Exception e) {
+ // on windows, the supervisor may still holds the lock on the worker directory
+ // ignore
+ }
+ }
+ }
+ LOG.info("Shut down {}:{}", supervisorId, workerId);
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
- return null;
+ }
+
++ /**
++ * Releases isolation resources and deletes the worker's heartbeat, pid, tmp and root directories.
++ *
++ * @return true on success; false (after logging) when cleanup failed and should be retried later
++ */
+ @Override
+ public boolean cleanupWorker(String workerId) {
+ try {
+ //clean up for resource isolation if enabled
+ if (resourceIsolationManager != null) {
+ resourceIsolationManager.releaseResourcesForWorker(workerId);
+ }
+ //Always make sure to clean up everything else before worker directory
+ //is removed since that is what is going to trigger the retry for cleanup
+ String workerRoot = ConfigUtils.workerRoot(conf, workerId);
+ if (Utils.checkFileExists(workerRoot)) {
+ if (runWorkerAsUser) {
+ SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
+ } else {
+ Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
+ Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
+ Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId));
+ Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
+ }
+ ConfigUtils.removeWorkerUserWSE(conf, workerId);
+ }
+ return true;
+ } catch (IOException e) {
+ LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+ }
+ return false;
+ }
+
- @Override
- public IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources) {
- return null;
- }
-
++ /**
++ * Builds the worker's java.library.path: the os/arch-specific resources dir, the resources dir,
++ * then JAVA_LIBRARY_PATH from {@code conf}, joined with the classpath separator.
++ */
+ protected String jlp(String stormRoot, Map conf) {
+ String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
+ String os = System.getProperty("os.name").replaceAll("\\s+", "_");
+ String arch = System.getProperty("os.arch");
+ String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
+ String ret = archResourceRoot + Utils.CLASS_PATH_SEPARATOR + resourceRoot + Utils.CLASS_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH);
+ return ret;
+ }
+
++ /**
++ * Worker classpath: Utils.workerClasspath() plus the topology jar, plus TOPOLOGY_CLASSPATH when it
++ * is a String or a List (any other value is ignored).
++ */
+ protected String getWorkerClassPath(String stormJar, Map stormConf) {
+ List<String> topoClasspath = new ArrayList<>();
+ Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
+
+ if (object instanceof List) {
+ topoClasspath.addAll((List<String>) object);
+ } else if (object instanceof String) {
+ topoClasspath.add((String) object);
- } else {
- LOG.error("topology specific classpath is invaild");
+ }
++ LOG.debug("topology specific classpath is {}", object);
++
+ String classPath = Utils.workerClasspath();
+ String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
+ return Utils.addToClasspath(classAddPath, topoClasspath);
+ }
+
++ /**
++ * Replaces %ID% and %WORKER-PORT% (both with the port), %WORKER-ID%, %TOPOLOGY-ID% and
++ * %HEAP-MEM% in one childopts string; blank or null input is returned unchanged.
++ */
++ private static String substituteChildOptsInternal(String string, String workerId, String stormId, Long port, int memOnheap) {
++ if (StringUtils.isNotBlank(string)){
++ string = string.replace("%ID%", String.valueOf(port));
++ string = string.replace("%WORKER-ID%", workerId);
++ string = string.replace("%TOPOLOGY-ID%", stormId);
++ string = string.replace("%WORKER-PORT%", String.valueOf(port));
++ string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
++ }
++ return string;
++ }
++
+ /**
+ * "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap"
+ *
+ * @param value
+ * @param workerId
+ * @param stormId
+ * @param port
+ * @param memOnheap
+ */
+ public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
+ List<String> rets = new ArrayList<>();
+ if (value instanceof String) {
- String string = (String) value;
++ String string = substituteChildOptsInternal((String) value, workerId, stormId, port, memOnheap);
+ if (StringUtils.isNotBlank(string)){
- string = string.replace("%ID%", String.valueOf(port));
- string = string.replace("%WORKER-ID%", workerId);
- string = string.replace("%TOPOLOGY-ID%", stormId);
- string = string.replace("%WORKER-PORT%", String.valueOf(port));
- string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
+ String[] strings = string.split("\\s+");
+ rets.addAll(Arrays.asList(strings));
+ }
-
+ } else if (value instanceof List) {
+ List<Object> objects = (List<Object>) value;
+ for (Object object : objects) {
- String str = (String) object;
++ String str = substituteChildOptsInternal((String) object, workerId, stormId, port, memOnheap);
+ if (StringUtils.isNotBlank(str)){
- str = str.replace("%ID%", String.valueOf(port));
- str = str.replace("%WORKER-ID%", workerId);
- str = str.replace("%TOPOLOGY-ID%", stormId);
- str = str.replace("%WORKER-PORT%", String.valueOf(port));
- str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap));
+ rets.add(str);
+ }
+ }
+ }
+ return rets;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
index 3b0912a,0000000..e62b9d8
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
@@@ -1,38 -1,0 +1,35 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.workermanager;
+
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.Utils;
+
+import java.util.List;
+import java.util.Map;
+
++/**
++ * Strategy the supervisor uses to launch, shut down and clean up worker processes.
++ */
+public interface IWorkerManager {
++ /**
++ * Prepares the manager with the supervisor configuration and localizer.
++ */
- public void prepareWorker(Map conf, Localizer localizer);
++ void prepareWorker(Map conf, Localizer localizer);
+
++ /**
++ * Launches the worker {@code workerId} of topology {@code stormId} on {@code port} with the given
++ * resources. {@code workerExitCallback} is presumably invoked with the process exit code when the
++ * worker exits (it is handed to the process launcher in DefaultWorkerManager) - confirm.
++ */
- IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
++ void launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
+ Utils.ExitCodeCallable workerExitCallback);
++ /**
++ * Stops the processes of worker {@code workerId}. {@code workerThreadPids} maps worker ids to the
++ * id of an in-process worker (DefaultWorkerManager kills it via ProcessSimulator).
++ */
++ void shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids);
+
- IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids);
-
- IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources);
-
++ /**
++ * Removes the worker's local state.
++ *
++ * @return true on success; false when cleanup failed and should be retried later
++ */
- public boolean cleanupWorker(String workerId);
++ boolean cleanupWorker(String workerId);
+}