You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:53 UTC

[30/35] storm git commit: update supervisor based on revans2 and longdafeng

http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
index 6b294f2,0000000..04467c2
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@@ -1,221 -1,0 +1,214 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.storm.daemon.supervisor.timer;
 +
++import com.google.common.collect.Lists;
 +import org.apache.storm.Config;
 +import org.apache.storm.cluster.IStormClusterState;
 +import org.apache.storm.daemon.supervisor.SupervisorData;
 +import org.apache.storm.daemon.supervisor.SupervisorUtils;
 +import org.apache.storm.generated.ProfileAction;
 +import org.apache.storm.generated.ProfileRequest;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.Utils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.BufferedReader;
 +import java.io.File;
 +import java.io.FileReader;
 +import java.io.IOException;
 +import java.util.*;
 +
 +/**
 + * Runnable that, each time it is run, walks the profile requests held in {@link SupervisorData} and launches the
 + * configured profiler command ({@code Config.WORKER_PROFILER_COMMAND}) for every request whose node matches this
 + * supervisor's host name.
 + */
 +public class RunProfilerActions implements Runnable {
 +    private static final Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class);
 +
 +    private Map conf;
 +    private IStormClusterState stormClusterState;
 +    private String hostName;
 +
 +    private String profileCmd;
 +
 +    private SupervisorData supervisorData;
 +
 +    /**
 +     * Exit callback handed to the process launcher. When the profiler command exits, the profile request is
 +     * deleted from cluster state, but only if {@code stop} was set when the callback was created.
 +     */
 +    private class ActionExitCallback implements Utils.ExitCodeCallable {
 +        private String stormId;
 +        private ProfileRequest profileRequest;
 +        private String logPrefix;
++        private boolean stop;
 +
-         public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) {
++        public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix, boolean stop) {
 +            this.stormId = stormId;
 +            this.profileRequest = profileRequest;
 +            this.logPrefix = logPrefix;
++            this.stop = stop;
 +        }
 +
 +        /** No-argument variant required by the interface; does nothing and returns {@code null}. */
 +        @Override
 +        public Object call() throws Exception {
 +            return null;
 +        }
 +
 +        /**
 +         * Logs the exit code and, when {@code stop} is set, deletes the profile request from cluster state.
 +         * A failed delete is logged and otherwise ignored.
 +         *
 +         * @param exitCode exit code of the profiler command
 +         * @return always {@code null}
 +         */
 +        @Override
 +        public Object call(int exitCode) {
 +            LOG.info("{} profile-action exited for {}", logPrefix, exitCode);
 +            try {
-                 stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
++                if (stop)
++                    stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
 +            } catch (Exception e) {
 +                // Pass the exception along so the reason for the failed delete is not lost.
 +                LOG.warn("failed delete profileRequest: " + profileRequest, e);
 +            }
 +            return null;
 +        }
 +    }
 +
 +    /**
 +     * Caches the supervisor configuration, cluster state handle, host name and profiler command.
 +     *
 +     * @param supervisorData shared supervisor state this task reads from
 +     */
 +    public RunProfilerActions(SupervisorData supervisorData) {
 +        this.conf = supervisorData.getConf();
 +        this.stormClusterState = supervisorData.getStormClusterState();
 +        this.hostName = supervisorData.getHostName();
 +        this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND));
 +        this.supervisorData = supervisorData;
 +    }
 +
 +    /**
 +     * For every profile request addressed to this host: reads the worker PID from its PID file, builds the
 +     * profiler command for the requested action and launches it. Failures are logged, never rethrown.
 +     */
 +    @Override
 +    public void run() {
-         Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions().get();
++        Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfilerActions().get();
 +        try {
 +            for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
 +                String stormId = entry.getKey();
 +                List<ProfileRequest> requests = entry.getValue();
 +                if (requests != null) {
 +                    for (ProfileRequest profileRequest : requests) {
 +                        if (profileRequest.get_nodeInfo().get_node().equals(hostName)) {
 +                            // "stop" means the request's timestamp is already in the past.
-                             boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false;
++                            boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp();
 +                            Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
 +                            // NOTE(review): the port is passed where the PID path below passes stormId + port;
 +                            // confirm this is the intended workerArtifactsRoot overload.
 +                            String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
 +                            Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +
 +                            String user = null;
 +                            if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
 +                                user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
 +                            }
 +                            Map<String, String> env = null;
 +                            if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) {
 +                                env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
 +                            } else {
 +                                env = new HashMap<String, String>();
 +                            }
 +
 +                            String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
 +                            StringBuilder stringBuilder = new StringBuilder();
-                             FileReader reader = null;
-                             BufferedReader br = null;
-                             try {
-                                 reader = new FileReader(str);
-                                 br = new BufferedReader(reader);
++
++                            try (FileReader reader = new FileReader(str);
++                                 BufferedReader br = new BufferedReader(reader)) {
 +                                int c;
 +                                while ((c = br.read()) >= 0) {
 +                                    // read() returns the character as an int; cast so the text itself is
 +                                    // appended rather than its decimal character code.
 +                                    stringBuilder.append((char) c);
 +                                }
-                             } catch (IOException e) {
-                                 if (reader != null)
-                                     reader.close();
-                                 if (br != null)
-                                     br.close();
 +                            }
 +                            String workerPid = stringBuilder.toString().trim();
 +                            ProfileAction profileAction = profileRequest.get_action();
 +                            String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
 +
 +                            // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
 +                            // The profiler plugin script validates if JVM is recording before starting another recording.
-                             String command = mkCommand(profileAction, stop, workerPid, targetDir);
-                             List<String> listCommand = new ArrayList<>();
-                             if (command != null) {
-                                 listCommand.addAll(Arrays.asList(command.split(" ")));
-                             }
++                            List<String> command = mkCommand(profileAction, stop, workerPid, targetDir);
 +                            try {
-                                 ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix);
-                                 launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix);
++                                ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix, stop);
++                                launchProfilerActionForWorker(user, targetDir, command, env, actionExitCallback, logPrefix);
 +                            } catch (IOException e) {
 +                                LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port, e);
 +                            } catch (RuntimeException e) {
 +                                LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port, e);
 +                            }
 +                        }
 +                    }
 +                }
 +            }
 +        } catch (Exception e) {
 +            // Anything thrown outside the inner try (e.g. an unreadable PID file) ends this pass; keep the cause.
 +            LOG.error("Error running profiler actions, will retry again later", e);
 +        }
 +    }
 +
 +    /**
 +     * Launches the profiler command in {@code targetDir}. When {@code SUPERVISOR_RUN_WORKER_AS_USER} is set, any
 +     * existing container/script file is removed first, the command is written to a script, and the script is
 +     * run through the process launcher as {@code user}; otherwise the command is launched directly.
 +     *
 +     * @param user             user to run as when run-as-user is enabled
 +     * @param targetDir        working directory for the command
 +     * @param commands         command and arguments
 +     * @param environment      environment variables for the process
 +     * @param exitCodeCallable callback invoked with the exit code
 +     * @param logPrefix        prefix for the launched process's log output
 +     * @throws IOException declared by the launch helpers
 +     */
 +    private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
 +            final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
 +        File targetFile = new File(targetDir);
 +        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +            LOG.info("Running as user:{} command:{}", user, commands);
 +            String containerFile = Utils.containerFilePath(targetDir);
 +            if (Utils.checkFileExists(containerFile)) {
 +                SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
 +            }
 +            String scriptFile = Utils.scriptFilePath(targetDir);
 +            if (Utils.checkFileExists(scriptFile)) {
 +                SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
 +            }
 +            String script = Utils.writeScript(targetDir, commands, environment);
 +            List<String> args = new ArrayList<>();
 +            args.add("profiler");
 +            args.add(targetDir);
 +            args.add(script);
 +            SupervisorUtils.processLauncher(conf, user, null, args, environment, logPrefix, exitCodeCallable, targetFile);
 +        } else {
 +            Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
 +        }
 +    }
 +
 +    /**
 +     * Maps a profile action to its command line. {@code JPROFILE_STOP} yields the "start" command while
 +     * {@code stop} is false and the "stop" command once it is true. Unhandled actions yield an empty list.
 +     */
-     private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
++    private List<String> mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
 +        if (action == ProfileAction.JMAP_DUMP) {
 +            return jmapDumpCmd(workerPid, targetDir);
 +        } else if (action == ProfileAction.JSTACK_DUMP) {
 +            return jstackDumpCmd(workerPid, targetDir);
 +        } else if (action == ProfileAction.JPROFILE_DUMP) {
 +            return jprofileDump(workerPid, targetDir);
 +        } else if (action == ProfileAction.JVM_RESTART) {
 +            return jprofileJvmRestart(workerPid);
 +        } else if (!stop && action == ProfileAction.JPROFILE_STOP) {
 +            return jprofileStart(workerPid);
 +        } else if (stop && action == ProfileAction.JPROFILE_STOP) {
 +            return jprofileStop(workerPid, targetDir);
 +        }
-         return null;
++        return Lists.newArrayList();
 +    }
 +
-     private String jmapDumpCmd(String pid, String targetDir) {
-         return profileCmd + " " + pid + " jmap " + targetDir;
++    private List<String> jmapDumpCmd(String pid, String targetDir) {
++        return Lists.newArrayList(profileCmd, pid, "jmap", targetDir);
 +    }
 +
-     private String jstackDumpCmd(String pid, String targetDir) {
-         return profileCmd + " " + pid + " jstack " + targetDir;
++    private List<String> jstackDumpCmd(String pid, String targetDir) {
++        return Lists.newArrayList(profileCmd, pid, "jstack", targetDir);
 +    }
 +
-     private String jprofileStart(String pid) {
-         return profileCmd + " " + pid + " start";
++    private List<String> jprofileStart(String pid) {
++        return Lists.newArrayList(profileCmd, pid, "start");
 +    }
 +
-     private String jprofileStop(String pid, String targetDir) {
-         return profileCmd + " " + pid + " stop " + targetDir;
++    private List<String> jprofileStop(String pid, String targetDir) {
++        return Lists.newArrayList(profileCmd, pid, "stop", targetDir);
 +    }
 +
-     private String jprofileDump(String pid, String targetDir) {
-         return profileCmd + " " + pid + " dump " + targetDir;
++    private List<String> jprofileDump(String pid, String targetDir) {
++        return Lists.newArrayList(profileCmd, pid, "dump", targetDir);
 +    }
 +
-     private String jprofileJvmRestart(String pid) {
-         return profileCmd + " " + pid + " kill";
++    private List<String> jprofileJvmRestart(String pid) {
++        return Lists.newArrayList(profileCmd, pid, "kill");
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index 5e7b6d3,0000000..3ce8f5d
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@@ -1,62 -1,0 +1,52 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.storm.daemon.supervisor.timer;
 +
 +import org.apache.storm.command.HealthCheck;
 +import org.apache.storm.daemon.supervisor.SupervisorData;
 +import org.apache.storm.daemon.supervisor.SupervisorUtils;
 +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
 +import org.apache.storm.utils.Utils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.Collection;
 +import java.util.Map;
 +
 +public class SupervisorHealthCheck implements Runnable { // runs the health check and shuts down all workers when it fails
 +
 +    private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class);
 +
 +    private SupervisorData supervisorData; // source of the conf, worker manager and worker bookkeeping used in run()
 +
 +    public SupervisorHealthCheck(SupervisorData supervisorData) {
 +        this.supervisorData = supervisorData;
 +    }
 +
 +    @Override
 +    public void run() {
 +        Map conf = supervisorData.getConf();
 +        IWorkerManager workerManager = supervisorData.getWorkerManager();
 +        int healthCode = HealthCheck.healthCheck(conf); // any non-zero code is treated as a failed check
-         Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
 +        if (healthCode != 0) {
-             for (String workerId : workerIds) {
-                 try {
-                     workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids());
-                     boolean success = workerManager.cleanupWorker(workerId);
-                     if (success){
-                         supervisorData.getDeadWorkers().remove(workerId);
-                     }
-                 } catch (Exception e) {
-                     throw Utils.wrapInRuntime(e);
-                 }
-             }
++            SupervisorUtils.shutdownAllWorkers(conf, supervisorData.getSupervisorId(), supervisorData.getWorkerThreadPids(), supervisorData.getDeadWorkers(), // takes the place of the inline per-worker loop removed above
++                    workerManager);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
index 9529b1a,0000000..05ed82b
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
@@@ -1,408 -1,0 +1,401 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor.workermanager;
 +
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.ProcessSimulator;
 +import org.apache.storm.container.cgroup.CgroupManager;
 +import org.apache.storm.daemon.supervisor.SupervisorUtils;
 +import org.apache.storm.generated.WorkerResources;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.utils.Utils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.*;
 +
 +public class DefaultWorkerManager implements IWorkerManager {
 +
 +    private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class);
 +
 +    private Map conf;
 +    private CgroupManager resourceIsolationManager;
 +    private boolean runWorkerAsUser;
 +
 +    @Override
 +    public void prepareWorker(Map conf, Localizer localizer) {
 +        this.conf = conf;
 +        if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
 +            try {
 +                this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
 +                this.resourceIsolationManager.prepare(conf);
 +                LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
 +            } catch (IOException e) {
 +                throw Utils.wrapInRuntime(e);
 +            }
 +        } else {
 +            this.resourceIsolationManager = null;
 +        }
 +        this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
 +    }
 +
 +    /**
 +     * Builds the command line for a worker (a LogWriter invocation followed by the worker JVM invocation),
 +     * optionally reserves resources through the isolation manager, and launches the process either directly
 +     * or, when run-as-user is enabled, through a generated script and the process launcher.
 +     *
 +     * @param supervisorId       id of the supervisor (not read in this method body)
 +     * @param assignmentId       passed to the worker as a program argument
 +     * @param stormId            topology id
 +     * @param port               worker port
 +     * @param workerId           worker id
 +     * @param resources          requested on-heap / off-heap memory and cpu
 +     * @param workerExitCallback callback invoked with the worker's exit code
 +     * @throws RuntimeException wrapping any {@link IOException} raised while preparing or launching
 +     */
 +    @Override
-     public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
++    public void launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
 +            Utils.ExitCodeCallable workerExitCallback) {
 +        try {
 +
 +            String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
 +            String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
 +            String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
 +            String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId);
 +
 +            String stormLogDir = ConfigUtils.getLogDir();
 +            String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
 +
 +            // Absolute configured dir is used as-is, a relative one is resolved under storm home,
 +            // and a blank one falls back to <storm home>/log4j2.
 +            String stormLog4j2ConfDir;
 +            if (StringUtils.isNotBlank(stormLogConfDir)) {
 +                if (Utils.isAbsolutePath(stormLogConfDir)) {
 +                    stormLog4j2ConfDir = stormLogConfDir;
 +                } else {
 +                    stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir;
 +                }
 +            } else {
 +                stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
 +            }
 +
 +            String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +
 +            String jlp = jlp(stormRoot, conf);
 +
 +            String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
 +
 +            Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +
 +            String workerClassPath = getWorkerClassPath(stormJar, stormConf);
 +
 +            // The topology-level GC options may be configured as a single string or as a list.
 +            Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
 +            List<String> topGcOpts = new ArrayList<>();
 +            if (topGcOptsObject instanceof String) {
 +                topGcOpts.add((String) topGcOptsObject);
 +            } else if (topGcOptsObject instanceof List) {
 +                topGcOpts.addAll((List<String>) topGcOptsObject);
 +            }
 +
 +            int memOnheap = 0;
 +            if (resources.get_mem_on_heap() > 0) {
 +                memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
 +            } else {
 +                // set the default heap memory size for supervisor-test
 +                memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
 +            }
 +
 +            int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
 +
 +            int cpu = (int) Math.ceil(resources.get_cpu());
 +
 +            List<String> gcOpts = null;
 +
 +            // Topology-level GC options, when present, take the place of the supervisor-level ones.
 +            if (topGcOpts.size() > 0) {
 +                gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap);
 +            } else {
 +                gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap);
 +            }
 +
 +            Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
 +            List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
 +            if (topoWorkerLogwriterObject instanceof String) {
 +                topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject);
 +            } else if (topoWorkerLogwriterObject instanceof List) {
 +                topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject);
 +            }
 +
 +            String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +
 +            String logfileName = "worker.log";
 +
 +            String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
 +
 +            String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
 +            if (loggingSensitivity == null) {
 +                loggingSensitivity = "S3";
 +            }
 +
 +            List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
 +
 +            List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
 +
 +            List<String> workerProfilerChildopts = null;
 +            if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
 +                workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
 +            } else {
 +                workerProfilerChildopts = new ArrayList<>();
 +            }
 +
 +            // Start from the topology's environment (if any) and force LD_LIBRARY_PATH to the computed library path.
 +            Map<String, String> topEnvironment = new HashMap<String, String>();
 +            Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
 +            if (environment != null) {
 +                topEnvironment.putAll(environment);
 +            }
 +            topEnvironment.put("LD_LIBRARY_PATH", jlp);
 +
 +            String log4jConfigurationFile = null;
 +            if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
 +                log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
 +            } else {
 +                log4jConfigurationFile = stormLog4j2ConfDir;
 +            }
 +            log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
 +
 +            // First half of the command line: the org.apache.storm.LogWriter JVM.
 +            List<String> commandList = new ArrayList<>();
 +            commandList.add(SupervisorUtils.javaCmd("java"));
 +            commandList.add("-cp");
 +            commandList.add(workerClassPath);
 +            commandList.addAll(topoWorkerLogwriterChildopts);
 +            commandList.add("-Dlogfile.name=" + logfileName);
 +            commandList.add("-Dstorm.home=" + stormHome);
 +            commandList.add("-Dworkers.artifacts=" + workersArtifacets);
 +            commandList.add("-Dstorm.id=" + stormId);
 +            commandList.add("-Dworker.id=" + workerId);
 +            commandList.add("-Dworker.port=" + port);
 +            commandList.add("-Dstorm.log.dir=" + stormLogDir);
 +            commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
 +            commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
 +            commandList.add("org.apache.storm.LogWriter");
 +
 +            // Second half, appended as LogWriter's arguments: the worker JVM command itself.
 +            commandList.add(SupervisorUtils.javaCmd("java"));
 +            commandList.add("-server");
 +            commandList.addAll(workerChildopts);
 +            commandList.addAll(topWorkerChildopts);
 +            commandList.addAll(gcOpts);
 +            commandList.addAll(workerProfilerChildopts);
 +            commandList.add("-Djava.library.path=" + jlp);
 +            commandList.add("-Dlogfile.name=" + logfileName);
 +            commandList.add("-Dstorm.home=" + stormHome);
 +            commandList.add("-Dworkers.artifacts=" + workersArtifacets);
 +            commandList.add("-Dstorm.conf.file=" + stormConfFile);
 +            commandList.add("-Dstorm.options=" + stormOptions);
 +            commandList.add("-Dstorm.log.dir=" + stormLogDir);
 +            commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
 +            commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
 +            commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
 +            commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
 +            commandList.add("-Dstorm.id=" + stormId);
 +            commandList.add("-Dworker.id=" + workerId);
 +            commandList.add("-Dworker.port=" + port);
 +            commandList.add("-cp");
 +            commandList.add(workerClassPath);
 +            commandList.add("org.apache.storm.daemon.worker");
 +            commandList.add(stormId);
 +            commandList.add(assignmentId);
 +            commandList.add(String.valueOf(port));
 +            commandList.add(workerId);
 +
 +            // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
 +            if (resourceIsolationManager != null) {
 +                // The configured margin may arrive as an Integer or a Double; go through Number so either works
 +                // instead of failing the direct cast to double.
 +                int cGroupMem = (int) (Math.ceil(((Number) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)).doubleValue()));
 +                int memoryValue = memoffheap + memOnheap + cGroupMem;
 +                int cpuValue = cpu;
 +                Map<String, Number> map = new HashMap<>();
 +                map.put("cpu", cpuValue);
 +                map.put("memory", memoryValue);
 +                resourceIsolationManager.reserveResourcesForWorker(workerId, map);
 +                commandList = resourceIsolationManager.getLaunchCommand(workerId, commandList);
 +            }
 +
 +            LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
 +
 +            String logPrefix = "Worker Process " + workerId;
 +            String workerDir = ConfigUtils.workerRoot(conf, workerId);
 +
 +            if (runWorkerAsUser) {
 +                // The command and environment are written into a script that the process launcher runs as the user.
 +                List<String> args = new ArrayList<>();
 +                args.add("worker");
 +                args.add(workerDir);
 +                args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
 +                List<String> commandPrefix = null;
 +                if (resourceIsolationManager != null)
 +                    commandPrefix = resourceIsolationManager.getLaunchCommandPrefix(workerId);
 +                SupervisorUtils.processLauncher(conf, user, commandPrefix, args, null, logPrefix, workerExitCallback, new File(workerDir));
 +            } else {
 +                Utils.launchProcess(commandList, topEnvironment, logPrefix, workerExitCallback, new File(workerDir));
 +            }
 +        } catch (IOException e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
-         return null;
 +    }
 +
 +    @Override
-     public IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) {
++    public void shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) { // signals 15 then 9 to each worker PID and removes its PID file; any failure is rethrown wrapped in a RuntimeException
 +        try {
 +            LOG.info("Shutting down {}:{}", supervisorId, workerId);
 +            Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId)); // one entry per file in the worker's pids directory
 +            Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
 +            String user = ConfigUtils.getWorkerUser(conf, workerId);
 +            String threadPid = workerThreadPids.get(workerId);
 +            if (StringUtils.isNotBlank(threadPid)) {
 +                ProcessSimulator.killProcess(threadPid); // presumably an in-process (simulated) worker — confirm against ProcessSimulator
 +            }
 +
 +            for (String pid : pids) { // first pass: signal 15 to every PID
 +                if (runWorkerAsUser) {
 +                    List<String> commands = new ArrayList<>();
 +                    commands.add("signal");
 +                    commands.add(pid);
 +                    commands.add("15");
 +                    String logPrefix = "kill -15 " + pid;
 +                    SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
 +                } else {
 +                    Utils.killProcessWithSigTerm(pid);
 +                }
 +            }
 +
 +            if (pids.size() > 0) { // only wait when something was actually signalled
 +                LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs);
 +                Time.sleepSecs(shutdownSleepSecs);
 +            }
 +
 +            for (String pid : pids) { // second pass: signal 9, then remove the PID file
 +                if (runWorkerAsUser) {
 +                    List<String> commands = new ArrayList<>();
 +                    commands.add("signal");
 +                    commands.add(pid);
 +                    commands.add("9");
 +                    String logPrefix = "kill -9 " + pid;
 +                    SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
 +                } else {
 +                    Utils.forceKillProcess(pid);
 +                }
 +                String path = ConfigUtils.workerPidPath(conf, workerId, pid);
 +                if (runWorkerAsUser) {
 +                    SupervisorUtils.rmrAsUser(conf, workerId, path);
 +                } else {
 +                    try {
 +                        LOG.debug("Removing path {}", path);
 +                        new File(path).delete(); // return value is not checked; removal is best-effort
 +                    } catch (Exception e) {
 +                        // on Windows, the supervisor may still hold the lock on the worker directory
 +                        // ignore
 +                    }
 +                }
 +            }
 +            LOG.info("Shut down {}:{}", supervisorId, workerId);
 +        } catch (Exception e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
-         return null;
 +    }
 +
 +    @Override
 +    public boolean cleanupWorker(String workerId) {
 +        try {
 +            //clean up for resource isolation if enabled
 +            if (resourceIsolationManager != null) {
 +                resourceIsolationManager.releaseResourcesForWorker(workerId);
 +            }
 +            //Always make sure to clean up everything else before worker directory
 +            //is removed since that is what is going to trigger the retry for cleanup
 +            String workerRoot = ConfigUtils.workerRoot(conf, workerId);
 +            if (Utils.checkFileExists(workerRoot)) {
 +                if (runWorkerAsUser) {
 +                    SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
 +                } else {
 +                    Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
 +                    Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
 +                    Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId));
 +                    Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
 +                }
 +                ConfigUtils.removeWorkerUserWSE(conf, workerId);
 +            }
 +            return true;
 +        } catch (IOException e) {
 +            LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
 +        } catch (RuntimeException e) {
 +            LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
 +        }
 +        return false;
 +    }
 +
-     @Override
-     public IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources) {
-         return null;
-     }
- 
 +    protected String jlp(String stormRoot, Map conf) {
 +        String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
 +        String os = System.getProperty("os.name").replaceAll("\\s+", "_");
 +        String arch = System.getProperty("os.arch");
 +        String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
 +        String ret = archResourceRoot + Utils.CLASS_PATH_SEPARATOR + resourceRoot + Utils.CLASS_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH);
 +        return ret;
 +    }
 +
 +    protected String getWorkerClassPath(String stormJar, Map stormConf) {
 +        List<String> topoClasspath = new ArrayList<>();
 +        Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
 +
 +        if (object instanceof List) {
 +            topoClasspath.addAll((List<String>) object);
 +        } else if (object instanceof String) {
 +            topoClasspath.add((String) object);
-         } else {
-             LOG.error("topology specific classpath is invaild");
 +        }
++        LOG.debug("topology specific classpath is {}", object);
++
 +        String classPath = Utils.workerClasspath();
 +        String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
 +        return Utils.addToClasspath(classAddPath, topoClasspath);
 +    }
 +
++    private static String substituteChildOptsInternal(String string,  String workerId, String stormId, Long port, int memOnheap) {
++        if (StringUtils.isNotBlank(string)){
++            string = string.replace("%ID%", String.valueOf(port));
++            string = string.replace("%WORKER-ID%", workerId);
++            string = string.replace("%TOPOLOGY-ID%", stormId);
++            string = string.replace("%WORKER-PORT%", String.valueOf(port));
++            string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
++        }
++        return string;
++    }
++
 +    /**
 +     * "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap"
 +     *
 +     * @param value
 +     * @param workerId
 +     * @param stormId
 +     * @param port
 +     * @param memOnheap
 +     */
 +    public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
 +        List<String> rets = new ArrayList<>();
 +        if (value instanceof String) {
-             String string = (String) value;
++            String string = substituteChildOptsInternal((String) value,  workerId, stormId, port, memOnheap);
 +            if (StringUtils.isNotBlank(string)){
-                 string = string.replace("%ID%", String.valueOf(port));
-                 string = string.replace("%WORKER-ID%", workerId);
-                 string = string.replace("%TOPOLOGY-ID%", stormId);
-                 string = string.replace("%WORKER-PORT%", String.valueOf(port));
-                 string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
 +                String[] strings = string.split("\\s+");
 +                rets.addAll(Arrays.asList(strings));
 +            }
- 
 +        } else if (value instanceof List) {
 +            List<Object> objects = (List<Object>) value;
 +            for (Object object : objects) {
-                 String str = (String) object;
++                String str = substituteChildOptsInternal((String) object,  workerId, stormId, port, memOnheap);
 +                if (StringUtils.isNotBlank(str)){
-                     str = str.replace("%ID%", String.valueOf(port));
-                     str = str.replace("%WORKER-ID%", workerId);
-                     str = str.replace("%TOPOLOGY-ID%", stormId);
-                     str = str.replace("%WORKER-PORT%", String.valueOf(port));
-                     str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap));
 +                    rets.add(str);
 +                }
 +            }
 +        }
 +        return rets;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
index 3b0912a,0000000..e62b9d8
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
@@@ -1,38 -1,0 +1,35 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor.workermanager;
 +
 +import org.apache.storm.generated.WorkerResources;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.utils.Utils;
 +
 +import java.util.List;
 +import java.util.Map;
 +
 +public interface IWorkerManager {
-     public void prepareWorker(Map conf, Localizer localizer);
++    void prepareWorker(Map conf, Localizer localizer);
 +
-     IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
++    void launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
 +                               Utils.ExitCodeCallable workerExitCallback);
++    void shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids);
 +
-     IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids);
- 
-     IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources);
- 
-     public boolean cleanupWorker(String workerId);
++    boolean cleanupWorker(String workerId);
 +}