You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/09/19 21:01:35 UTC
[6/8] storm git commit: STORM-2018: Supervisor V2
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
new file mode 100644
index 0000000..d76b950
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
@@ -0,0 +1,549 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.lang.ProcessBuilder.Redirect;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Represents a container that a worker will run in.
+ * <p>
+ * A container created with {@link ContainerType#RECOVER_PARTIAL} only knows its worker ID
+ * (no topology, port or assignment), so only the {@link Killable} operations are usable on it.
+ */
+public abstract class Container implements Killable {
+    private static final Logger LOG = LoggerFactory.getLogger(Container.class);
+
+    /**
+     * How a container was created, which controls which operations it supports.
+     */
+    public enum ContainerType {
+        /** A new worker that is about to be launched. */
+        LAUNCH(false, false),
+        /** A container recovered for an existing worker; assignment and port are required. */
+        RECOVER_FULL(true, false),
+        /** A container recovered from only a worker ID; only the Killable APIs are supported. */
+        RECOVER_PARTIAL(true, true);
+
+        private final boolean _recovery;
+        private final boolean _onlyKillable;
+
+        ContainerType(boolean recovery, boolean onlyKillable) {
+            _recovery = recovery;
+            _onlyKillable = onlyKillable;
+        }
+
+        /**
+         * @return true if this container was recovered rather than newly launched.
+         */
+        public boolean isRecovery() {
+            return _recovery;
+        }
+
+        /**
+         * Fail fast if an operation that needs topology/port/assignment data is attempted
+         * on a container that only supports being killed.
+         * @throws IllegalStateException if this type is only killable.
+         */
+        public void assertFull() {
+            if (_onlyKillable) {
+                throw new IllegalStateException("Container is only Killable.");
+            }
+        }
+
+        /**
+         * @return true if only the Killable APIs can be used with this type.
+         */
+        public boolean isOnlyKillable() {
+            return _onlyKillable;
+        }
+    }
+
+    protected final Map<String, Object> _conf;
+    protected final Map<String, Object> _topoConf; //Not set if RECOVER_PARTIAL
+    protected String _workerId;
+    protected final String _topologyId; //Not set if RECOVER_PARTIAL
+    protected final String _supervisorId;
+    protected final int _port; //Not set if RECOVER_PARTIAL
+    protected final LocalAssignment _assignment; //Not set if RECOVER_PARTIAL
+    protected final AdvancedFSOps _ops;
+    protected final ResourceIsolationInterface _resourceIsolationManager;
+    protected ContainerType _type;
+
+    /**
+     * Create a new Container.
+     * @param type the type of container being made.
+     * @param conf the supervisor config
+     * @param supervisorId the ID of the supervisor this is a part of.
+     * @param port the port the container is on. Should be <= 0 if only a partial recovery
+     * @param assignment the assignment for this container. Should be null if only a partial recovery.
+     * @param resourceIsolationManager used to isolate resources for a container can be null if no isolation is used.
+     * @param workerId the id of the worker to use. Must not be null if doing a partial recovery.
+     * @param topoConf the config of the topology (mostly for testing) if null
+     * and not a partial recovery the real conf is read.
+     * @param ops file system operations (mostly for testing) if null a new one is made
+     * @throws IOException on any error.
+     * @throws ContainerRecoveryException if the topology files needed by a full container are missing.
+     */
+    protected Container(ContainerType type, Map<String, Object> conf, String supervisorId,
+            int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
+            String workerId, Map<String, Object> topoConf, AdvancedFSOps ops) throws IOException {
+        assert(type != null);
+        assert(conf != null);
+        assert(supervisorId != null);
+
+        if (ops == null) {
+            ops = AdvancedFSOps.make(conf);
+        }
+
+        _workerId = workerId;
+        _type = type;
+        _port = port;
+        _ops = ops;
+        _conf = conf;
+        _supervisorId = supervisorId;
+        _resourceIsolationManager = resourceIsolationManager;
+        _assignment = assignment;
+
+        if (_type.isOnlyKillable()) {
+            assert(_assignment == null);
+            assert(_port <= 0);
+            assert(_workerId != null);
+            _topologyId = null;
+            _topoConf = null;
+        } else {
+            assert(assignment != null);
+            assert(port > 0);
+            _topologyId = assignment.get_topology_id();
+            if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) {
+                LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
+                        _supervisorId, _port, _workerId);
+                throw new ContainerRecoveryException("Missing required topology files...");
+            }
+            if (topoConf == null) {
+                _topoConf = readTopoConf();
+            } else {
+                //For testing...
+                _topoConf = topoConf;
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "topo:" + _topologyId + " worker:" + _workerId;
+    }
+
+    /**
+     * Read the supervisor-local copy of the topology config.
+     * @return the topology config for {@link #_topologyId}.
+     * @throws IOException on any error
+     */
+    protected Map<String, Object> readTopoConf() throws IOException {
+        assert(_topologyId != null);
+        return ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+    }
+
+    /**
+     * Ask a single process to terminate (delegates to {@code Utils.killProcessWithSigTerm}).
+     * @param pid the id of the process to kill
+     * @throws IOException on any error
+     */
+    protected void kill(long pid) throws IOException {
+        Utils.killProcessWithSigTerm(String.valueOf(pid));
+    }
+
+    /**
+     * Forcibly kill a single process (delegates to {@code Utils.forceKillProcess}).
+     * @param pid the id of the process to force kill
+     * @throws IOException on any error
+     */
+    protected void forceKill(long pid) throws IOException {
+        Utils.forceKillProcess(String.valueOf(pid));
+    }
+
+    @Override
+    public void kill() throws IOException {
+        LOG.info("Killing {}:{}", _supervisorId, _workerId);
+        Set<Long> pids = getAllPids();
+
+        for (Long pid : pids) {
+            kill(pid);
+        }
+    }
+
+    @Override
+    public void forceKill() throws IOException {
+        LOG.info("Force Killing {}:{}", _supervisorId, _workerId);
+        Set<Long> pids = getAllPids();
+
+        for (Long pid : pids) {
+            forceKill(pid);
+        }
+    }
+
+    /**
+     * Read the Heartbeat for the current container.
+     * @return the Heartbeat
+     * @throws IOException on any error
+     */
+    public LSWorkerHeartbeat readHeartbeat() throws IOException {
+        LocalState localState = ConfigUtils.workerState(_conf, _workerId);
+        LSWorkerHeartbeat hb = localState.getWorkerHeartBeat();
+        LOG.trace("{}: Reading heartbeat {}", _workerId, hb);
+        return hb;
+    }
+
+    /**
+     * Is a process alive and running?
+     * @param pid the PID of the running process
+     * @param user the user that is expected to own that process
+     * @return true if it is, else false
+     * @throws IOException on any error
+     */
+    protected boolean isProcessAlive(long pid, String user) throws IOException {
+        if (Utils.IS_ON_WINDOWS) {
+            return isWindowsProcessAlive(pid, user);
+        }
+        return isPosixProcessAlive(pid, user);
+    }
+
+    /**
+     * Check a process on Windows by parsing the verbose list output of {@code tasklist}.
+     */
+    private boolean isWindowsProcessAlive(long pid, String user) throws IOException {
+        boolean ret = false;
+        ProcessBuilder pb = new ProcessBuilder("tasklist", "/fo", "list", "/fi", "pid eq " + pid, "/v");
+        pb.redirectError(Redirect.INHERIT);
+        Process p = pb.start();
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+            String read;
+            while ((read = in.readLine()) != null) {
+                if (read.contains("User Name:")) { //Check for : in case someone called their user "User Name"
+                    //This line contains the user name for the pid we're looking up
+                    //Example line: "User Name: exampleDomain\exampleUser"
+                    List<String> userNameLineSplitOnColon = Arrays.asList(read.split(":"));
+                    if (userNameLineSplitOnColon.size() == 2) {
+                        List<String> userAndMaybeDomain = Arrays.asList(userNameLineSplitOnColon.get(1).trim().split("\\\\"));
+                        String processUser = userAndMaybeDomain.size() == 2 ? userAndMaybeDomain.get(1) : userAndMaybeDomain.get(0);
+                        if (user.equals(processUser)) {
+                            ret = true;
+                        } else {
+                            LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
+                        }
+                    } else {
+                        LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. Line was {}", read);
+                    }
+                    break;
+                }
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Check a process on POSIX systems by asking {@code ps} for the owning user of the pid.
+     * Output lines are trimmed before comparison so that any column padding emitted by
+     * {@code ps} does not cause a live process to be reported as dead.
+     */
+    private boolean isPosixProcessAlive(long pid, String user) throws IOException {
+        boolean ret = false;
+        ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", String.valueOf(pid));
+        pb.redirectError(Redirect.INHERIT);
+        Process p = pb.start();
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+            String first = in.readLine();
+            assert(first != null && "USER".equals(first.trim()));
+            String line;
+            while ((line = in.readLine()) != null) {
+                String processUser = line.trim();
+                if (user.equals(processUser)) {
+                    ret = true;
+                    break;
+                } else {
+                    LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
+                }
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public boolean areAllProcessesDead() throws IOException {
+        Set<Long> pids = getAllPids();
+        String user = getWorkerUser();
+
+        boolean allDead = true;
+        for (Long pid: pids) {
+            if (!isProcessAlive(pid, user)) {
+                LOG.debug("{}: PID {} is dead", _workerId, pid);
+            } else {
+                allDead = false;
+                break;
+            }
+        }
+        return allDead;
+    }
+
+    @Override
+    public void cleanUp() throws IOException {
+        cleanUpForRestart();
+    }
+
+    /**
+     * Setup the container to run. By default this creates the needed directories/links in the
+     * local file system
+     * PREREQUISITE: All needed blobs and topology, jars/configs have been downloaded and
+     * placed in the appropriate locations
+     * @throws IOException on any error
+     * @throws IllegalStateException if this container is only killable or topology files are missing.
+     */
+    protected void setup() throws IOException {
+        _type.assertFull();
+        if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) {
+            LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
+                    _supervisorId, _port, _workerId);
+            throw new IllegalStateException("Not all needed files are here!!!!");
+        }
+        LOG.info("Setting up {}:{}", _supervisorId, _workerId);
+
+        _ops.forceMkdir(new File(ConfigUtils.workerPidsRoot(_conf, _workerId)));
+        _ops.forceMkdir(new File(ConfigUtils.workerTmpRoot(_conf, _workerId)));
+        _ops.forceMkdir(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId)));
+
+        File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port));
+        if (!_ops.fileExists(workerArtifacts)) {
+            _ops.forceMkdir(workerArtifacts);
+            _ops.setupStormCodeDir(_topoConf, workerArtifacts);
+        }
+
+        String user = getWorkerUser();
+        writeLogMetadata(user);
+        saveWorkerUser(user);
+        createArtifactsLink();
+        createBlobstoreLinks();
+    }
+
+    /**
+     * Write out the file used by the log viewer to allow/reject log access
+     * @param user the user this is going to run as
+     * @throws IOException on any error
+     */
+    @SuppressWarnings("unchecked")
+    protected void writeLogMetadata(String user) throws IOException {
+        _type.assertFull();
+        Map<String, Object> data = new HashMap<>();
+        data.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+        data.put("worker-id", _workerId);
+
+        // Union of the log-specific groups and the topology groups.
+        Set<String> logsGroups = new HashSet<>();
+        if (_topoConf.get(Config.LOGS_GROUPS) != null) {
+            List<String> groups = (List<String>) _topoConf.get(Config.LOGS_GROUPS);
+            for (String group : groups) {
+                logsGroups.add(group);
+            }
+        }
+        if (_topoConf.get(Config.TOPOLOGY_GROUPS) != null) {
+            List<String> topGroups = (List<String>) _topoConf.get(Config.TOPOLOGY_GROUPS);
+            logsGroups.addAll(topGroups);
+        }
+        data.put(Config.LOGS_GROUPS, logsGroups.toArray());
+
+        // Union of the log-specific users and the topology users.
+        Set<String> logsUsers = new HashSet<>();
+        if (_topoConf.get(Config.LOGS_USERS) != null) {
+            List<String> logUsers = (List<String>) _topoConf.get(Config.LOGS_USERS);
+            for (String logUser : logUsers) {
+                logsUsers.add(logUser);
+            }
+        }
+        if (_topoConf.get(Config.TOPOLOGY_USERS) != null) {
+            List<String> topUsers = (List<String>) _topoConf.get(Config.TOPOLOGY_USERS);
+            for (String logUser : topUsers) {
+                logsUsers.add(logUser);
+            }
+        }
+        data.put(Config.LOGS_USERS, logsUsers.toArray());
+
+        File file = ConfigUtils.getLogMetaDataFile(_conf, _topologyId, _port);
+
+        Yaml yaml = new Yaml();
+        try (Writer writer = _ops.getWriter(file)) {
+            yaml.dump(data, writer);
+        }
+    }
+
+    /**
+     * Create symlink from the containers directory/artifacts to the artifacts directory
+     * @throws IOException on any error
+     */
+    protected void createArtifactsLink() throws IOException {
+        _type.assertFull();
+        File workerDir = new File(ConfigUtils.workerRoot(_conf, _workerId));
+        File topoDir = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port));
+        if (_ops.fileExists(workerDir)) {
+            LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to its port artifacts directory", _workerId, _topologyId);
+            _ops.createSymlink(new File(workerDir, "artifacts"), topoDir);
+        }
+    }
+
+    /**
+     * Create symlinks for each of the blobs from the container's directory to
+     * corresponding links in the storm dist directory.
+     * A blob is linked under its "localname" if one is configured, else under its key.
+     * @throws IOException on any error.
+     */
+    protected void createBlobstoreLinks() throws IOException {
+        _type.assertFull();
+        String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
+        String workerRoot = ConfigUtils.workerRoot(_conf, _workerId);
+
+        @SuppressWarnings("unchecked")
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) _topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        List<String> blobFileNames = new ArrayList<>();
+        if (blobstoreMap != null) {
+            for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
+                Map<String, Object> blobInfo = entry.getValue();
+                String localName = (blobInfo != null && blobInfo.containsKey("localname"))
+                        ? (String) blobInfo.get("localname")
+                        : entry.getKey();
+                blobFileNames.add(localName);
+            }
+        }
+        List<String> resourceFileNames = new ArrayList<>();
+        resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR);
+        resourceFileNames.addAll(blobFileNames);
+        LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", _workerId, _topologyId, resourceFileNames.size(), resourceFileNames);
+        _ops.createSymlink(new File(workerRoot, ConfigUtils.RESOURCES_SUBDIR),
+                new File(stormRoot, ConfigUtils.RESOURCES_SUBDIR));
+        for (String fileName : blobFileNames) {
+            _ops.createSymlink(new File(workerRoot, fileName),
+                    new File(stormRoot, fileName));
+        }
+    }
+
+    /**
+     * @return all of the pids that are a part of this container: the entries of the worker's
+     * pid directory plus any pids reported by the resource isolation manager (if one is set).
+     * @throws IOException on any error
+     */
+    protected Set<Long> getAllPids() throws IOException {
+        Set<Long> ret = new HashSet<>();
+        for (String listing: Utils.readDirContents(ConfigUtils.workerPidsRoot(_conf, _workerId))) {
+            ret.add(Long.valueOf(listing));
+        }
+
+        if (_resourceIsolationManager != null) {
+            Set<Long> morePids = _resourceIsolationManager.getRunningPIDs(_workerId);
+            assert(morePids != null);
+            ret.addAll(morePids);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Resolve the user that some operations should be done as. In order: the saved
+     * worker-user file, the topology submitter (when the topology conf is known), the
+     * JVM user in local mode, and finally the owner of the worker artifacts root.
+     * @return the user that some operations should be done as.
+     * @throws IOException on any error
+     * @throws IllegalStateException if no source could provide the user.
+     */
+    protected String getWorkerUser() throws IOException {
+        LOG.info("GET worker-user for {}", _workerId);
+        File file = new File(ConfigUtils.workerUserFile(_conf, _workerId));
+
+        if (_ops.fileExists(file)) {
+            return _ops.slurpString(file).trim();
+        } else if (_topoConf != null) {
+            return (String) _topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        }
+        if (ConfigUtils.isLocalMode(_conf)) {
+            return System.getProperty("user.name");
+        } else {
+            File f = new File(ConfigUtils.workerArtifactsRoot(_conf));
+            if (f.exists()) {
+                return Files.getOwner(f.toPath()).getName();
+            }
+            throw new IllegalStateException("Could not recover the user for " + _workerId);
+        }
+    }
+
+    /**
+     * Persist the worker user so it can be recovered later by {@link #getWorkerUser()}.
+     * @param user the user to save
+     * @throws IOException on any error
+     */
+    protected void saveWorkerUser(String user) throws IOException {
+        _type.assertFull();
+        LOG.info("SET worker-user {} {}", _workerId, user);
+        _ops.dump(new File(ConfigUtils.workerUserFile(_conf, _workerId)), user);
+    }
+
+    /**
+     * Remove the saved worker-user file, if present.
+     * @throws IOException on any error
+     */
+    protected void deleteSavedWorkerUser() throws IOException {
+        LOG.info("REMOVE worker-user {}", _workerId);
+        _ops.deleteIfExists(new File(ConfigUtils.workerUserFile(_conf, _workerId)));
+    }
+
+    /**
+     * Clean up the container partly preparing for restart.
+     * By default delete all of the temp directories we are going
+     * to get a new worker_id anyways.
+     * POST CONDITION: the workerId will be set to null
+     * @throws IOException on any error
+     */
+    public void cleanUpForRestart() throws IOException {
+        LOG.info("Cleaning up {}:{}", _supervisorId, _workerId);
+        Set<Long> pids = getAllPids();
+        String user = getWorkerUser();
+
+        for (Long pid : pids) {
+            File path = new File(ConfigUtils.workerPidPath(_conf, _workerId, pid));
+            _ops.deleteIfExists(path, user, _workerId);
+        }
+
+        //clean up for resource isolation if enabled
+        if (_resourceIsolationManager != null) {
+            _resourceIsolationManager.releaseResourcesForWorker(_workerId);
+        }
+
+        //Always make sure to clean up everything else before worker directory
+        //is removed since that is what is going to trigger the retry for cleanup
+        _ops.deleteIfExists(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId)), user, _workerId);
+        _ops.deleteIfExists(new File(ConfigUtils.workerPidsRoot(_conf, _workerId)), user, _workerId);
+        _ops.deleteIfExists(new File(ConfigUtils.workerTmpRoot(_conf, _workerId)), user, _workerId);
+        _ops.deleteIfExists(new File(ConfigUtils.workerRoot(_conf, _workerId)), user, _workerId);
+        deleteSavedWorkerUser();
+        _workerId = null;
+    }
+
+    /**
+     * Launch the process for the first time
+     * PREREQUISITE: setup has run and passed
+     * @throws IOException on any error
+     */
+    public abstract void launch() throws IOException;
+
+    /**
+     * Restart the processes in this container
+     * PREREQUISITE: cleanUpForRestart has run and passed
+     * @throws IOException on any error
+     */
+    public abstract void relaunch() throws IOException;
+
+    /**
+     * @return true if the main process exited, else false. This is just best effort return false if unknown.
+     */
+    public abstract boolean didMainProcessExit();
+
+    /**
+     * Run a profiling request
+     * @param request the request to run
+     * @param stop is this a stop request?
+     * @return true if it succeeded, else false
+     * @throws IOException on any error
+     * @throws InterruptedException if running the command is interrupted.
+     */
+    public abstract boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException;
+
+    /**
+     * @return the id of the container or null if there is no worker id right now.
+     */
+    public String getWorkerId() {
+        return _workerId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerLauncher.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerLauncher.java
new file mode 100644
index 0000000..55f167c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerLauncher.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Launches containers
+ */
+public abstract class ContainerLauncher {
+ private static final Logger LOG = LoggerFactory.getLogger(ContainerLauncher.class);
+
+ /**
+ * Factory to create the right container launcher
+ * for the config and the environment.
+ * @param conf the config
+ * @param supervisorId the ID of the supervisor
+ * @param sharedContext Used in local mode to let workers talk together without netty
+ * @return the proper container launcher
+ * @throws IOException on any error
+ */
+ public static ContainerLauncher make(Map<String, Object> conf, String supervisorId, IContext sharedContext) throws IOException {
+ if (ConfigUtils.isLocalMode(conf)) {
+ return new LocalContainerLauncher(conf, supervisorId, sharedContext);
+ }
+
+ ResourceIsolationInterface resourceIsolationManager = null;
+ if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
+ resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
+ resourceIsolationManager.prepare(conf);
+ LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
+ }
+
+ if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ return new RunAsUserContainerLauncher(conf, supervisorId, resourceIsolationManager);
+ }
+ return new BasicContainerLauncher(conf, supervisorId, resourceIsolationManager);
+ }
+
+ protected ContainerLauncher() {
+ //Empty
+ }
+
+ /**
+ * Launch a container in a given slot
+ * @param port the port to run this on
+ * @param assignment what to launch
+ * @param state the current state of the supervisor
+ * @return The container that can be used to manager the processes.
+ * @throws IOException on any error
+ */
+ public abstract Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException;
+
+ /**
+ * Recover a container for a running process
+ * @param port the port the assignment is running on
+ * @param assignment the assignment that was launched
+ * @param state the current state of the supervisor
+ * @return The container that can be used to manage the processes.
+ * @throws IOException on any error
+ * @throws ContainerRecoveryException if the Container could not be recovered
+ */
+ public abstract Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException, ContainerRecoveryException;
+
+ /**
+ * Try to recover a container using just the worker ID.
+ * The result is really only useful for killing the container
+ * and so is returning a Killable. Even if a Container is returned
+ * do not case the result to Container because only the Killable APIs
+ * are guaranteed to work.
+ * @param workerId the id of the worker to use
+ * @param localState the state of the running supervisor
+ * @return a Killable that can be used to kill the underlying container.
+ * @throws IOException on any error
+ * @throws ContainerRecoveryException if the Container could not be recovered
+ */
+ public abstract Killable recoverContainer(String workerId, LocalState localState) throws IOException, ContainerRecoveryException;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java
new file mode 100644
index 0000000..7ab6e67
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+/**
+ * Could not recover (or create) the container, for example because the topology
+ * files it requires are missing.
+ */
+public class ContainerRecoveryException extends RuntimeException {
+    // RuntimeException is Serializable; pin the serialized form explicitly.
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message a description of why the container could not be recovered.
+     */
+    public ContainerRecoveryException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message a description of why the container could not be recovered.
+     * @param cause the underlying failure, preserved so the root cause is not lost.
+     */
+    public ContainerRecoveryException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java
new file mode 100644
index 0000000..082f205
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+/**
+ * A callback that is notified with the exit code of a finished process.
+ */
+@FunctionalInterface
+public interface ExitCodeCallback {
+
+    /**
+     * Invoked when the process has finished.
+     * @param exitCode the exit code of the finished process.
+     */
+    void call(int exitCode);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Killable.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Killable.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Killable.java
new file mode 100644
index 0000000..8d6d8e0
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Killable.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+
+/**
+ * Something whose processes can be stopped and whose on-disk state can be cleaned up.
+ */
+public interface Killable {
+
+    /**
+     * Ask the processes in this container to stop gracefully (the kill -15 equivalent).
+     * @throws IOException on any error
+     */
+    void kill() throws IOException;
+
+    /**
+     * Stop the processes in this container forcibly (the kill -9 equivalent).
+     * @throws IOException on any error
+     */
+    void forceKill() throws IOException;
+
+    /**
+     * @return true when every process in this container has exited, else false
+     * @throws IOException on any error
+     */
+    boolean areAllProcessesDead() throws IOException;
+
+    /**
+     * Clean up the container for good; it will not be restarted.
+     * By default this does the same thing as preparing for a restart.
+     * @throws IOException on any error
+     */
+    void cleanUp() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
new file mode 100644
index 0000000..9b196d4
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import clojure.java.api.Clojure;
+import clojure.lang.IFn;
+
+/**
+ * A {@link Container} for local mode: the worker runs inside this JVM and is
+ * registered with {@link ProcessSimulator} instead of being a separate OS process.
+ */
+public class LocalContainer extends Container {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalContainer.class);
+    /** Set once the in-process worker is registered; cleared when it is killed. */
+    private volatile boolean _isAlive = false;
+    private final IContext _sharedContext;
+
+    /**
+     * @param conf supervisor configuration
+     * @param supervisorId id of the supervisor that owns this container
+     * @param port the slot port the worker is assigned to
+     * @param assignment the assignment the worker will run
+     * @param sharedContext messaging context handed to the in-process worker
+     * @throws IOException if the underlying {@link Container} constructor fails
+     */
+    public LocalContainer(Map<String, Object> conf, String supervisorId, int port, LocalAssignment assignment, IContext sharedContext) throws IOException {
+        super(ContainerType.LAUNCH, conf, supervisorId, port, assignment, null, null, null, null);
+        _sharedContext = sharedContext;
+        // Every local container gets a freshly generated worker id.
+        _workerId = Utils.uuid();
+    }
+
+    /**
+     * Start the worker in this JVM by invoking the Clojure {@code mk-worker} function
+     * and registering the resulting worker with {@link ProcessSimulator}.
+     */
+    @Override
+    public void launch() throws IOException {
+        //TODO when worker goes to java, just call it directly (not through clojure)
+        IFn mkWorker = Clojure.var("org.apache.storm.daemon.worker", "mk-worker");
+
+        Shutdownable worker = (Shutdownable) mkWorker.invoke(_conf, _sharedContext, _topologyId, _supervisorId, _port, _workerId);
+        // The in-process worker runs as the same user as this JVM.
+        saveWorkerUser(System.getProperty("user.name"));
+        ProcessSimulator.registerProcess(_workerId, worker);
+        _isAlive = true;
+    }
+
+    /**
+     * Shut down the simulated worker, then let the base class kill any child processes.
+     */
+    @Override
+    public void kill() throws IOException {
+        ProcessSimulator.killProcess(_workerId);
+        _isAlive = false;
+        //Make sure the worker is down before we try to shoot any child processes
+        super.kill();
+    }
+
+    /**
+     * @return true only if the simulated worker has been killed and the base class also reports no live processes
+     */
+    @Override
+    public boolean areAllProcessesDead() throws IOException {
+        return !_isAlive && super.areAllProcessesDead();
+    }
+
+    /**
+     * Relaunching is not supported in local mode; this only logs a warning.
+     */
+    @Override
+    public void relaunch() throws IOException {
+        LOG.warn("NOOP relaunch in local mode...");
+    }
+
+    @Override
+    public boolean didMainProcessExit() {
+        //In local mode the main process should never exit on its own
+        return false;
+    }
+
+    /**
+     * Profiling is not available for in-process workers.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException {
+        throw new UnsupportedOperationException("Profiling requests are not supported in local mode");
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
new file mode 100644
index 0000000..c25bc49
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.utils.LocalState;
+
+/**
+ * {@link ContainerLauncher} for local mode. Every container is a {@link LocalContainer}
+ * living in this same JVM, so recovering containers is not supported.
+ */
+public class LocalContainerLauncher extends ContainerLauncher {
+    private final Map<String, Object> conf;
+    private final String supervisorId;
+    private final IContext sharedContext;
+
+    /**
+     * @param conf supervisor configuration handed to each container
+     * @param supervisorId id of the owning supervisor
+     * @param sharedContext messaging context handed to every local container
+     */
+    public LocalContainerLauncher(Map<String, Object> conf, String supervisorId, IContext sharedContext) {
+        this.conf = conf;
+        this.supervisorId = supervisorId;
+        this.sharedContext = sharedContext;
+    }
+
+    @Override
+    public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        final LocalContainer container = new LocalContainer(conf, supervisorId, port, assignment, sharedContext);
+        container.setup();
+        container.launch();
+        return container;
+    }
+
+    @Override
+    public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        throw recoveryNotSupported();
+    }
+
+    @Override
+    public Killable recoverContainer(String workerId, LocalState localState) throws IOException {
+        throw recoveryNotSupported();
+    }
+
+    /**
+     * Everything ran inside this same process, so there is nothing left to recover.
+     */
+    private static ContainerRecoveryException recoveryNotSupported() {
+        return new ContainerRecoveryException("Local Mode Recovery is not supported");
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
new file mode 100644
index 0000000..27f18b6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Synchronizes this supervisor's {@link Slot}s with the cluster state. Each call to
+ * {@link #run()} reads the current assignments and profiler requests and hands the
+ * per-port results to the matching slot, creating slots for newly assigned ports.
+ */
+public class ReadClusterState implements Runnable, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(ReadClusterState.class);
+
+    private final Map<String, Object> superConf;
+    private final IStormClusterState stormClusterState;
+    private final EventManager syncSupEventManager;
+    /** Assignment snapshot from the previous sync, keyed by topology id; lets unchanged assignments be reused. */
+    private final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
+    /** One slot per port, keyed by port number. */
+    private final Map<Integer, Slot> slots = new HashMap<>();
+    /** Number of consecutive failed attempts in {@link #readAssignments(Map)}. */
+    private final AtomicInteger readRetry = new AtomicInteger(0);
+    private final String assignmentId;
+    private final ISupervisor iSuper;
+    private final ILocalizer localizer;
+    private final ContainerLauncher launcher;
+    private final String host;
+    private final LocalState localState;
+    private final IStormClusterState clusterState;
+    private final AtomicReference<Map<Long, LocalAssignment>> cachedAssignments;
+
+    /**
+     * Create one {@link Slot} per configured supervisor port, do best-effort cleanup of
+     * workers no slot owns and of unused topologies, then start all slots.
+     *
+     * @param supervisor the supervisor whose configuration and state are used
+     * @throws Exception if creating a slot fails
+     */
+    public ReadClusterState(Supervisor supervisor) throws Exception {
+        this.superConf = supervisor.getConf();
+        this.stormClusterState = supervisor.getStormClusterState();
+        this.syncSupEventManager = supervisor.getEventManger();
+        this.assignmentVersions = new AtomicReference<>(new HashMap<>());
+        this.assignmentId = supervisor.getAssignmentId();
+        this.iSuper = supervisor.getiSupervisor();
+        this.localizer = supervisor.getAsyncLocalizer();
+        this.host = supervisor.getHostName();
+        this.localState = supervisor.getLocalState();
+        this.clusterState = supervisor.getStormClusterState();
+        this.cachedAssignments = supervisor.getCurrAssignment();
+
+        this.launcher = ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext());
+
+        @SuppressWarnings("unchecked")
+        List<Number> ports = (List<Number>)superConf.get(Config.SUPERVISOR_SLOTS_PORTS);
+        for (Number port: ports) {
+            slots.put(port.intValue(), mkSlot(port.intValue()));
+        }
+
+        try {
+            // Any known worker id that no slot claims is an orphan and gets killed.
+            Collection<String> workers = SupervisorUtils.supervisorWorkerIds(superConf);
+            for (Slot slot: slots.values()) {
+                String workerId = slot.getWorkerId();
+                if (workerId != null) {
+                    workers.remove(workerId);
+                }
+            }
+            if (!workers.isEmpty()) {
+                supervisor.killWorkers(workers, launcher);
+            }
+        } catch (Exception e) {
+            LOG.warn("Error trying to clean up old workers", e);
+        }
+
+        //All the slots/assignments should be recovered now, so we can clean up anything that we don't expect to be here
+        try {
+            localizer.cleanupUnusedTopologies();
+        } catch (Exception e) {
+            LOG.warn("Error trying to clean up old topologies", e);
+        }
+
+        for (Slot slot: slots.values()) {
+            slot.start();
+        }
+    }
+
+    /** Create a (not yet started) slot for {@code port}. */
+    private Slot mkSlot(int port) throws Exception {
+        return new Slot(localizer, superConf, launcher, host, port,
+                localState, clusterState, iSuper, cachedAssignments);
+    }
+
+    /**
+     * Read assignments and profiler requests, then push each port's assignment and
+     * profiler actions into its slot, creating and starting slots for new ports.
+     *
+     * @throws RuntimeException wrapping any failure during the sync
+     */
+    @Override
+    public synchronized void run() {
+        try {
+            Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager);
+            List<String> stormIds = stormClusterState.assignments(syncCallback);
+            Map<String, VersionedData<Assignment>> assignmentsSnapshot =
+                    getAssignmentsSnapshot(stormClusterState, stormIds, assignmentVersions.get(), syncCallback);
+            // Remember what we just read so the next sync can reuse assignments whose version did not change.
+            assignmentVersions.set(assignmentsSnapshot);
+
+            Map<Integer, LocalAssignment> allAssignments =
+                    readAssignments(assignmentsSnapshot);
+            if (allAssignments == null) {
+                //Something odd happened try again later
+                return;
+            }
+            Map<String, List<ProfileRequest>> topoIdToProfilerActions = getProfileActions(stormClusterState, stormIds);
+
+            Set<Integer> assignedPorts = new HashSet<>();
+            LOG.debug("Synchronizing supervisor");
+            LOG.debug("All assignment: {}", allAssignments);
+            LOG.debug("Topology Ids -> Profiler Actions {}", topoIdToProfilerActions);
+            for (Integer port: allAssignments.keySet()) {
+                if (iSuper.confirmAssigned(port)) {
+                    assignedPorts.add(port);
+                }
+            }
+            Set<Integer> allPorts = new HashSet<>(assignedPorts);
+            allPorts.addAll(slots.keySet());
+
+            Map<Integer, Set<TopoProfileAction>> filtered = profileActionsForThisHost(topoIdToProfilerActions);
+
+            for (Integer port: allPorts) {
+                Slot slot = slots.get(port);
+                if (slot == null) {
+                    slot = mkSlot(port);
+                    slots.put(port, slot);
+                    slot.start();
+                }
+                slot.setNewAssignment(allAssignments.get(port));
+                slot.addProfilerActions(filtered.get(port));
+            }
+
+        } catch (Exception e) {
+            LOG.error("Failed to Sync Supervisor", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Keep only the profile requests addressed to this host, grouped by the port they target.
+     *
+     * @param topoIdToProfilerActions profile requests per topology id (a value may be null)
+     * @return port -> all profiler actions for that port on this host
+     */
+    private Map<Integer, Set<TopoProfileAction>> profileActionsForThisHost(Map<String, List<ProfileRequest>> topoIdToProfilerActions) {
+        Map<Integer, Set<TopoProfileAction>> filtered = new HashMap<>();
+        for (Entry<String, List<ProfileRequest>> entry: topoIdToProfilerActions.entrySet()) {
+            String topoId = entry.getKey();
+            if (entry.getValue() == null) {
+                continue;
+            }
+            for (ProfileRequest req: entry.getValue()) {
+                NodeInfo ni = req.get_nodeInfo();
+                if (!host.equals(ni.get_node())) {
+                    continue;
+                }
+                Set<Long> ports = ni.get_port();
+                if (ports == null || ports.isEmpty()) {
+                    LOG.warn("Ignoring profile request {} for {} because it has no port", req, topoId);
+                    continue;
+                }
+                // The map is keyed by Integer: looking it up with the Long port would never match,
+                // and every request would replace the actions already collected for that port.
+                Integer port = ports.iterator().next().intValue();
+                Set<TopoProfileAction> actions = filtered.get(port);
+                if (actions == null) {
+                    actions = new HashSet<>();
+                    filtered.put(port, actions);
+                }
+                actions.add(new TopoProfileAction(topoId, req));
+            }
+        }
+        return filtered;
+    }
+
+    /**
+     * Build the current assignment snapshot, reusing the previously read data for any
+     * topology whose assignment version has not changed.
+     *
+     * @param stormClusterState where assignments are read from
+     * @param topoIds topology ids that currently have assignments
+     * @param localAssignmentVersion the snapshot from the previous call
+     * @param callback passed through to the cluster-state reads
+     * @return topology id -> versioned assignment data
+     * @throws Exception on any error reading the cluster state
+     */
+    protected Map<String, VersionedData<Assignment>> getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> topoIds,
+            Map<String, VersionedData<Assignment>> localAssignmentVersion, Runnable callback) throws Exception {
+        Map<String, VersionedData<Assignment>> updateAssignmentVersion = new HashMap<>();
+        for (String topoId : topoIds) {
+            Integer version = stormClusterState.assignmentVersion(topoId, callback);
+            if (version == null) {
+                // No version available for this topology; leave it out of the snapshot.
+                continue;
+            }
+            VersionedData<Assignment> locAssignment = localAssignmentVersion.get(topoId);
+            // Compare by value: '==' on boxed Integers only matches inside the -128..127 cache.
+            if (locAssignment != null && version.equals(locAssignment.getVersion())) {
+                updateAssignmentVersion.put(topoId, locAssignment);
+            } else {
+                VersionedData<Assignment> assignmentVersion = stormClusterState.assignmentInfoWithVersion(topoId, callback);
+                if (assignmentVersion != null) {
+                    updateAssignmentVersion.put(topoId, assignmentVersion);
+                }
+            }
+        }
+        return updateAssignmentVersion;
+    }
+
+    /**
+     * @return topology id -> that topology's profile requests (a value may be null)
+     */
+    protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception {
+        Map<String, List<ProfileRequest>> ret = new HashMap<String, List<ProfileRequest>>();
+        for (String stormId : stormIds) {
+            List<ProfileRequest> profileRequests = stormClusterState.getTopologyProfileRequests(stormId);
+            ret.put(stormId, profileRequests);
+        }
+        return ret;
+    }
+
+    /**
+     * Convert the assignment snapshot into this supervisor's local assignments, keyed by port.
+     *
+     * @param assignmentsSnapshot versioned assignment data per topology id
+     * @return port -> local assignment, or null if reading failed and should be retried on a later sync
+     * @throws RuntimeException if reading also failed on each of the three previous attempts
+     */
+    protected Map<Integer, LocalAssignment> readAssignments(Map<String, VersionedData<Assignment>> assignmentsSnapshot) {
+        try {
+            Map<Integer, LocalAssignment> portLA = new HashMap<>();
+            for (Map.Entry<String, VersionedData<Assignment>> assignEntry : assignmentsSnapshot.entrySet()) {
+                String topoId = assignEntry.getKey();
+                Assignment assignment = assignEntry.getValue().getData();
+
+                Map<Integer, LocalAssignment> portTasks = readMyExecutors(topoId, assignmentId, assignment);
+
+                for (Map.Entry<Integer, LocalAssignment> entry : portTasks.entrySet()) {
+                    Integer port = entry.getKey();
+                    LocalAssignment la = entry.getValue();
+                    if (!portLA.containsKey(port)) {
+                        portLA.put(port, la);
+                    } else {
+                        throw new RuntimeException("Should not have multiple topologies assigned to one port "
+                                + port + " " + la + " " + portLA);
+                    }
+                }
+            }
+            readRetry.set(0);
+            return portLA;
+        } catch (RuntimeException e) {
+            if (readRetry.get() > 2) {
+                throw e;
+            } else {
+                readRetry.incrementAndGet();
+            }
+            LOG.warn("{} : retrying {} of 3", e.getMessage(), readRetry.get());
+            return null;
+        }
+    }
+
+    /**
+     * Collect the executors of one topology that are placed on the node {@code assignmentId}.
+     *
+     * @param stormId the topology id
+     * @param assignmentId the node id to match (this supervisor's assignment id)
+     * @param assignment the topology's assignment
+     * @return port -> local assignment holding that port's executor ranges and, when present, its worker resources
+     */
+    protected Map<Integer, LocalAssignment> readMyExecutors(String stormId, String assignmentId, Assignment assignment) {
+        Map<Integer, LocalAssignment> portTasks = new HashMap<>();
+        Map<Long, WorkerResources> slotsResources = new HashMap<>();
+        Map<NodeInfo, WorkerResources> nodeInfoWorkerResourcesMap = assignment.get_worker_resources();
+        if (nodeInfoWorkerResourcesMap != null) {
+            for (Map.Entry<NodeInfo, WorkerResources> entry : nodeInfoWorkerResourcesMap.entrySet()) {
+                if (entry.getKey().get_node().equals(assignmentId)) {
+                    Set<Long> ports = entry.getKey().get_port();
+                    for (Long port : ports) {
+                        slotsResources.put(port, entry.getValue());
+                    }
+                }
+            }
+        }
+        Map<List<Long>, NodeInfo> executorNodePort = assignment.get_executor_node_port();
+        if (executorNodePort != null) {
+            for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) {
+                if (entry.getValue().get_node().equals(assignmentId)) {
+                    for (Long port : entry.getValue().get_port()) {
+                        LocalAssignment localAssignment = portTasks.get(port.intValue());
+                        if (localAssignment == null) {
+                            List<ExecutorInfo> executors = new ArrayList<>();
+                            localAssignment = new LocalAssignment(stormId, executors);
+                            if (slotsResources.containsKey(port)) {
+                                localAssignment.set_resources(slotsResources.get(port));
+                            }
+                            portTasks.put(port.intValue(), localAssignment);
+                        }
+                        List<ExecutorInfo> executorInfoList = localAssignment.get_executors();
+                        // The executor key's first and last ids form the executor's task range.
+                        executorInfoList.add(new ExecutorInfo(entry.getKey().get(0).intValue(), entry.getKey().get(entry.getKey().size() - 1).intValue()));
+                    }
+                }
+            }
+        }
+        return portTasks;
+    }
+
+    /**
+     * Clear every slot's assignment, then wait a bounded number of polls per slot for it to reach
+     * {@link MachineState#EMPTY}, logging instead of waiting forever.
+     */
+    public synchronized void shutdownAllWorkers() {
+        for (Slot slot: slots.values()) {
+            slot.setNewAssignment(null);
+        }
+
+        for (Slot slot: slots.values()) {
+            try {
+                int count = 0;
+                while (slot.getMachineState() != MachineState.EMPTY) {
+                    if (count > 10) {
+                        LOG.warn("DONE waiting for {} to finish {}", slot, slot.getMachineState());
+                        break;
+                    }
+                    if (Time.isSimulating()) {
+                        Time.advanceTime(1000);
+                        Thread.sleep(100);
+                    } else {
+                        Time.sleep(100);
+                    }
+                    count++;
+                }
+            } catch (Exception e) {
+                LOG.error("Error trying to shutdown workers in {}", slot, e);
+            }
+        }
+    }
+
+    /**
+     * Close every slot; failures are logged and do not stop the remaining slots from closing.
+     */
+    @Override
+    public void close() {
+        for (Slot slot: slots.values()) {
+            try {
+                slot.close();
+            } catch (Exception e) {
+                LOG.error("Error trying to shutdown {}", slot, e);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
new file mode 100644
index 0000000..a2d8991
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link BasicContainer} whose worker processes run as the worker user
+ * ({@code getWorkerUser()}). Signalling, profiling and launching are all delegated
+ * to {@code SupervisorUtils.processLauncher*} instead of being done directly.
+ */
+public class RunAsUserContainer extends BasicContainer {
+    private static final Logger LOG = LoggerFactory.getLogger(RunAsUserContainer.class);
+    private static final int SIGTERM = 15;
+    private static final int SIGKILL = 9;
+
+    /**
+     * Create a container with no topology conf, file-system ops or profile command overrides.
+     */
+    public RunAsUserContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
+            LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState,
+            String workerId) throws IOException {
+        this(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, null, null, null);
+    }
+
+    /**
+     * Full constructor.
+     *
+     * @throws UnsupportedOperationException when running on Windows
+     */
+    RunAsUserContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
+            LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState,
+            String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, String profileCmd) throws IOException {
+        super(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, topoConf, ops,
+                profileCmd);
+        if (Utils.isOnWindows()) {
+            throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
+        }
+    }
+
+    /**
+     * Have the launcher deliver {@code signal} to {@code pid} as the worker user, and wait for it.
+     */
+    private void signal(long pid, int signal) throws IOException {
+        final List<String> launcherArgs = Arrays.asList("signal", Long.toString(pid), Integer.toString(signal));
+        final String user = getWorkerUser();
+        final String logPrefix = new StringBuilder("kill -").append(signal).append(' ').append(pid).toString();
+        SupervisorUtils.processLauncherAndWait(_conf, user, launcherArgs, null, logPrefix);
+    }
+
+    @Override
+    protected void kill(long pid) throws IOException {
+        signal(pid, SIGTERM);
+    }
+
+    @Override
+    protected void forceKill(long pid) throws IOException {
+        signal(pid, SIGKILL);
+    }
+
+    /**
+     * Write the profiling command into a script under {@code targetDir} and run it as the worker user.
+     *
+     * @return true if the launcher exited with status 0
+     */
+    @Override
+    protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, File targetDir) throws IOException, InterruptedException {
+        final String user = this.getWorkerUser();
+        final String dir = targetDir.getAbsolutePath();
+        LOG.info("Running as user: {} command: {}", user, command);
+        // Remove any existing container/script files (as the worker user) before writing the new script.
+        removeAsUserIfPresent(Utils.containerFilePath(dir));
+        removeAsUserIfPresent(Utils.scriptFilePath(dir));
+        final String script = Utils.writeScript(dir, command, env);
+        final int exitCode = SupervisorUtils.processLauncherAndWait(_conf, user, Arrays.asList("profiler", dir, script), env, logPrefix);
+        return exitCode == 0;
+    }
+
+    /** Delete {@code path} through {@code SupervisorUtils.rmrAsUser} when it exists. */
+    private void removeAsUserIfPresent(String path) throws IOException, InterruptedException {
+        if (Utils.checkFileExists(path)) {
+            SupervisorUtils.rmrAsUser(_conf, path, path);
+        }
+    }
+
+    /**
+     * Write the worker command into a script under {@code targetDir} and launch it as the worker user.
+     */
+    @Override
+    protected void launchWorkerProcess(List<String> command, Map<String, String> env,
+            String logPrefix, ExitCodeCallback processExitCallback, File targetDir) throws IOException {
+        final String workerDir = targetDir.getAbsolutePath();
+        final String user = this.getWorkerUser();
+        final String script = Utils.writeScript(workerDir, command, env);
+        final List<String> launcherArgs = Arrays.asList("worker", workerDir, script);
+        // If a resource isolation manager is configured, prefix the launch with its command.
+        final List<String> commandPrefix = (_resourceIsolationManager == null)
+                ? null
+                : _resourceIsolationManager.getLaunchCommandPrefix(_workerId);
+        SupervisorUtils.processLauncher(_conf, user, commandPrefix, launcherArgs, null, logPrefix, processExitCallback, targetDir);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
new file mode 100644
index 0000000..c8bee27
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.daemon.supervisor.Container.ContainerType;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+
+/**
+ * {@link ContainerLauncher} that creates {@link RunAsUserContainer}s, so workers can run
+ * as a different user than the supervisor.
+ */
+public class RunAsUserContainerLauncher extends ContainerLauncher {
+    private final Map<String, Object> _conf;
+    private final String _supervisorId;
+    protected final ResourceIsolationInterface _resourceIsolationManager;
+
+    /**
+     * @param conf supervisor configuration handed to each container
+     * @param supervisorId id of the owning supervisor
+     * @param resourceIsolationManager optional resource isolation manager (may be null)
+     */
+    public RunAsUserContainerLauncher(Map<String, Object> conf, String supervisorId, ResourceIsolationInterface resourceIsolationManager) throws IOException {
+        this._conf = conf;
+        this._supervisorId = supervisorId;
+        this._resourceIsolationManager = resourceIsolationManager;
+    }
+
+    @Override
+    public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        final Container fresh = createRunAsUserContainer(ContainerType.LAUNCH, port, assignment, state, null);
+        fresh.setup();
+        fresh.launch();
+        return fresh;
+    }
+
+    @Override
+    public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        return createRunAsUserContainer(ContainerType.RECOVER_FULL, port, assignment, state, null);
+    }
+
+    @Override
+    public Killable recoverContainer(String workerId, LocalState localState) throws IOException {
+        // Only the worker id is known here: no port (-1) and no assignment.
+        return createRunAsUserContainer(ContainerType.RECOVER_PARTIAL, -1, null, localState, workerId);
+    }
+
+    /** Build a {@link RunAsUserContainer} sharing this launcher's conf, supervisor id and isolation manager. */
+    private RunAsUserContainer createRunAsUserContainer(ContainerType type, int port, LocalAssignment assignment,
+            LocalState state, String workerId) throws IOException {
+        return new RunAsUserContainer(type, _conf, _supervisorId, port, assignment,
+                _resourceIsolationManager, state, workerId, null, null, null);
+    }
+}