You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:44 UTC

[21/35] storm git commit: Merge branch 'master' into supervisor and update supervisor based on STORM-1631 & STORM-1636

Merge branch 'master' into supervisor and update supervisor based on STORM-1631 & STORM-1636


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f03b8bec
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f03b8bec
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f03b8bec

Branch: refs/heads/master
Commit: f03b8bec105e88282211bf3e7dd4be4aeed484d8
Parents: 42928c2 2886737
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Wed Mar 23 13:53:00 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Wed Mar 23 14:40:22 2016 +0800

----------------------------------------------------------------------
 .gitignore                                      |    1 +
 CHANGELOG.md                                    |   18 +-
 README.markdown                                 |    4 +-
 docs/Acking-framework-implementation.md         |   36 +
 docs/Clojure-DSL.md                             |  266 +
 docs/Command-line-client.md                     |  104 +
 docs/Common-patterns.md                         |  100 +
 docs/Concepts.md                                |  115 +
 docs/Configuration.md                           |   31 +
 docs/Contributing-to-Storm.md                   |   33 +
 docs/Creating-a-new-Storm-project.md            |   25 +
 docs/DSLs-and-multilang-adapters.md             |   10 +
 docs/Daemon-Fault-Tolerance.md                  |   30 +
 ...Defining-a-non-jvm-language-dsl-for-storm.md |   38 +
 docs/Distributed-RPC.md                         |  199 +
 docs/Documentation.md                           |   50 +
 docs/FAQ.md                                     |  127 +
 docs/Fault-tolerance.md                         |   28 +
 docs/Guaranteeing-message-processing.md         |  181 +
 docs/Hooks.md                                   |    9 +
 docs/Implementation-docs.md                     |   13 +
 docs/Installing-native-dependencies.md          |   38 +
 docs/Kestrel-and-Storm.md                       |  200 +
 docs/Lifecycle-of-a-topology.md                 |   82 +
 docs/Local-mode.md                              |   29 +
 docs/Logs.md                                    |   30 +
 docs/Maven.md                                   |   22 +
 docs/Message-passing-implementation.md          |   30 +
 docs/Metrics.md                                 |   36 +
 docs/Multilang-protocol.md                      |  287 +
 docs/Pacemaker.md                               |  113 +
 docs/Powered-By.md                              | 1028 +++
 docs/Project-ideas.md                           |    6 +
 docs/README.md                                  |   61 +
 docs/Rationale.md                               |   33 +
 docs/Resource_Aware_Scheduler_overview.md       |  232 +
 ...unning-topologies-on-a-production-cluster.md |   77 +
 docs/SECURITY.md                                |  478 ++
 docs/STORM-UI-REST-API.md                       | 1017 +++
 docs/Serialization-(prior-to-0.6.0).md          |   50 +
 docs/Serialization.md                           |   62 +
 docs/Serializers.md                             |    4 +
 docs/Setting-up-a-Storm-cluster.md              |  117 +
 docs/Setting-up-a-Storm-project-in-Eclipse.md   |    1 +
 docs/Setting-up-development-environment.md      |   33 +
 docs/Spout-implementations.md                   |   10 +
 docs/State-checkpointing.md                     |  160 +
 ...guage-protocol-(versions-0.7.0-and-below).md |  122 +
 docs/Structure-of-the-codebase.md               |  134 +
 docs/Support-for-non-java-languages.md          |    9 +
 docs/Transactional-topologies.md                |  361 +
 docs/Trident-API-Overview.md                    |  525 ++
 docs/Trident-spouts.md                          |   44 +
 docs/Trident-state.md                           |  331 +
 docs/Trident-tutorial.md                        |  254 +
 docs/Troubleshooting.md                         |  182 +
 docs/Tutorial.md                                |  320 +
 ...nding-the-parallelism-of-a-Storm-topology.md |  123 +
 docs/Using-non-JVM-languages-with-Storm.md      |   53 +
 docs/Windowing.md                               |  239 +
 docs/_config.yml                                |   18 +
 docs/_includes/footer.html                      |   55 +
 docs/_includes/head.html                        |   34 +
 docs/_includes/header.html                      |   59 +
 docs/_layouts/about.html                        |   43 +
 docs/_layouts/default.html                      |   18 +
 docs/_layouts/documentation.html                |    9 +
 docs/_layouts/page.html                         |    5 +
 docs/_layouts/post.html                         |   61 +
 docs/_plugins/releases.rb                       |   84 +
 docs/_sass/_syntax-highlighting.scss            |   70 +
 docs/assets/css/bootstrap-theme.css             |  470 ++
 docs/assets/css/bootstrap-theme.css.map         |    1 +
 docs/assets/css/bootstrap-theme.min.css         |    5 +
 docs/assets/css/bootstrap.css                   | 6800 ++++++++++++++++++
 docs/assets/css/bootstrap.css.map               |    1 +
 docs/assets/css/bootstrap.min.css               |    5 +
 docs/assets/css/font-awesome.min.css            |    4 +
 docs/assets/css/main.scss                       |   48 +
 docs/assets/css/owl.carousel.css                |   71 +
 docs/assets/css/owl.theme.css                   |   79 +
 docs/assets/css/style.css                       |  503 ++
 docs/assets/css/theme.css                       |   18 +
 docs/assets/favicon.ico                         |  Bin 0 -> 1150 bytes
 .../fonts/glyphicons-halflings-regular.eot      |  Bin 0 -> 20335 bytes
 .../fonts/glyphicons-halflings-regular.svg      |  229 +
 .../fonts/glyphicons-halflings-regular.ttf      |  Bin 0 -> 41280 bytes
 .../fonts/glyphicons-halflings-regular.woff     |  Bin 0 -> 23320 bytes
 docs/assets/js/bootstrap.js                     | 2320 ++++++
 docs/assets/js/bootstrap.min.js                 |    7 +
 docs/assets/js/jquery.min.js                    |    6 +
 docs/assets/js/npm.js                           |   13 +
 docs/assets/js/owl.carousel.min.js              |   47 +
 docs/assets/js/storm.js                         |   67 +
 docs/cgroups_in_storm.md                        |   71 +
 docs/css/style.css                              |  553 ++
 docs/distcache-blobstore.md                     |  740 ++
 docs/dynamic-log-level-settings.md              |   45 +
 docs/dynamic-worker-profiling.md                |   37 +
 docs/favicon.ico                                |  Bin 0 -> 1150 bytes
 docs/flux.md                                    |  835 +++
 docs/images/ack_tree.png                        |  Bin 0 -> 31463 bytes
 docs/images/architecture.png                    |  Bin 0 -> 69825 bytes
 docs/images/architecture.svg                    | 1458 ++++
 docs/images/batched-stream.png                  |  Bin 0 -> 66336 bytes
 docs/images/bolt.png                            |  Bin 0 -> 24796 bytes
 docs/images/bolt.svg                            |  743 ++
 docs/images/bullet.gif                          |  Bin 0 -> 82 bytes
 docs/images/download.png                        |  Bin 0 -> 16272 bytes
 docs/images/drpc-workflow.png                   |  Bin 0 -> 66199 bytes
 docs/images/dynamic_log_level_settings_1.png    |  Bin 0 -> 93689 bytes
 docs/images/dynamic_log_level_settings_2.png    |  Bin 0 -> 78785 bytes
 docs/images/dynamic_profiling_debugging_1.png   |  Bin 0 -> 56876 bytes
 docs/images/dynamic_profiling_debugging_2.png   |  Bin 0 -> 99164 bytes
 docs/images/dynamic_profiling_debugging_3.png   |  Bin 0 -> 96974 bytes
 docs/images/dynamic_profiling_debugging_4.png   |  Bin 0 -> 121994 bytes
 docs/images/eclipse-project-properties.png      |  Bin 0 -> 80810 bytes
 docs/images/example-of-a-running-topology.png   |  Bin 0 -> 81430 bytes
 docs/images/footer-bg.png                       |  Bin 0 -> 138 bytes
 docs/images/grouping.png                        |  Bin 0 -> 39701 bytes
 docs/images/hdfs_blobstore.png                  |  Bin 0 -> 82180 bytes
 docs/images/header-bg.png                       |  Bin 0 -> 470 bytes
 docs/images/incubator-logo.png                  |  Bin 0 -> 11651 bytes
 docs/images/ld-library-path-eclipse-linux.png   |  Bin 0 -> 114597 bytes
 docs/images/loading.gif                         |  Bin 0 -> 12150 bytes
 docs/images/local_blobstore.png                 |  Bin 0 -> 81212 bytes
 docs/images/logo.png                            |  Bin 0 -> 26889 bytes
 docs/images/logos/aeris.jpg                     |  Bin 0 -> 7420 bytes
 docs/images/logos/alibaba.jpg                   |  Bin 0 -> 10317 bytes
 docs/images/logos/bai.jpg                       |  Bin 0 -> 10026 bytes
 docs/images/logos/cerner.jpg                    |  Bin 0 -> 7244 bytes
 docs/images/logos/flipboard.jpg                 |  Bin 0 -> 8318 bytes
 docs/images/logos/fullcontact.jpg               |  Bin 0 -> 6172 bytes
 docs/images/logos/groupon.jpg                   |  Bin 0 -> 9849 bytes
 docs/images/logos/health-market-science.jpg     |  Bin 0 -> 6509 bytes
 docs/images/logos/images.png                    |  Bin 0 -> 7339 bytes
 docs/images/logos/infochimp.jpg                 |  Bin 0 -> 5290 bytes
 docs/images/logos/klout.jpg                     |  Bin 0 -> 7251 bytes
 docs/images/logos/loggly.jpg                    |  Bin 0 -> 9258 bytes
 docs/images/logos/ooyala.jpg                    |  Bin 0 -> 5675 bytes
 docs/images/logos/parc.png                      |  Bin 0 -> 13720 bytes
 docs/images/logos/premise.jpg                   |  Bin 0 -> 5391 bytes
 docs/images/logos/qiy.jpg                       |  Bin 0 -> 7441 bytes
 docs/images/logos/quicklizard.jpg               |  Bin 0 -> 7382 bytes
 docs/images/logos/rocketfuel.jpg                |  Bin 0 -> 10007 bytes
 docs/images/logos/rubicon.jpg                   |  Bin 0 -> 7120 bytes
 docs/images/logos/spider.jpg                    |  Bin 0 -> 6265 bytes
 docs/images/logos/spotify.jpg                   |  Bin 0 -> 6445 bytes
 docs/images/logos/taobao.jpg                    |  Bin 0 -> 16814 bytes
 docs/images/logos/the-weather-channel.jpg       |  Bin 0 -> 13295 bytes
 docs/images/logos/twitter.jpg                   |  Bin 0 -> 7139 bytes
 docs/images/logos/verisign.jpg                  |  Bin 0 -> 5982 bytes
 docs/images/logos/webmd.jpg                     |  Bin 0 -> 8226 bytes
 docs/images/logos/wego.jpg                      |  Bin 0 -> 6836 bytes
 docs/images/logos/yahoo-japan.jpg               |  Bin 0 -> 10350 bytes
 docs/images/logos/yahoo.png                     |  Bin 0 -> 13067 bytes
 docs/images/logos/yelp.jpg                      |  Bin 0 -> 7220 bytes
 docs/images/mailinglist.png                     |  Bin 0 -> 4245 bytes
 docs/images/nimbus_ha_blobstore.png             |  Bin 0 -> 113991 bytes
 .../nimbus_ha_leader_election_and_failover.png  |  Bin 0 -> 154316 bytes
 docs/images/nimbus_ha_topology_submission.png   |  Bin 0 -> 134180 bytes
 ...onships-worker-processes-executors-tasks.png |  Bin 0 -> 54804 bytes
 docs/images/search-a-topology.png               |  Bin 0 -> 671031 bytes
 docs/images/search-for-a-single-worker-log.png  |  Bin 0 -> 736579 bytes
 docs/images/security.png                        |  Bin 0 -> 72415 bytes
 docs/images/security.svg                        | 1779 +++++
 docs/images/spout-vs-state.png                  |  Bin 0 -> 24804 bytes
 docs/images/spout.png                           |  Bin 0 -> 22911 bytes
 docs/images/spout.svg                           |  833 +++
 docs/images/storm-cluster.png                   |  Bin 0 -> 34604 bytes
 docs/images/storm-flow.png                      |  Bin 0 -> 59688 bytes
 docs/images/storm-sql-internal-example.png      |  Bin 0 -> 28377 bytes
 docs/images/storm-sql-internal-workflow.png     |  Bin 0 -> 20020 bytes
 docs/images/storm.svg                           | 1326 ++++
 docs/images/storm_header.png                    |  Bin 0 -> 17291 bytes
 docs/images/storm_logo_tagline_color.png        |  Bin 0 -> 33568 bytes
 docs/images/top_bg.gif                          |  Bin 0 -> 113 bytes
 docs/images/topology-tasks.png                  |  Bin 0 -> 45960 bytes
 docs/images/topology.png                        |  Bin 0 -> 23147 bytes
 docs/images/topology.svg                        | 1044 +++
 docs/images/topology_dark.png                   |  Bin 0 -> 49692 bytes
 docs/images/topology_dark.svg                   | 1101 +++
 docs/images/transactional-batches.png           |  Bin 0 -> 23293 bytes
 docs/images/transactional-commit-flow.png       |  Bin 0 -> 17725 bytes
 docs/images/transactional-design-2.png          |  Bin 0 -> 13537 bytes
 docs/images/transactional-spout-structure.png   |  Bin 0 -> 25067 bytes
 docs/images/trident-to-storm1.png               |  Bin 0 -> 67173 bytes
 docs/images/trident-to-storm2.png               |  Bin 0 -> 68943 bytes
 docs/images/tuple-dag.png                       |  Bin 0 -> 18849 bytes
 docs/images/tuple_tree.png                      |  Bin 0 -> 58186 bytes
 docs/images/ui_topology_viz.png                 |  Bin 0 -> 112831 bytes
 docs/index.md                                   |   81 +
 docs/nimbus-ha-design.md                        |  222 +
 docs/storm-eventhubs.md                         |   40 +
 docs/storm-hbase.md                             |  241 +
 docs/storm-hdfs.md                              |  368 +
 docs/storm-hive.md                              |  111 +
 docs/storm-jdbc.md                              |  285 +
 docs/storm-kafka.md                             |  287 +
 .../storm-metrics-profiling-internal-actions.md |   70 +
 docs/storm-redis.md                             |  258 +
 docs/storm-solr.md                              |  184 +
 docs/storm-sql-internal.md                      |   55 +
 docs/storm-sql.md                               |   97 +
 .../storm/starter/spout/RandomIntegerSpout.java |   15 +-
 .../src/jvm/storm/starter/StatefulTopology.java |    1 +
 external/storm-kafka/README.md                  |    1 -
 .../apache/storm/kafka/PartitionManager.java    |   12 +-
 external/storm-mongodb/README.md                |  195 +
 external/storm-mongodb/pom.xml                  |   74 +
 .../storm/mongodb/bolt/AbstractMongoBolt.java   |   56 +
 .../storm/mongodb/bolt/MongoInsertBolt.java     |   62 +
 .../storm/mongodb/bolt/MongoUpdateBolt.java     |   75 +
 .../storm/mongodb/common/MongoDBClient.java     |   91 +
 .../mongodb/common/QueryFilterCreator.java      |   38 +
 .../common/SimpleQueryFilterCreator.java        |   39 +
 .../mongodb/common/mapper/MongoMapper.java      |   38 +
 .../common/mapper/SimpleMongoMapper.java        |   40 +
 .../common/mapper/SimpleMongoUpdateMapper.java  |   41 +
 .../storm/mongodb/trident/state/MongoState.java |   97 +
 .../trident/state/MongoStateFactory.java        |   42 +
 .../trident/state/MongoStateUpdater.java        |   34 +
 .../storm/mongodb/topology/InsertWordCount.java |   81 +
 .../storm/mongodb/topology/UpdateWordCount.java |   91 +
 .../storm/mongodb/topology/WordCounter.java     |   67 +
 .../storm/mongodb/topology/WordSpout.java       |   88 +
 .../storm/mongodb/trident/WordCountTrident.java |   85 +
 log4j2/cluster.xml                              |   15 -
 log4j2/worker.xml                               |   15 +
 pom.xml                                         |    1 +
 .../src/clj/org/apache/storm/clojure.clj        |    3 +
 .../clj/org/apache/storm/command/heartbeats.clj |    5 +-
 .../src/clj/org/apache/storm/converter.clj      |   46 +-
 .../org/apache/storm/daemon/builtin_metrics.clj |   33 +-
 .../clj/org/apache/storm/daemon/executor.clj    |   33 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  117 +-
 .../src/clj/org/apache/storm/daemon/task.clj    |   10 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |   19 +-
 .../clj/org/apache/storm/internal/clojure.clj   |    3 +
 .../apache/storm/scheduler/DefaultScheduler.clj |   80 -
 .../apache/storm/scheduler/EvenScheduler.clj    |   98 -
 .../storm/scheduler/IsolationScheduler.clj      |    4 +-
 storm-core/src/clj/org/apache/storm/stats.clj   | 1568 ----
 storm-core/src/clj/org/apache/storm/testing.clj |    8 +-
 .../clj/org/apache/storm/trident/testing.clj    |   12 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |   71 +-
 storm-core/src/clj/org/apache/storm/util.clj    |   11 -
 .../org/apache/storm/blobstore/BlobStore.java   |    5 +
 .../storm/cluster/IStormClusterState.java       |    2 +
 .../storm/cluster/StormClusterStateImpl.java    |   33 +-
 .../container/ResourceIsolationInterface.java   |    8 +
 .../storm/container/cgroup/CgroupManager.java   |   16 +-
 .../storm/coordination/CoordinatedBolt.java     |    4 +
 .../src/jvm/org/apache/storm/daemon/Acker.java  |   18 +-
 .../org/apache/storm/daemon/StormCommon.java    |    4 +
 .../daemon/supervisor/SupervisorUtils.java      |    7 +-
 .../supervisor/timer/RunProfilerActions.java    |   10 +-
 .../workermanager/DefaultWorkerManager.java     |   14 +-
 .../storm/scheduler/DefaultScheduler.java       |  111 +
 .../apache/storm/scheduler/EvenScheduler.java   |  168 +
 .../apache/storm/scheduler/TopologyDetails.java |    3 +-
 .../apache/storm/stats/BoltExecutorStats.java   |  105 +
 .../jvm/org/apache/storm/stats/CommonStats.java |  112 +
 .../apache/storm/stats/SpoutExecutorStats.java  |   79 +
 .../jvm/org/apache/storm/stats/StatsUtil.java   | 2441 +++++++
 .../org/apache/storm/task/IOutputCollector.java |    1 +
 .../org/apache/storm/task/OutputCollector.java  |   11 +
 .../storm/topology/BasicOutputCollector.java    |   10 +
 .../topology/CheckpointTupleForwarder.java      |   22 +-
 .../ComponentConfigurationDeclarer.java         |    5 +-
 .../storm/topology/IBasicOutputCollector.java   |    2 +
 .../apache/storm/topology/IStatefulBolt.java    |    7 +-
 .../apache/storm/topology/ResourceDeclarer.java |   28 +
 .../storm/topology/StatefulBoltExecutor.java    |   46 +-
 .../apache/storm/topology/TopologyBuilder.java  |    5 +-
 .../jvm/org/apache/storm/trident/Stream.java    |   31 +-
 .../org/apache/storm/trident/TridentState.java  |   27 +-
 .../apache/storm/trident/TridentTopology.java   |   91 +-
 .../org/apache/storm/trident/graph/Group.java   |   22 +-
 .../operation/DefaultResourceDeclarer.java      |   66 +
 .../trident/operation/ITridentResource.java     |   32 +
 .../org/apache/storm/trident/planner/Node.java  |    5 +-
 .../trident/topology/TridentBoltExecutor.java   |    4 +
 .../jvm/org/apache/storm/utils/ConfigUtils.java |   10 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |   38 +-
 .../org/apache/storm/integration_test.clj       |   53 +-
 .../apache/storm/trident/integration_test.clj   |  110 +-
 .../apache/storm/messaging/netty_unit_test.clj  |   14 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |  152 +-
 .../clj/org/apache/storm/scheduler_test.clj     |   34 +-
 .../apache/storm/security/auth/auth_test.clj    |   15 +-
 .../storm/security/auth/drpc_auth_test.clj      |   15 +-
 .../storm/security/auth/nimbus_auth_test.clj    |   15 +-
 .../clj/org/apache/storm/supervisor_test.clj    |    2 +-
 .../apache/storm/blobstore/BlobStoreTest.java   |  171 +-
 .../cluster/StormClusterStateImplTest.java      |  116 +
 .../apache/storm/localizer/LocalizerTest.java   |    7 +-
 .../topology/StatefulBoltExecutorTest.java      |    1 +
 storm-dist/binary/src/main/assembly/binary.xml  |   14 +
 299 files changed, 37982 insertions(+), 2220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index bb2525a,0000000..a567956
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@@ -1,268 -1,0 +1,271 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.ProcessSimulator;
 +import org.apache.storm.generated.LSWorkerHeartbeat;
 +import org.apache.storm.localizer.LocalResource;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.LocalState;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.utils.Utils;
 +import org.apache.zookeeper.ZooDefs;
 +import org.apache.zookeeper.data.ACL;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.net.URLDecoder;
 +import java.util.*;
 +
 +public class SupervisorUtils {
 +
 +    private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
 +
 +    private static final SupervisorUtils INSTANCE = new SupervisorUtils();
 +    private static SupervisorUtils _instance = INSTANCE;
 +    public static void setInstance(SupervisorUtils u) {
 +        _instance = u;
 +    }
 +    public static void resetInstance() {
 +        _instance = INSTANCE;
 +    }
 +
-     public static Process processLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix,
++    public static Process processLauncher(Map conf, String user, List<String> commandPrefix, List<String> args, Map<String, String> environment, final String logPreFix,
 +                                          final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
 +        if (StringUtils.isBlank(user)) {
 +            throw new IllegalArgumentException("User cannot be blank when calling processLauncher.");
 +        }
 +        String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
 +        String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
 +        String wl;
 +        if (StringUtils.isNotBlank(wlinitial)) {
 +            wl = wlinitial;
 +        } else {
 +            wl = stormHome + "/bin/worker-launcher";
 +        }
 +        List<String> commands = new ArrayList<>();
++        if (commandPrefix != null){
++            commands.addAll(commandPrefix);
++        }
 +        commands.add(wl);
 +        commands.add(user);
 +        commands.addAll(args);
 +        LOG.info("Running as user: {} command: {}", user, commands);
 +        return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
 +    }
 +
 +    public static int processLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
 +            throws IOException {
 +        int ret = 0;
-         Process process = processLauncher(conf, user, args, environment, logPreFix, null, null);
++        Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null);
 +        if (StringUtils.isNotBlank(logPreFix))
 +            Utils.readAndLogStream(logPreFix, process.getInputStream());
 +        try {
 +            process.waitFor();
 +        } catch (InterruptedException e) {
 +            LOG.info("{} interrupted.", logPreFix);
 +        }
 +        ret = process.exitValue();
 +        return ret;
 +    }
 +
 +    public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException {
 +        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +            String logPrefix = "setup conf for " + dir;
 +            List<String> commands = new ArrayList<>();
 +            commands.add("code-dir");
 +            commands.add(dir);
 +            processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
 +        }
 +    }
 +
 +    public static void rmrAsUser(Map conf, String id, String path) throws IOException {
 +        String user = Utils.getFileOwner(path);
 +        String logPreFix = "rmr " + id;
 +        List<String> commands = new ArrayList<>();
 +        commands.add("rmr");
 +        commands.add(path);
 +        SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPreFix);
 +        if (Utils.checkFileExists(path)) {
 +            throw new RuntimeException(path + " was not deleted.");
 +        }
 +    }
 +
 +    /**
 +     * Given the blob information returns the value of the uncompress field, handling it either being a string or a boolean value, or if it's not specified then
 +     * returns false
 +     * 
 +     * @param blobInfo
 +     * @return
 +     */
 +    public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) {
 +        return Utils.getBoolean(blobInfo.get("uncompress"), false);
 +    }
 +
 +    /**
 +     * Returns a list of LocalResources based on the blobstore-map passed in
 +     * 
 +     * @param blobstoreMap
 +     * @return
 +     */
 +    public static List<LocalResource> blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) {
 +        List<LocalResource> localResourceList = new ArrayList<>();
 +        if (blobstoreMap != null) {
 +            for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) {
 +                LocalResource localResource = new LocalResource(map.getKey(), shouldUncompressBlob(map.getValue()));
 +                localResourceList.add(localResource);
 +            }
 +        }
 +        return localResourceList;
 +    }
 +
 +    /**
 +     * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the cache on restart.
 +     * 
 +     * @param localizer
 +     * @param stormId
 +     * @param conf
 +     */
 +    public static void addBlobReferences(Localizer localizer, String stormId, Map conf) throws IOException {
 +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
 +        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
 +        if (blobstoreMap != null) {
 +            localizer.addReferences(localresources, user, topoName);
 +        }
 +    }
 +
 +    public static Set<String> readDownLoadedStormIds(Map conf) throws IOException {
 +        Set<String> stormIds = new HashSet<>();
 +        String path = ConfigUtils.supervisorStormDistRoot(conf);
 +        Collection<String> rets = Utils.readDirContents(path);
 +        for (String ret : rets) {
 +            stormIds.add(URLDecoder.decode(ret));
 +        }
 +        return stormIds;
 +    }
 +
 +    public static Collection<String> supervisorWorkerIds(Map conf) {
 +        String workerRoot = ConfigUtils.workerRoot(conf);
 +        return Utils.readDirContents(workerRoot);
 +    }
 +
 +    public static boolean doRequiredTopoFilesExist(Map conf, String stormId) throws IOException {
 +        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
 +        String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
 +        String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot);
 +        if (!Utils.checkFileExists(stormroot))
 +            return false;
 +        if (!Utils.checkFileExists(stormcodepath))
 +            return false;
 +        if (!Utils.checkFileExists(stormconfpath))
 +            return false;
 +        if (ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath))
 +            return true;
 +        return false;
 +    }
 +
 +    /**
 +     * map from worker id to heartbeat
 +     *
 +     * @param conf
 +     * @return
 +     * @throws Exception
 +     */
 +    public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception {
 +        return _instance.readWorkerHeartbeatsImpl(conf);
 +    }
 +
 +    public  Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map conf) throws Exception {
 +        Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
 +
 +        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
 +
 +        for (String workerId : workerIds) {
 +            LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId);
 +            // ATTENTION: whb can be null
 +            workerHeartbeats.put(workerId, whb);
 +        }
 +        return workerHeartbeats;
 +    }
 +
 +
 +    /**
 +     * get worker heartbeat by workerId
 +     *
 +     * @param conf
 +     * @param workerId
 +     * @return
 +     * @throws IOException
 +     */
 +    public static LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) {
 +        return _instance.readWorkerHeartbeatImpl(conf, workerId);
 +    }
 +
 +    public  LSWorkerHeartbeat readWorkerHeartbeatImpl(Map conf, String workerId) {
 +        try {
 +            LocalState localState = ConfigUtils.workerState(conf, workerId);
 +            return localState.getWorkerHeartBeat();
 +        } catch (Exception e) {
 +            LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e);
 +            return null;
 +        }
 +    }
 +
 +    public static boolean  isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map conf) {
 +        return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
 +    }
 +
 +    public  boolean  isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map conf) {
 +        boolean result = false;
 +        if ((now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
 +            result = true;
 +        }
 +        return result;
 +    }
 +
 +    public static String javaCmd(String cmd) {
 +        return _instance.javaCmdImpl(cmd);
 +    }
 +
 +    public String javaCmdImpl(String cmd) {
 +        String ret = null;
 +        String javaHome = System.getenv().get("JAVA_HOME");
 +        if (StringUtils.isNotBlank(javaHome)) {
 +            ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
 +        } else {
 +            ret = cmd;
 +        }
 +        return ret;
 +    }
 +    
 +    public final static List<ACL> supervisorZkAcls() {
 +        final List<ACL> acls = new ArrayList<>();
 +        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
 +        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
 +        return acls;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
index ec29855,0000000..6b294f2
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@@ -1,221 -1,0 +1,221 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.storm.daemon.supervisor.timer;
 +
 +import org.apache.storm.Config;
 +import org.apache.storm.cluster.IStormClusterState;
 +import org.apache.storm.daemon.supervisor.SupervisorData;
 +import org.apache.storm.daemon.supervisor.SupervisorUtils;
 +import org.apache.storm.generated.ProfileAction;
 +import org.apache.storm.generated.ProfileRequest;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.Utils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.BufferedReader;
 +import java.io.File;
 +import java.io.FileReader;
 +import java.io.IOException;
 +import java.util.*;
 +
 +public class RunProfilerActions implements Runnable {
 +    private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class);
 +
 +    private Map conf;
 +    private IStormClusterState stormClusterState;
 +    private String hostName;
 +
 +    private String profileCmd;
 +
 +    private SupervisorData supervisorData;
 +
 +    /**
 +     * Exit callback attached to a launched profiler action. When invoked with the exit code it
 +     * logs the exit and deletes the corresponding profile request from the cluster state.
 +     */
 +    private class ActionExitCallback implements Utils.ExitCodeCallable {
 +        private final String stormId;
 +        private final ProfileRequest profileRequest;
 +        private final String logPrefix;
 +
 +        public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) {
 +            this.stormId = stormId;
 +            this.profileRequest = profileRequest;
 +            this.logPrefix = logPrefix;
 +        }
 +
 +        /** No-argument variant; does nothing and returns null. */
 +        @Override
 +        public Object call() throws Exception {
 +            return null;
 +        }
 +
 +        /**
 +         * Logs the exit code and removes the profile request; a failed removal is logged, not rethrown.
 +         *
 +         * @param exitCode exit code of the profiler process
 +         * @return always null
 +         */
 +        @Override
 +        public Object call(int exitCode) {
 +            LOG.info("{} profile-action exited for {}", logPrefix, exitCode);
 +            try {
 +                stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
 +            } catch (Exception e) {
 +                // Still best-effort, but hand the exception to the logger so the cause is not lost.
 +                LOG.warn("failed delete profileRequest: " + profileRequest, e);
 +            }
 +            return null;
 +        }
 +    }
 +
 +    /**
 +     * Captures the configuration, cluster state, host name and profiler command from the
 +     * given supervisor data.
 +     *
 +     * @param supervisorData source of the supervisor's conf, cluster state and host name
 +     */
 +    public RunProfilerActions(SupervisorData supervisorData) {
 +        Map supervisorConf = supervisorData.getConf();
 +        this.conf = supervisorConf;
 +        this.stormClusterState = supervisorData.getStormClusterState();
 +        this.hostName = supervisorData.getHostName();
 +        this.profileCmd = (String) supervisorConf.get(Config.WORKER_PROFILER_COMMAND);
 +        this.supervisorData = supervisorData;
 +    }
 +
 +    @Override
 +    public void run() {
 +        Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions().get();
 +        try {
 +            for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
 +                String stormId = entry.getKey();
 +                List<ProfileRequest> requests = entry.getValue();
 +                if (requests != null) {
 +                    for (ProfileRequest profileRequest : requests) {
 +                        if (profileRequest.get_nodeInfo().get_node().equals(hostName)) {
 +                            boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false;
 +                            Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
 +                            String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
 +                            Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +
 +                            String user = null;
 +                            if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
 +                                user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
 +                            }
 +                            Map<String, String> env = null;
 +                            if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) {
 +                                env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
 +                            } else {
 +                                env = new HashMap<String, String>();
 +                            }
 +
 +                            String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
 +                            StringBuilder stringBuilder = new StringBuilder();
 +                            FileReader reader = null;
 +                            BufferedReader br = null;
 +                            try {
 +                                reader = new FileReader(str);
 +                                br = new BufferedReader(reader);
 +                                int c;
 +                                while ((c = br.read()) >= 0) {
 +                                    stringBuilder.append(c);
 +                                }
 +                            } catch (IOException e) {
 +                                if (reader != null)
 +                                    reader.close();
 +                                if (br != null)
 +                                    br.close();
 +                            }
 +                            String workerPid = stringBuilder.toString().trim();
 +                            ProfileAction profileAction = profileRequest.get_action();
 +                            String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
 +
 +                            // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
 +                            // The profiler plugin script validates if JVM is recording before starting another recording.
 +                            String command = mkCommand(profileAction, stop, workerPid, targetDir);
 +                            List<String> listCommand = new ArrayList<>();
 +                            if (command != null) {
 +                                listCommand.addAll(Arrays.asList(command.split(" ")));
 +                            }
 +                            try {
 +                                ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix);
 +                                launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix);
 +                            } catch (IOException e) {
 +                                LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
 +                            } catch (RuntimeException e) {
 +                                LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
 +                            }
 +                        }
 +                    }
 +                }
 +            }
 +        } catch (Exception e) {
 +            LOG.error("Error running profiler actions, will retry again later");
 +        }
 +    }
 +
 +    /**
 +     * Launches a profiler command for a worker, with {@code targetDir} as the {@code File} handed to the launcher.
 +     * <p>
 +     * When {@code Config.SUPERVISOR_RUN_WORKER_AS_USER} is true, any existing container file and
 +     * script file under {@code targetDir} are removed with {@code SupervisorUtils.rmrAsUser}, the
 +     * command is written to a script, and that script is started through
 +     * {@code SupervisorUtils.processLauncher} with the arguments {@code "profiler"}, the target
 +     * directory and the script path. Otherwise the command is started directly with
 +     * {@code Utils.launchProcess}.
 +     *
 +     * @param user             user passed to the process launcher (only used in run-as-user mode)
 +     * @param targetDir        directory for the script and the launched process
 +     * @param commands         command line tokens to run
 +     * @param environment      environment variables for the process
 +     * @param exitCodeCallable callback handed to the launcher
 +     * @param logPrefix        prefix handed to the launcher for its log output
 +     * @throws IOException declared by this method; the throwing call is not identifiable from this block
 +     */
 +    private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
 +            final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
 +        File targetFile = new File(targetDir);
 +        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +            LOG.info("Running as user:{} command:{}", user, commands);
 +            String containerFile = Utils.containerFilePath(targetDir);
 +            if (Utils.checkFileExists(containerFile)) {
 +                SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
 +            }
 +            String scriptFile = Utils.scriptFilePath(targetDir);
 +            if (Utils.checkFileExists(scriptFile)) {
 +                SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
 +            }
 +            String script = Utils.writeScript(targetDir, commands, environment);
-             List<String> newCommands = new ArrayList<>();
-             newCommands.add("profiler");
-             newCommands.add(targetDir);
-             newCommands.add(script);
-             SupervisorUtils.processLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile);
++            List<String> args = new ArrayList<>();
++            args.add("profiler");
++            args.add(targetDir);
++            args.add(script);
++            SupervisorUtils.processLauncher(conf, user, null, args, environment, logPrefix, exitCodeCallable, targetFile);
 +        } else {
 +            Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
 +        }
 +    }
 +
 +    /**
 +     * Maps a profile action to the command line that performs it.
 +     *
 +     * @param action    requested action
 +     * @param stop      for {@code JPROFILE_STOP}: true selects the stop command, false the start command
 +     * @param workerPid pid of the target worker
 +     * @param targetDir output directory used by the dump/stop commands
 +     * @return the command string, or null when the action is null or not handled here
 +     */
 +    private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
 +        if (action == null) {
 +            return null;
 +        }
 +        switch (action) {
 +            case JMAP_DUMP:
 +                return jmapDumpCmd(workerPid, targetDir);
 +            case JSTACK_DUMP:
 +                return jstackDumpCmd(workerPid, targetDir);
 +            case JPROFILE_DUMP:
 +                return jprofileDump(workerPid, targetDir);
 +            case JVM_RESTART:
 +                return jprofileJvmRestart(workerPid);
 +            case JPROFILE_STOP:
 +                return stop ? jprofileStop(workerPid, targetDir) : jprofileStart(workerPid);
 +            default:
 +                return null;
 +        }
 +    }
 +
 +    /** Builds {@code "<profileCmd> <pid> jmap <targetDir>"}. */
 +    private String jmapDumpCmd(String pid, String targetDir) {
 +        return String.format("%s %s jmap %s", profileCmd, pid, targetDir);
 +    }
 +
 +    /** Builds {@code "<profileCmd> <pid> jstack <targetDir>"}. */
 +    private String jstackDumpCmd(String pid, String targetDir) {
 +        return String.format("%s %s jstack %s", profileCmd, pid, targetDir);
 +    }
 +
 +    /** Builds {@code "<profileCmd> <pid> start"}. */
 +    private String jprofileStart(String pid) {
 +        return String.format("%s %s start", profileCmd, pid);
 +    }
 +
 +    /** Builds {@code "<profileCmd> <pid> stop <targetDir>"}. */
 +    private String jprofileStop(String pid, String targetDir) {
 +        return String.format("%s %s stop %s", profileCmd, pid, targetDir);
 +    }
 +
 +    /** Builds {@code "<profileCmd> <pid> dump <targetDir>"}. */
 +    private String jprofileDump(String pid, String targetDir) {
 +        return String.format("%s %s dump %s", profileCmd, pid, targetDir);
 +    }
 +
 +    /** Builds {@code "<profileCmd> <pid> kill"}. */
 +    private String jprofileJvmRestart(String pid) {
 +        return String.format("%s %s kill", profileCmd, pid);
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
index a73a9bd,0000000..9529b1a
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
@@@ -1,402 -1,0 +1,408 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor.workermanager;
 +
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.ProcessSimulator;
 +import org.apache.storm.container.cgroup.CgroupManager;
 +import org.apache.storm.daemon.supervisor.SupervisorUtils;
 +import org.apache.storm.generated.WorkerResources;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.utils.Utils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.*;
 +
 +public class DefaultWorkerManager implements IWorkerManager {
 +
 +    private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class);
 +
 +    private Map conf;
 +    private CgroupManager resourceIsolationManager;
 +    private boolean runWorkerAsUser;
 +
 +    @Override
 +    public void prepareWorker(Map conf, Localizer localizer) {
 +        this.conf = conf;
 +        if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
 +            try {
 +                this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
 +                this.resourceIsolationManager.prepare(conf);
 +                LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
 +            } catch (IOException e) {
 +                throw Utils.wrapInRuntime(e);
 +            }
 +        } else {
 +            this.resourceIsolationManager = null;
 +        }
 +        this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
 +    }
 +
 +    /**
 +     * Assembles the command line for a worker and starts it.
 +     * <p>
 +     * The command consists of a log-writer JVM invocation ({@code org.apache.storm.LogWriter})
 +     * followed by the worker JVM invocation ({@code org.apache.storm.daemon.worker}) with its
 +     * child options, system properties, classpath and the arguments storm id, assignment id,
 +     * port and worker id. When a resource isolation manager is configured, cpu and memory are
 +     * reserved for the worker and the command is replaced by the manager's launch command.
 +     * In run-as-user mode the command is written to a script and started through
 +     * {@code SupervisorUtils.processLauncher}; otherwise it is started with
 +     * {@code Utils.launchProcess}.
 +     *
 +     * @param supervisorId       not referenced in this method body
 +     * @param assignmentId       passed to the worker as its second program argument
 +     * @param stormId            topology id; used to locate the topology's code and configuration
 +     * @param port               worker port
 +     * @param workerId           worker id; used for directories, substitutions and log prefix
 +     * @param resources          on-heap memory, off-heap memory and cpu assigned to the worker
 +     * @param workerExitCallback callback handed to the process launcher
 +     * @return always null
 +     * @throws RuntimeException wrapping any {@code IOException} raised while preparing or launching
 +     */
 +    @Override
 +    public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
 +            Utils.ExitCodeCallable workerExitCallback) {
 +        try {
 +
 +            String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
 +            String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
 +            String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
 +            String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId);
 +
 +            String stormLogDir = ConfigUtils.getLogDir();
 +            String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
 +
 +            String stormLog4j2ConfDir;
 +            if (StringUtils.isNotBlank(stormLogConfDir)) {
 +                if (Utils.isAbsolutePath(stormLogConfDir)) {
 +                    stormLog4j2ConfDir = stormLogConfDir;
 +                } else {
 +                    stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir;
 +                }
 +            } else {
 +                stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
 +            }
 +
 +            String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +
 +            String jlp = jlp(stormRoot, conf);
 +
 +            String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
 +
 +            Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +
 +            String workerClassPath = getWorkerClassPath(stormJar, stormConf);
 +
 +            Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
 +            List<String> topGcOpts = new ArrayList<>();
 +            if (topGcOptsObject instanceof String) {
 +                topGcOpts.add((String) topGcOptsObject);
 +            } else if (topGcOptsObject instanceof List) {
 +                topGcOpts.addAll((List<String>) topGcOptsObject);
 +            }
 +
 +            int memOnheap = 0;
 +            if (resources.get_mem_on_heap() > 0) {
 +                memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
 +            } else {
 +                // set the default heap memory size for supervisor-test
 +                memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
 +            }
 +
 +            int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
 +
 +            int cpu = (int) Math.ceil(resources.get_cpu());
 +
 +            List<String> gcOpts = null;
 +
 +            if (topGcOpts.size() > 0) {
 +                gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap);
 +            } else {
 +                gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap);
 +            }
 +
 +            Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
 +            List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
 +            if (topoWorkerLogwriterObject instanceof String) {
 +                topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject);
 +            } else if (topoWorkerLogwriterObject instanceof List) {
 +                topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject);
 +            }
 +
 +            String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +
 +            String logfileName = "worker.log";
 +
 +            String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
 +
 +            String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
 +            if (loggingSensitivity == null) {
 +                loggingSensitivity = "S3";
 +            }
 +
 +            List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
 +
 +            List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
 +
 +            List<String> workerProfilerChildopts = null;
 +            if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
 +                workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
 +            } else {
 +                workerProfilerChildopts = new ArrayList<>();
 +            }
 +
 +            Map<String, String> topEnvironment = new HashMap<String, String>();
 +            Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
 +            if (environment != null) {
 +                topEnvironment.putAll(environment);
 +            }
 +            topEnvironment.put("LD_LIBRARY_PATH", jlp);
 +
 +            String log4jConfigurationFile = null;
 +            if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
 +                log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
 +            } else {
 +                log4jConfigurationFile = stormLog4j2ConfDir;
 +            }
 +            log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
 +
 +            List<String> commandList = new ArrayList<>();
 +            commandList.add(SupervisorUtils.javaCmd("java"));
 +            commandList.add("-cp");
 +            commandList.add(workerClassPath);
 +            commandList.addAll(topoWorkerLogwriterChildopts);
 +            commandList.add("-Dlogfile.name=" + logfileName);
 +            commandList.add("-Dstorm.home=" + stormHome);
 +            commandList.add("-Dworkers.artifacts=" + workersArtifacets);
 +            commandList.add("-Dstorm.id=" + stormId);
 +            commandList.add("-Dworker.id=" + workerId);
 +            commandList.add("-Dworker.port=" + port);
 +            commandList.add("-Dstorm.log.dir=" + stormLogDir);
 +            commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
 +            commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
 +            commandList.add("org.apache.storm.LogWriter");
 +
 +            commandList.add(SupervisorUtils.javaCmd("java"));
 +            commandList.add("-server");
 +            commandList.addAll(workerChildopts);
 +            commandList.addAll(topWorkerChildopts);
 +            commandList.addAll(gcOpts);
 +            commandList.addAll(workerProfilerChildopts);
 +            commandList.add("-Djava.library.path=" + jlp);
 +            commandList.add("-Dlogfile.name=" + logfileName);
 +            commandList.add("-Dstorm.home=" + stormHome);
 +            commandList.add("-Dworkers.artifacts=" + workersArtifacets);
 +            commandList.add("-Dstorm.conf.file=" + stormConfFile);
 +            commandList.add("-Dstorm.options=" + stormOptions);
 +            commandList.add("-Dstorm.log.dir=" + stormLogDir);
 +            commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
 +            commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
 +            commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
 +            commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
 +            commandList.add("-Dstorm.id=" + stormId);
 +            commandList.add("-Dworker.id=" + workerId);
 +            commandList.add("-Dworker.port=" + port);
 +            commandList.add("-cp");
 +            commandList.add(workerClassPath);
 +            commandList.add("org.apache.storm.daemon.worker");
 +            commandList.add(stormId);
 +            commandList.add(assignmentId);
 +            commandList.add(String.valueOf(port));
 +            commandList.add(workerId);
 +
 +            // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
 +            if (resourceIsolationManager != null) {
 +                // NOTE(review): casting an Object to double goes through Double; a config value stored as
 +                // another Number type (e.g. Integer) would throw ClassCastException here - confirm the config type.
 +                int cGroupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)));
 +                int memoryValue = memoffheap + memOnheap + cGroupMem;
 +                int cpuValue = cpu;
 +                Map<String, Number> map = new HashMap<>();
 +                map.put("cpu", cpuValue);
 +                map.put("memory", memoryValue);
 +                resourceIsolationManager.reserveResourcesForWorker(workerId, map);
 +                commandList = resourceIsolationManager.getLaunchCommand(workerId, commandList);
 +            }
 +
 +            LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
 +
 +            String logPrefix = "Worker Process " + workerId;
 +            String workerDir = ConfigUtils.workerRoot(conf, workerId);
 +
 +            if (runWorkerAsUser) {
 +                List<String> args = new ArrayList<>();
 +                args.add("worker");
 +                args.add(workerDir);
 +                args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
-                 SupervisorUtils.processLauncher(conf, user, args, null, logPrefix, workerExitCallback, new File(workerDir));
++                List<String> commandPrefix = null;
++                if (resourceIsolationManager != null)
++                    commandPrefix = resourceIsolationManager.getLaunchCommandPrefix(workerId);
++                SupervisorUtils.processLauncher(conf, user, commandPrefix, args, null, logPrefix, workerExitCallback, new File(workerDir));
 +            } else {
 +                Utils.launchProcess(commandList, topEnvironment, logPrefix, workerExitCallback, new File(workerDir));
 +            }
 +        } catch (IOException e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
 +        return null;
 +    }
 +
 +    /**
 +     * Stops a worker: kills its simulated in-process counterpart if one is registered, sends
 +     * signal 15 to every pid listed for the worker, sleeps for the configured shutdown delay when
 +     * at least one pid was found, then sends signal 9 and removes each pid file.
 +     * In run-as-user mode signals and file removal go through {@code SupervisorUtils}; otherwise
 +     * {@code Utils} and a plain file delete are used.
 +     *
 +     * @param supervisorId     used for log messages only
 +     * @param workerId         id of the worker to stop
 +     * @param workerThreadPids map consulted with {@code workerId} for a simulated-process id
 +     * @return always null
 +     * @throws RuntimeException wrapping any exception raised during shutdown
 +     */
 +    @Override
 +    public IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) {
 +        try {
 +            LOG.info("Shutting down {}:{}", supervisorId, workerId);
 +            Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
 +            Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
 +            String user = ConfigUtils.getWorkerUser(conf, workerId);
 +            String threadPid = workerThreadPids.get(workerId);
 +            if (StringUtils.isNotBlank(threadPid)) {
 +                ProcessSimulator.killProcess(threadPid);
 +            }
 +
 +            // first pass: signal 15 to each pid
 +            for (String pid : pids) {
 +                if (runWorkerAsUser) {
 +                    List<String> commands = new ArrayList<>();
 +                    commands.add("signal");
 +                    commands.add(pid);
 +                    commands.add("15");
 +                    String logPrefix = "kill -15 " + pid;
 +                    SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
 +                } else {
 +                    Utils.killProcessWithSigTerm(pid);
 +                }
 +            }
 +
 +            if (pids.size() > 0) {
 +                LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs);
 +                Time.sleepSecs(shutdownSleepSecs);
 +            }
 +
 +            // second pass: signal 9 to each pid, then remove its pid file
 +            for (String pid : pids) {
 +                if (runWorkerAsUser) {
 +                    List<String> commands = new ArrayList<>();
 +                    commands.add("signal");
 +                    commands.add(pid);
 +                    commands.add("9");
 +                    String logPrefix = "kill -9 " + pid;
 +                    SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
 +                } else {
 +                    Utils.forceKillProcess(pid);
 +                }
 +                String path = ConfigUtils.workerPidPath(conf, workerId, pid);
 +                if (runWorkerAsUser) {
 +                    SupervisorUtils.rmrAsUser(conf, workerId, path);
 +                } else {
 +                    try {
 +                        LOG.debug("Removing path {}", path);
 +                        new File(path).delete();
 +                    } catch (Exception e) {
 +                        // on Windows, the supervisor may still hold the lock on the worker directory
 +                        // ignore
 +                    }
 +                }
 +            }
 +            LOG.info("Shut down {}:{}", supervisorId, workerId);
 +        } catch (Exception e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
 +        return null;
 +    }
 +
 +    /**
 +     * Removes a worker's on-disk state and, when a resource isolation manager is configured,
 +     * releases the resources reserved for the worker.
 +     * If the worker root directory exists it is removed (through {@code SupervisorUtils.rmrAsUser}
 +     * in run-as-user mode, otherwise directory by directory with {@code Utils.forceDelete}) and
 +     * the worker-user entry is removed afterwards.
 +     *
 +     * @param workerId id of the worker to clean up
 +     * @return true on success; false when an {@code IOException} or {@code RuntimeException}
 +     *         occurred, in which case the failure is logged as a warning
 +     */
 +    @Override
 +    public boolean cleanupWorker(String workerId) {
 +        try {
++            //clean up for resource isolation if enabled
++            if (resourceIsolationManager != null) {
++                resourceIsolationManager.releaseResourcesForWorker(workerId);
++            }
++            //Always make sure to clean up everything else before worker directory
++            //is removed since that is what is going to trigger the retry for cleanup
 +            String workerRoot = ConfigUtils.workerRoot(conf, workerId);
 +            if (Utils.checkFileExists(workerRoot)) {
 +                if (runWorkerAsUser) {
 +                    SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
 +                } else {
 +                    Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
 +                    Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
 +                    Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId));
 +                    Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
 +                }
 +                ConfigUtils.removeWorkerUserWSE(conf, workerId);
 +            }
-             if (resourceIsolationManager != null) {
-                 resourceIsolationManager.releaseResourcesForWorker(workerId);
-             }
 +            return true;
 +        } catch (IOException e) {
 +            LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
 +        } catch (RuntimeException e) {
 +            LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Not implemented in this manager: performs no work and returns null.
 +     */
 +    @Override
 +    public IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources) {
 +        return null;
 +    }
 +
 +    /**
 +     * Builds the native library search path for a worker: the OS/architecture specific resources
 +     * directory, the generic resources directory and the configured {@code Config.JAVA_LIBRARY_PATH},
 +     * joined with {@code Utils.CLASS_PATH_SEPARATOR}.
 +     *
 +     * @param stormRoot root directory of the topology's distribution on this supervisor
 +     * @param conf      configuration providing {@code Config.JAVA_LIBRARY_PATH}
 +     * @return the joined library path
 +     */
 +    protected String jlp(String stormRoot, Map conf) {
 +        String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
 +        // whitespace in the OS name is replaced by underscores, e.g. "Mac OS X" -> "Mac_OS_X"
 +        String platform = System.getProperty("os.name").replaceAll("\\s+", "_") + "-" + System.getProperty("os.arch");
 +        StringBuilder path = new StringBuilder();
 +        path.append(resourceRoot).append(Utils.FILE_PATH_SEPARATOR).append(platform);
 +        path.append(Utils.CLASS_PATH_SEPARATOR).append(resourceRoot);
 +        path.append(Utils.CLASS_PATH_SEPARATOR).append(conf.get(Config.JAVA_LIBRARY_PATH));
 +        return path.toString();
 +    }
 +
 +    /**
 +     * Builds the worker classpath: the base worker classpath, then the topology jar, then any
 +     * entries from {@code Config.TOPOLOGY_CLASSPATH} (accepted as a single String or a List).
 +     *
 +     * @param stormJar  path of the topology jar
 +     * @param stormConf topology configuration
 +     * @return the assembled classpath string
 +     */
 +    protected String getWorkerClassPath(String stormJar, Map stormConf) {
 +        List<String> topoClasspath = new ArrayList<>();
 +        Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
 +
 +        if (object instanceof List) {
 +            topoClasspath.addAll((List<String>) object);
 +        } else if (object instanceof String) {
 +            topoClasspath.add((String) object);
 +        } else if (object != null) {
 +            // An unset (null) topology classpath is a normal state; only a value of an
 +            // unexpected type is reported as an error.
 +            LOG.error("topology specific classpath is invaild");
 +        }
 +        String classPath = Utils.workerClasspath();
 +        String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
 +        return Utils.addToClasspath(classAddPath, topoClasspath);
 +    }
 +
 +    /**
 +     * Generates runtime childopts by replacing the placeholders {@code %ID%}, {@code %WORKER-ID%},
 +     * {@code %TOPOLOGY-ID%}, {@code %WORKER-PORT%} and {@code %HEAP-MEM%}.
 +     * <p>
 +     * A String value is substituted and then split on whitespace; a List value is substituted
 +     * element by element without splitting. Blank strings are skipped, and any other value type
 +     * (including null) yields an empty list.
 +     *
 +     * @param value     childopts as a String or a List of Strings
 +     * @param workerId  replacement for {@code %WORKER-ID%}
 +     * @param stormId   replacement for {@code %TOPOLOGY-ID%}
 +     * @param port      replacement for {@code %ID%} and {@code %WORKER-PORT%}
 +     * @param memOnheap replacement for {@code %HEAP-MEM%}
 +     * @return the substituted options, never null
 +     */
 +    public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
 +        List<String> rets = new ArrayList<>();
 +        if (value instanceof String) {
 +            String single = (String) value;
 +            if (StringUtils.isNotBlank(single)) {
 +                String expanded = replaceChildoptKeys(single, workerId, stormId, port, memOnheap);
 +                rets.addAll(Arrays.asList(expanded.split("\\s+")));
 +            }
 +        } else if (value instanceof List) {
 +            for (Object element : (List<Object>) value) {
 +                String opt = (String) element;
 +                if (StringUtils.isNotBlank(opt)) {
 +                    rets.add(replaceChildoptKeys(opt, workerId, stormId, port, memOnheap));
 +                }
 +            }
 +        }
 +        return rets;
 +    }
 +
 +    /** Applies the five placeholder replacements, in a fixed order, to one option string. */
 +    private String replaceChildoptKeys(String opt, String workerId, String stormId, Long port, int memOnheap) {
 +        return opt.replace("%ID%", String.valueOf(port))
 +                .replace("%WORKER-ID%", workerId)
 +                .replace("%TOPOLOGY-ID%", stormId)
 +                .replace("%WORKER-PORT%", String.valueOf(port))
 +                .replace("%HEAP-MEM%", String.valueOf(memOnheap));
 +    }

http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 8f11f8a,ade1c2f..0d6603d
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -728,7 -689,7 +728,7 @@@
                            (launchProcessImpl [& _] nil))]
          (with-open [_ (UtilsInstaller. utils-proxy)]
            (is (try
-                 (SupervisorUtils/processLauncher {} nil (ArrayList.) {} nil nil nil)
 -                (supervisor/worker-launcher {} nil "")
++                (SupervisorUtils/processLauncher {} nil nil (ArrayList.) {} nil nil nil)
                  false
                  (catch Throwable t
                    (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))