You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:44 UTC
[21/35] storm git commit: Merge branch 'master' into supervisor and
update supervisor based STORM-1631&STORM-1636
Merge branch 'master' into supervisor and update supervisor based STORM-1631&STORM-1636
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f03b8bec
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f03b8bec
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f03b8bec
Branch: refs/heads/master
Commit: f03b8bec105e88282211bf3e7dd4be4aeed484d8
Parents: 42928c2 2886737
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Wed Mar 23 13:53:00 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Wed Mar 23 14:40:22 2016 +0800
----------------------------------------------------------------------
.gitignore | 1 +
CHANGELOG.md | 18 +-
README.markdown | 4 +-
docs/Acking-framework-implementation.md | 36 +
docs/Clojure-DSL.md | 266 +
docs/Command-line-client.md | 104 +
docs/Common-patterns.md | 100 +
docs/Concepts.md | 115 +
docs/Configuration.md | 31 +
docs/Contributing-to-Storm.md | 33 +
docs/Creating-a-new-Storm-project.md | 25 +
docs/DSLs-and-multilang-adapters.md | 10 +
docs/Daemon-Fault-Tolerance.md | 30 +
...Defining-a-non-jvm-language-dsl-for-storm.md | 38 +
docs/Distributed-RPC.md | 199 +
docs/Documentation.md | 50 +
docs/FAQ.md | 127 +
docs/Fault-tolerance.md | 28 +
docs/Guaranteeing-message-processing.md | 181 +
docs/Hooks.md | 9 +
docs/Implementation-docs.md | 13 +
docs/Installing-native-dependencies.md | 38 +
docs/Kestrel-and-Storm.md | 200 +
docs/Lifecycle-of-a-topology.md | 82 +
docs/Local-mode.md | 29 +
docs/Logs.md | 30 +
docs/Maven.md | 22 +
docs/Message-passing-implementation.md | 30 +
docs/Metrics.md | 36 +
docs/Multilang-protocol.md | 287 +
docs/Pacemaker.md | 113 +
docs/Powered-By.md | 1028 +++
docs/Project-ideas.md | 6 +
docs/README.md | 61 +
docs/Rationale.md | 33 +
docs/Resource_Aware_Scheduler_overview.md | 232 +
...unning-topologies-on-a-production-cluster.md | 77 +
docs/SECURITY.md | 478 ++
docs/STORM-UI-REST-API.md | 1017 +++
docs/Serialization-(prior-to-0.6.0).md | 50 +
docs/Serialization.md | 62 +
docs/Serializers.md | 4 +
docs/Setting-up-a-Storm-cluster.md | 117 +
docs/Setting-up-a-Storm-project-in-Eclipse.md | 1 +
docs/Setting-up-development-environment.md | 33 +
docs/Spout-implementations.md | 10 +
docs/State-checkpointing.md | 160 +
...guage-protocol-(versions-0.7.0-and-below).md | 122 +
docs/Structure-of-the-codebase.md | 134 +
docs/Support-for-non-java-languages.md | 9 +
docs/Transactional-topologies.md | 361 +
docs/Trident-API-Overview.md | 525 ++
docs/Trident-spouts.md | 44 +
docs/Trident-state.md | 331 +
docs/Trident-tutorial.md | 254 +
docs/Troubleshooting.md | 182 +
docs/Tutorial.md | 320 +
...nding-the-parallelism-of-a-Storm-topology.md | 123 +
docs/Using-non-JVM-languages-with-Storm.md | 53 +
docs/Windowing.md | 239 +
docs/_config.yml | 18 +
docs/_includes/footer.html | 55 +
docs/_includes/head.html | 34 +
docs/_includes/header.html | 59 +
docs/_layouts/about.html | 43 +
docs/_layouts/default.html | 18 +
docs/_layouts/documentation.html | 9 +
docs/_layouts/page.html | 5 +
docs/_layouts/post.html | 61 +
docs/_plugins/releases.rb | 84 +
docs/_sass/_syntax-highlighting.scss | 70 +
docs/assets/css/bootstrap-theme.css | 470 ++
docs/assets/css/bootstrap-theme.css.map | 1 +
docs/assets/css/bootstrap-theme.min.css | 5 +
docs/assets/css/bootstrap.css | 6800 ++++++++++++++++++
docs/assets/css/bootstrap.css.map | 1 +
docs/assets/css/bootstrap.min.css | 5 +
docs/assets/css/font-awesome.min.css | 4 +
docs/assets/css/main.scss | 48 +
docs/assets/css/owl.carousel.css | 71 +
docs/assets/css/owl.theme.css | 79 +
docs/assets/css/style.css | 503 ++
docs/assets/css/theme.css | 18 +
docs/assets/favicon.ico | Bin 0 -> 1150 bytes
.../fonts/glyphicons-halflings-regular.eot | Bin 0 -> 20335 bytes
.../fonts/glyphicons-halflings-regular.svg | 229 +
.../fonts/glyphicons-halflings-regular.ttf | Bin 0 -> 41280 bytes
.../fonts/glyphicons-halflings-regular.woff | Bin 0 -> 23320 bytes
docs/assets/js/bootstrap.js | 2320 ++++++
docs/assets/js/bootstrap.min.js | 7 +
docs/assets/js/jquery.min.js | 6 +
docs/assets/js/npm.js | 13 +
docs/assets/js/owl.carousel.min.js | 47 +
docs/assets/js/storm.js | 67 +
docs/cgroups_in_storm.md | 71 +
docs/css/style.css | 553 ++
docs/distcache-blobstore.md | 740 ++
docs/dynamic-log-level-settings.md | 45 +
docs/dynamic-worker-profiling.md | 37 +
docs/favicon.ico | Bin 0 -> 1150 bytes
docs/flux.md | 835 +++
docs/images/ack_tree.png | Bin 0 -> 31463 bytes
docs/images/architecture.png | Bin 0 -> 69825 bytes
docs/images/architecture.svg | 1458 ++++
docs/images/batched-stream.png | Bin 0 -> 66336 bytes
docs/images/bolt.png | Bin 0 -> 24796 bytes
docs/images/bolt.svg | 743 ++
docs/images/bullet.gif | Bin 0 -> 82 bytes
docs/images/download.png | Bin 0 -> 16272 bytes
docs/images/drpc-workflow.png | Bin 0 -> 66199 bytes
docs/images/dynamic_log_level_settings_1.png | Bin 0 -> 93689 bytes
docs/images/dynamic_log_level_settings_2.png | Bin 0 -> 78785 bytes
docs/images/dynamic_profiling_debugging_1.png | Bin 0 -> 56876 bytes
docs/images/dynamic_profiling_debugging_2.png | Bin 0 -> 99164 bytes
docs/images/dynamic_profiling_debugging_3.png | Bin 0 -> 96974 bytes
docs/images/dynamic_profiling_debugging_4.png | Bin 0 -> 121994 bytes
docs/images/eclipse-project-properties.png | Bin 0 -> 80810 bytes
docs/images/example-of-a-running-topology.png | Bin 0 -> 81430 bytes
docs/images/footer-bg.png | Bin 0 -> 138 bytes
docs/images/grouping.png | Bin 0 -> 39701 bytes
docs/images/hdfs_blobstore.png | Bin 0 -> 82180 bytes
docs/images/header-bg.png | Bin 0 -> 470 bytes
docs/images/incubator-logo.png | Bin 0 -> 11651 bytes
docs/images/ld-library-path-eclipse-linux.png | Bin 0 -> 114597 bytes
docs/images/loading.gif | Bin 0 -> 12150 bytes
docs/images/local_blobstore.png | Bin 0 -> 81212 bytes
docs/images/logo.png | Bin 0 -> 26889 bytes
docs/images/logos/aeris.jpg | Bin 0 -> 7420 bytes
docs/images/logos/alibaba.jpg | Bin 0 -> 10317 bytes
docs/images/logos/bai.jpg | Bin 0 -> 10026 bytes
docs/images/logos/cerner.jpg | Bin 0 -> 7244 bytes
docs/images/logos/flipboard.jpg | Bin 0 -> 8318 bytes
docs/images/logos/fullcontact.jpg | Bin 0 -> 6172 bytes
docs/images/logos/groupon.jpg | Bin 0 -> 9849 bytes
docs/images/logos/health-market-science.jpg | Bin 0 -> 6509 bytes
docs/images/logos/images.png | Bin 0 -> 7339 bytes
docs/images/logos/infochimp.jpg | Bin 0 -> 5290 bytes
docs/images/logos/klout.jpg | Bin 0 -> 7251 bytes
docs/images/logos/loggly.jpg | Bin 0 -> 9258 bytes
docs/images/logos/ooyala.jpg | Bin 0 -> 5675 bytes
docs/images/logos/parc.png | Bin 0 -> 13720 bytes
docs/images/logos/premise.jpg | Bin 0 -> 5391 bytes
docs/images/logos/qiy.jpg | Bin 0 -> 7441 bytes
docs/images/logos/quicklizard.jpg | Bin 0 -> 7382 bytes
docs/images/logos/rocketfuel.jpg | Bin 0 -> 10007 bytes
docs/images/logos/rubicon.jpg | Bin 0 -> 7120 bytes
docs/images/logos/spider.jpg | Bin 0 -> 6265 bytes
docs/images/logos/spotify.jpg | Bin 0 -> 6445 bytes
docs/images/logos/taobao.jpg | Bin 0 -> 16814 bytes
docs/images/logos/the-weather-channel.jpg | Bin 0 -> 13295 bytes
docs/images/logos/twitter.jpg | Bin 0 -> 7139 bytes
docs/images/logos/verisign.jpg | Bin 0 -> 5982 bytes
docs/images/logos/webmd.jpg | Bin 0 -> 8226 bytes
docs/images/logos/wego.jpg | Bin 0 -> 6836 bytes
docs/images/logos/yahoo-japan.jpg | Bin 0 -> 10350 bytes
docs/images/logos/yahoo.png | Bin 0 -> 13067 bytes
docs/images/logos/yelp.jpg | Bin 0 -> 7220 bytes
docs/images/mailinglist.png | Bin 0 -> 4245 bytes
docs/images/nimbus_ha_blobstore.png | Bin 0 -> 113991 bytes
.../nimbus_ha_leader_election_and_failover.png | Bin 0 -> 154316 bytes
docs/images/nimbus_ha_topology_submission.png | Bin 0 -> 134180 bytes
...onships-worker-processes-executors-tasks.png | Bin 0 -> 54804 bytes
docs/images/search-a-topology.png | Bin 0 -> 671031 bytes
docs/images/search-for-a-single-worker-log.png | Bin 0 -> 736579 bytes
docs/images/security.png | Bin 0 -> 72415 bytes
docs/images/security.svg | 1779 +++++
docs/images/spout-vs-state.png | Bin 0 -> 24804 bytes
docs/images/spout.png | Bin 0 -> 22911 bytes
docs/images/spout.svg | 833 +++
docs/images/storm-cluster.png | Bin 0 -> 34604 bytes
docs/images/storm-flow.png | Bin 0 -> 59688 bytes
docs/images/storm-sql-internal-example.png | Bin 0 -> 28377 bytes
docs/images/storm-sql-internal-workflow.png | Bin 0 -> 20020 bytes
docs/images/storm.svg | 1326 ++++
docs/images/storm_header.png | Bin 0 -> 17291 bytes
docs/images/storm_logo_tagline_color.png | Bin 0 -> 33568 bytes
docs/images/top_bg.gif | Bin 0 -> 113 bytes
docs/images/topology-tasks.png | Bin 0 -> 45960 bytes
docs/images/topology.png | Bin 0 -> 23147 bytes
docs/images/topology.svg | 1044 +++
docs/images/topology_dark.png | Bin 0 -> 49692 bytes
docs/images/topology_dark.svg | 1101 +++
docs/images/transactional-batches.png | Bin 0 -> 23293 bytes
docs/images/transactional-commit-flow.png | Bin 0 -> 17725 bytes
docs/images/transactional-design-2.png | Bin 0 -> 13537 bytes
docs/images/transactional-spout-structure.png | Bin 0 -> 25067 bytes
docs/images/trident-to-storm1.png | Bin 0 -> 67173 bytes
docs/images/trident-to-storm2.png | Bin 0 -> 68943 bytes
docs/images/tuple-dag.png | Bin 0 -> 18849 bytes
docs/images/tuple_tree.png | Bin 0 -> 58186 bytes
docs/images/ui_topology_viz.png | Bin 0 -> 112831 bytes
docs/index.md | 81 +
docs/nimbus-ha-design.md | 222 +
docs/storm-eventhubs.md | 40 +
docs/storm-hbase.md | 241 +
docs/storm-hdfs.md | 368 +
docs/storm-hive.md | 111 +
docs/storm-jdbc.md | 285 +
docs/storm-kafka.md | 287 +
.../storm-metrics-profiling-internal-actions.md | 70 +
docs/storm-redis.md | 258 +
docs/storm-solr.md | 184 +
docs/storm-sql-internal.md | 55 +
docs/storm-sql.md | 97 +
.../storm/starter/spout/RandomIntegerSpout.java | 15 +-
.../src/jvm/storm/starter/StatefulTopology.java | 1 +
external/storm-kafka/README.md | 1 -
.../apache/storm/kafka/PartitionManager.java | 12 +-
external/storm-mongodb/README.md | 195 +
external/storm-mongodb/pom.xml | 74 +
.../storm/mongodb/bolt/AbstractMongoBolt.java | 56 +
.../storm/mongodb/bolt/MongoInsertBolt.java | 62 +
.../storm/mongodb/bolt/MongoUpdateBolt.java | 75 +
.../storm/mongodb/common/MongoDBClient.java | 91 +
.../mongodb/common/QueryFilterCreator.java | 38 +
.../common/SimpleQueryFilterCreator.java | 39 +
.../mongodb/common/mapper/MongoMapper.java | 38 +
.../common/mapper/SimpleMongoMapper.java | 40 +
.../common/mapper/SimpleMongoUpdateMapper.java | 41 +
.../storm/mongodb/trident/state/MongoState.java | 97 +
.../trident/state/MongoStateFactory.java | 42 +
.../trident/state/MongoStateUpdater.java | 34 +
.../storm/mongodb/topology/InsertWordCount.java | 81 +
.../storm/mongodb/topology/UpdateWordCount.java | 91 +
.../storm/mongodb/topology/WordCounter.java | 67 +
.../storm/mongodb/topology/WordSpout.java | 88 +
.../storm/mongodb/trident/WordCountTrident.java | 85 +
log4j2/cluster.xml | 15 -
log4j2/worker.xml | 15 +
pom.xml | 1 +
.../src/clj/org/apache/storm/clojure.clj | 3 +
.../clj/org/apache/storm/command/heartbeats.clj | 5 +-
.../src/clj/org/apache/storm/converter.clj | 46 +-
.../org/apache/storm/daemon/builtin_metrics.clj | 33 +-
.../clj/org/apache/storm/daemon/executor.clj | 33 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 117 +-
.../src/clj/org/apache/storm/daemon/task.clj | 10 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 19 +-
.../clj/org/apache/storm/internal/clojure.clj | 3 +
.../apache/storm/scheduler/DefaultScheduler.clj | 80 -
.../apache/storm/scheduler/EvenScheduler.clj | 98 -
.../storm/scheduler/IsolationScheduler.clj | 4 +-
storm-core/src/clj/org/apache/storm/stats.clj | 1568 ----
storm-core/src/clj/org/apache/storm/testing.clj | 8 +-
.../clj/org/apache/storm/trident/testing.clj | 12 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 71 +-
storm-core/src/clj/org/apache/storm/util.clj | 11 -
.../org/apache/storm/blobstore/BlobStore.java | 5 +
.../storm/cluster/IStormClusterState.java | 2 +
.../storm/cluster/StormClusterStateImpl.java | 33 +-
.../container/ResourceIsolationInterface.java | 8 +
.../storm/container/cgroup/CgroupManager.java | 16 +-
.../storm/coordination/CoordinatedBolt.java | 4 +
.../src/jvm/org/apache/storm/daemon/Acker.java | 18 +-
.../org/apache/storm/daemon/StormCommon.java | 4 +
.../daemon/supervisor/SupervisorUtils.java | 7 +-
.../supervisor/timer/RunProfilerActions.java | 10 +-
.../workermanager/DefaultWorkerManager.java | 14 +-
.../storm/scheduler/DefaultScheduler.java | 111 +
.../apache/storm/scheduler/EvenScheduler.java | 168 +
.../apache/storm/scheduler/TopologyDetails.java | 3 +-
.../apache/storm/stats/BoltExecutorStats.java | 105 +
.../jvm/org/apache/storm/stats/CommonStats.java | 112 +
.../apache/storm/stats/SpoutExecutorStats.java | 79 +
.../jvm/org/apache/storm/stats/StatsUtil.java | 2441 +++++++
.../org/apache/storm/task/IOutputCollector.java | 1 +
.../org/apache/storm/task/OutputCollector.java | 11 +
.../storm/topology/BasicOutputCollector.java | 10 +
.../topology/CheckpointTupleForwarder.java | 22 +-
.../ComponentConfigurationDeclarer.java | 5 +-
.../storm/topology/IBasicOutputCollector.java | 2 +
.../apache/storm/topology/IStatefulBolt.java | 7 +-
.../apache/storm/topology/ResourceDeclarer.java | 28 +
.../storm/topology/StatefulBoltExecutor.java | 46 +-
.../apache/storm/topology/TopologyBuilder.java | 5 +-
.../jvm/org/apache/storm/trident/Stream.java | 31 +-
.../org/apache/storm/trident/TridentState.java | 27 +-
.../apache/storm/trident/TridentTopology.java | 91 +-
.../org/apache/storm/trident/graph/Group.java | 22 +-
.../operation/DefaultResourceDeclarer.java | 66 +
.../trident/operation/ITridentResource.java | 32 +
.../org/apache/storm/trident/planner/Node.java | 5 +-
.../trident/topology/TridentBoltExecutor.java | 4 +
.../jvm/org/apache/storm/utils/ConfigUtils.java | 10 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 38 +-
.../org/apache/storm/integration_test.clj | 53 +-
.../apache/storm/trident/integration_test.clj | 110 +-
.../apache/storm/messaging/netty_unit_test.clj | 14 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 152 +-
.../clj/org/apache/storm/scheduler_test.clj | 34 +-
.../apache/storm/security/auth/auth_test.clj | 15 +-
.../storm/security/auth/drpc_auth_test.clj | 15 +-
.../storm/security/auth/nimbus_auth_test.clj | 15 +-
.../clj/org/apache/storm/supervisor_test.clj | 2 +-
.../apache/storm/blobstore/BlobStoreTest.java | 171 +-
.../cluster/StormClusterStateImplTest.java | 116 +
.../apache/storm/localizer/LocalizerTest.java | 7 +-
.../topology/StatefulBoltExecutorTest.java | 1 +
storm-dist/binary/src/main/assembly/binary.xml | 14 +
299 files changed, 37982 insertions(+), 2220 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index bb2525a,0000000..a567956
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@@ -1,268 -1,0 +1,271 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+public class SupervisorUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
+
+ private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+ private static SupervisorUtils _instance = INSTANCE;
+ public static void setInstance(SupervisorUtils u) {
+ _instance = u;
+ }
+ public static void resetInstance() {
+ _instance = INSTANCE;
+ }
+
- public static Process processLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix,
++ public static Process processLauncher(Map conf, String user, List<String> commandPrefix, List<String> args, Map<String, String> environment, final String logPreFix,
+ final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
+ if (StringUtils.isBlank(user)) {
+ throw new IllegalArgumentException("User cannot be blank when calling processLauncher.");
+ }
+ String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+ String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+ String wl;
+ if (StringUtils.isNotBlank(wlinitial)) {
+ wl = wlinitial;
+ } else {
+ wl = stormHome + "/bin/worker-launcher";
+ }
+ List<String> commands = new ArrayList<>();
++ if (commandPrefix != null){
++ commands.addAll(commandPrefix);
++ }
+ commands.add(wl);
+ commands.add(user);
+ commands.addAll(args);
+ LOG.info("Running as user: {} command: {}", user, commands);
+ return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
+ }
+
+ public static int processLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
+ throws IOException {
+ int ret = 0;
- Process process = processLauncher(conf, user, args, environment, logPreFix, null, null);
++ Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null);
+ if (StringUtils.isNotBlank(logPreFix))
+ Utils.readAndLogStream(logPreFix, process.getInputStream());
+ try {
+ process.waitFor();
+ } catch (InterruptedException e) {
+ LOG.info("{} interrupted.", logPreFix);
+ }
+ ret = process.exitValue();
+ return ret;
+ }
+
+ public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException {
+ if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ String logPrefix = "setup conf for " + dir;
+ List<String> commands = new ArrayList<>();
+ commands.add("code-dir");
+ commands.add(dir);
+ processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+ }
+ }
+
+ public static void rmrAsUser(Map conf, String id, String path) throws IOException {
+ String user = Utils.getFileOwner(path);
+ String logPreFix = "rmr " + id;
+ List<String> commands = new ArrayList<>();
+ commands.add("rmr");
+ commands.add(path);
+ SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPreFix);
+ if (Utils.checkFileExists(path)) {
+ throw new RuntimeException(path + " was not deleted.");
+ }
+ }
+
+ /**
+ * Given the blob information returns the value of the uncompress field, handling it either being a string or a boolean value, or if it's not specified then
+ * returns false
+ *
+ * @param blobInfo
+ * @return
+ */
+ public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) {
+ return Utils.getBoolean(blobInfo.get("uncompress"), false);
+ }
+
+ /**
+ * Returns a list of LocalResources based on the blobstore-map passed in
+ *
+ * @param blobstoreMap
+ * @return
+ */
+ public static List<LocalResource> blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) {
+ List<LocalResource> localResourceList = new ArrayList<>();
+ if (blobstoreMap != null) {
+ for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) {
+ LocalResource localResource = new LocalResource(map.getKey(), shouldUncompressBlob(map.getValue()));
+ localResourceList.add(localResource);
+ }
+ }
+ return localResourceList;
+ }
+
+ /**
+ * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the cache on restart.
+ *
+ * @param localizer
+ * @param stormId
+ * @param conf
+ */
+ public static void addBlobReferences(Localizer localizer, String stormId, Map conf) throws IOException {
+ Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+ Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+ String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+ String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+ List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+ if (blobstoreMap != null) {
+ localizer.addReferences(localresources, user, topoName);
+ }
+ }
+
+ public static Set<String> readDownLoadedStormIds(Map conf) throws IOException {
+ Set<String> stormIds = new HashSet<>();
+ String path = ConfigUtils.supervisorStormDistRoot(conf);
+ Collection<String> rets = Utils.readDirContents(path);
+ for (String ret : rets) {
+ stormIds.add(URLDecoder.decode(ret));
+ }
+ return stormIds;
+ }
+
+ public static Collection<String> supervisorWorkerIds(Map conf) {
+ String workerRoot = ConfigUtils.workerRoot(conf);
+ return Utils.readDirContents(workerRoot);
+ }
+
+ public static boolean doRequiredTopoFilesExist(Map conf, String stormId) throws IOException {
+ String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+ String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
+ String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
+ String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot);
+ if (!Utils.checkFileExists(stormroot))
+ return false;
+ if (!Utils.checkFileExists(stormcodepath))
+ return false;
+ if (!Utils.checkFileExists(stormconfpath))
+ return false;
+ if (ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath))
+ return true;
+ return false;
+ }
+
+ /**
+ * map from worker id to heartbeat
+ *
+ * @param conf
+ * @return
+ * @throws Exception
+ */
+ public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception {
+ return _instance.readWorkerHeartbeatsImpl(conf);
+ }
+
+ public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map conf) throws Exception {
+ Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
+
+ Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
+
+ for (String workerId : workerIds) {
+ LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId);
+ // ATTENTION: whb can be null
+ workerHeartbeats.put(workerId, whb);
+ }
+ return workerHeartbeats;
+ }
+
+
+ /**
+ * get worker heartbeat by workerId
+ *
+ * @param conf
+ * @param workerId
+ * @return
+ * @throws IOException
+ */
+ public static LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) {
+ return _instance.readWorkerHeartbeatImpl(conf, workerId);
+ }
+
+ public LSWorkerHeartbeat readWorkerHeartbeatImpl(Map conf, String workerId) {
+ try {
+ LocalState localState = ConfigUtils.workerState(conf, workerId);
+ return localState.getWorkerHeartBeat();
+ } catch (Exception e) {
+ LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e);
+ return null;
+ }
+ }
+
+ public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map conf) {
+ return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
+ }
+
+ public boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map conf) {
+ boolean result = false;
+ if ((now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
+ result = true;
+ }
+ return result;
+ }
+
+ public static String javaCmd(String cmd) {
+ return _instance.javaCmdImpl(cmd);
+ }
+
+ public String javaCmdImpl(String cmd) {
+ String ret = null;
+ String javaHome = System.getenv().get("JAVA_HOME");
+ if (StringUtils.isNotBlank(javaHome)) {
+ ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
+ } else {
+ ret = cmd;
+ }
+ return ret;
+ }
+
+ public final static List<ACL> supervisorZkAcls() {
+ final List<ACL> acls = new ArrayList<>();
+ acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
+ acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
+ return acls;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
index ec29855,0000000..6b294f2
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@@ -1,221 -1,0 +1,221 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+
+public class RunProfilerActions implements Runnable {
+ private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class);
+
+ private Map conf;
+ private IStormClusterState stormClusterState;
+ private String hostName;
+
+ private String profileCmd;
+
+ private SupervisorData supervisorData;
+
+ private class ActionExitCallback implements Utils.ExitCodeCallable {
+ private String stormId;
+ private ProfileRequest profileRequest;
+ private String logPrefix;
+
+ public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) {
+ this.stormId = stormId;
+ this.profileRequest = profileRequest;
+ this.logPrefix = logPrefix;
+ }
+
+ @Override
+ public Object call() throws Exception {
+ return null;
+ }
+
+ @Override
+ public Object call(int exitCode) {
+ LOG.info("{} profile-action exited for {}", logPrefix, exitCode);
+ try {
+ stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
+ } catch (Exception e) {
+ LOG.warn("failed delete profileRequest: " + profileRequest);
+ }
+ return null;
+ }
+ }
+
+ public RunProfilerActions(SupervisorData supervisorData) {
+ this.conf = supervisorData.getConf();
+ this.stormClusterState = supervisorData.getStormClusterState();
+ this.hostName = supervisorData.getHostName();
+ this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND));
+ this.supervisorData = supervisorData;
+ }
+
+ @Override
+ public void run() {
+ Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions().get();
+ try {
+ for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
+ String stormId = entry.getKey();
+ List<ProfileRequest> requests = entry.getValue();
+ if (requests != null) {
+ for (ProfileRequest profileRequest : requests) {
+ if (profileRequest.get_nodeInfo().get_node().equals(hostName)) {
+ boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false;
+ Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
+ String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
+ Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+ String user = null;
+ if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
+ user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
+ }
+ Map<String, String> env = null;
+ if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) {
+ env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+ } else {
+ env = new HashMap<String, String>();
+ }
+
+ String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
+ StringBuilder stringBuilder = new StringBuilder();
+ FileReader reader = null;
+ BufferedReader br = null;
+ try {
+ reader = new FileReader(str);
+ br = new BufferedReader(reader);
+ int c;
+ while ((c = br.read()) >= 0) {
+ stringBuilder.append(c);
+ }
+ } catch (IOException e) {
+ if (reader != null)
+ reader.close();
+ if (br != null)
+ br.close();
+ }
+ String workerPid = stringBuilder.toString().trim();
+ ProfileAction profileAction = profileRequest.get_action();
+ String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
+
+ // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
+ // The profiler plugin script validates if JVM is recording before starting another recording.
+ String command = mkCommand(profileAction, stop, workerPid, targetDir);
+ List<String> listCommand = new ArrayList<>();
+ if (command != null) {
+ listCommand.addAll(Arrays.asList(command.split(" ")));
+ }
+ try {
+ ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix);
+ launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix);
+ } catch (IOException e) {
+ LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
+ } catch (RuntimeException e) {
+ LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error running profiler actions, will retry again later");
+ }
+ }
+
+ private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
+ final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
+ File targetFile = new File(targetDir);
+ if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ LOG.info("Running as user:{} command:{}", user, commands);
+ String containerFile = Utils.containerFilePath(targetDir);
+ if (Utils.checkFileExists(containerFile)) {
+ SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
+ }
+ String scriptFile = Utils.scriptFilePath(targetDir);
+ if (Utils.checkFileExists(scriptFile)) {
+ SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
+ }
+ String script = Utils.writeScript(targetDir, commands, environment);
- List<String> newCommands = new ArrayList<>();
- newCommands.add("profiler");
- newCommands.add(targetDir);
- newCommands.add(script);
- SupervisorUtils.processLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile);
++ List<String> args = new ArrayList<>();
++ args.add("profiler");
++ args.add(targetDir);
++ args.add(script);
++ SupervisorUtils.processLauncher(conf, user, null, args, environment, logPrefix, exitCodeCallable, targetFile);
+ } else {
+ Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
+ }
+ }
+
+ private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
+ if (action == ProfileAction.JMAP_DUMP) {
+ return jmapDumpCmd(workerPid, targetDir);
+ } else if (action == ProfileAction.JSTACK_DUMP) {
+ return jstackDumpCmd(workerPid, targetDir);
+ } else if (action == ProfileAction.JPROFILE_DUMP) {
+ return jprofileDump(workerPid, targetDir);
+ } else if (action == ProfileAction.JVM_RESTART) {
+ return jprofileJvmRestart(workerPid);
+ } else if (!stop && action == ProfileAction.JPROFILE_STOP) {
+ return jprofileStart(workerPid);
+ } else if (stop && action == ProfileAction.JPROFILE_STOP) {
+ return jprofileStop(workerPid, targetDir);
+ }
+ return null;
+ }
+
+ private String jmapDumpCmd(String pid, String targetDir) {
+ return profileCmd + " " + pid + " jmap " + targetDir;
+ }
+
+ private String jstackDumpCmd(String pid, String targetDir) {
+ return profileCmd + " " + pid + " jstack " + targetDir;
+ }
+
+ private String jprofileStart(String pid) {
+ return profileCmd + " " + pid + " start";
+ }
+
+ private String jprofileStop(String pid, String targetDir) {
+ return profileCmd + " " + pid + " stop " + targetDir;
+ }
+
+ private String jprofileDump(String pid, String targetDir) {
+ return profileCmd + " " + pid + " dump " + targetDir;
+ }
+
+ private String jprofileJvmRestart(String pid) {
+ return profileCmd + " " + pid + " kill";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
index a73a9bd,0000000..9529b1a
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
@@@ -1,402 -1,0 +1,408 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.workermanager;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+public class DefaultWorkerManager implements IWorkerManager {
+
+ private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class);
+
+ private Map conf;
+ private CgroupManager resourceIsolationManager;
+ private boolean runWorkerAsUser;
+
+ @Override
+ public void prepareWorker(Map conf, Localizer localizer) {
+ this.conf = conf;
+ if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
+ try {
+ this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
+ this.resourceIsolationManager.prepare(conf);
+ LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
+ } catch (IOException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ } else {
+ this.resourceIsolationManager = null;
+ }
+ this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+ }
+
+ /**
+ * Assembles the command line for a worker and starts the process.
+ *
+ * The command list holds a java invocation of org.apache.storm.LogWriter followed by a
+ * second java invocation of org.apache.storm.daemon.worker. When a resource isolation
+ * manager is configured, resources are reserved for the worker and the command list is
+ * replaced by the one the manager returns. The process is started through
+ * SupervisorUtils.processLauncher when runWorkerAsUser is set, otherwise through
+ * Utils.launchProcess.
+ *
+ * @param supervisorId not referenced by this implementation
+ * @param assignmentId passed to the worker main class as an argument
+ * @param stormId topology id; used to locate the topology's files and configuration
+ * @param port worker port
+ * @param workerId worker id; used for the worker directories and system properties
+ * @param resources source of the on-heap memory, off-heap memory and cpu values
+ * @param workerExitCallback handed to the process launcher
+ * @return always null
+ */
+ @Override
+ public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
+ Utils.ExitCodeCallable workerExitCallback) {
+ try {
+
+ // Values taken from this JVM's own system properties.
+ String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+ String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
+ String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
+ String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId);
+
+ String stormLogDir = ConfigUtils.getLogDir();
+ String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
+
+ // Resolve the log4j2 config directory: an absolute configured path is used as-is,
+ // a relative one is placed under storm.home, and a blank one falls back to <storm.home>/log4j2.
+ String stormLog4j2ConfDir;
+ if (StringUtils.isNotBlank(stormLogConfDir)) {
+ if (Utils.isAbsolutePath(stormLogConfDir)) {
+ stormLog4j2ConfDir = stormLogConfDir;
+ } else {
+ stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir;
+ }
+ } else {
+ stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
+ }
+
+ String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+
+ String jlp = jlp(stormRoot, conf);
+
+ String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
+
+ Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+ String workerClassPath = getWorkerClassPath(stormJar, stormConf);
+
+ // The topology GC opts may be a single string or a list of strings; anything else is ignored.
+ Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
+ List<String> topGcOpts = new ArrayList<>();
+ if (topGcOptsObject instanceof String) {
+ topGcOpts.add((String) topGcOptsObject);
+ } else if (topGcOptsObject instanceof List) {
+ topGcOpts.addAll((List<String>) topGcOptsObject);
+ }
+
+ int memOnheap = 0;
+ if (resources.get_mem_on_heap() > 0) {
+ memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
+ } else {
+ // set the default heap memory size for supervisor-test
+ memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
+ }
+
+ int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
+
+ int cpu = (int) Math.ceil(resources.get_cpu());
+
+ List<String> gcOpts = null;
+
+ // Topology-level GC opts, when present, are used instead of the WORKER_GC_CHILDOPTS value from conf.
+ if (topGcOpts.size() > 0) {
+ gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap);
+ } else {
+ gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap);
+ }
+
+ Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
+ List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
+ if (topoWorkerLogwriterObject instanceof String) {
+ topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject);
+ } else if (topoWorkerLogwriterObject instanceof List) {
+ topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject);
+ }
+
+ String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+
+ String logfileName = "worker.log";
+
+ String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
+
+ String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
+ if (loggingSensitivity == null) {
+ loggingSensitivity = "S3";
+ }
+
+ List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
+
+ List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
+
+ List<String> workerProfilerChildopts = null;
+ if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
+ workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
+ } else {
+ workerProfilerChildopts = new ArrayList<>();
+ }
+
+ // Start from the topology's environment map (if any); LD_LIBRARY_PATH is then set to jlp,
+ // overriding any value the topology supplied.
+ Map<String, String> topEnvironment = new HashMap<String, String>();
+ Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+ if (environment != null) {
+ topEnvironment.putAll(environment);
+ }
+ topEnvironment.put("LD_LIBRARY_PATH", jlp);
+
+ // On Windows the directory is prefixed with file:/// unless it already starts with file:.
+ String log4jConfigurationFile = null;
+ if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
+ log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
+ } else {
+ log4jConfigurationFile = stormLog4j2ConfDir;
+ }
+ log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
+
+ // First part of the command list: the LogWriter JVM and its system properties.
+ List<String> commandList = new ArrayList<>();
+ commandList.add(SupervisorUtils.javaCmd("java"));
+ commandList.add("-cp");
+ commandList.add(workerClassPath);
+ commandList.addAll(topoWorkerLogwriterChildopts);
+ commandList.add("-Dlogfile.name=" + logfileName);
+ commandList.add("-Dstorm.home=" + stormHome);
+ commandList.add("-Dworkers.artifacts=" + workersArtifacets);
+ commandList.add("-Dstorm.id=" + stormId);
+ commandList.add("-Dworker.id=" + workerId);
+ commandList.add("-Dworker.port=" + port);
+ commandList.add("-Dstorm.log.dir=" + stormLogDir);
+ commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
+ commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+ commandList.add("org.apache.storm.LogWriter");
+
+ // Second part, appended to the same list: the worker JVM command.
+ // NOTE(review): presumably LogWriter receives this as its arguments and starts it - confirm.
+ commandList.add(SupervisorUtils.javaCmd("java"));
+ commandList.add("-server");
+ commandList.addAll(workerChildopts);
+ commandList.addAll(topWorkerChildopts);
+ commandList.addAll(gcOpts);
+ commandList.addAll(workerProfilerChildopts);
+ commandList.add("-Djava.library.path=" + jlp);
+ commandList.add("-Dlogfile.name=" + logfileName);
+ commandList.add("-Dstorm.home=" + stormHome);
+ commandList.add("-Dworkers.artifacts=" + workersArtifacets);
+ commandList.add("-Dstorm.conf.file=" + stormConfFile);
+ commandList.add("-Dstorm.options=" + stormOptions);
+ commandList.add("-Dstorm.log.dir=" + stormLogDir);
+ commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
+ commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
+ commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
+ commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+ commandList.add("-Dstorm.id=" + stormId);
+ commandList.add("-Dworker.id=" + workerId);
+ commandList.add("-Dworker.port=" + port);
+ commandList.add("-cp");
+ commandList.add(workerClassPath);
+ commandList.add("org.apache.storm.daemon.worker");
+ commandList.add(stormId);
+ commandList.add(assignmentId);
+ commandList.add(String.valueOf(port));
+ commandList.add(workerId);
+
+ // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
+ if (resourceIsolationManager != null) {
+ // NOTE(review): the (double) cast on an Object only succeeds when the stored value is a
+ // Double; a missing value or an Integer would throw here - confirm the config always yields a Double.
+ int cGroupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)));
+ int memoryValue = memoffheap + memOnheap + cGroupMem;
+ int cpuValue = cpu;
+ Map<String, Number> map = new HashMap<>();
+ map.put("cpu", cpuValue);
+ map.put("memory", memoryValue);
+ resourceIsolationManager.reserveResourcesForWorker(workerId, map);
+ commandList = resourceIsolationManager.getLaunchCommand(workerId, commandList);
+ }
+
+ LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
+
+ String logPrefix = "Worker Process " + workerId;
+ String workerDir = ConfigUtils.workerRoot(conf, workerId);
+
+ // Run-as-user launches pass "worker", the worker dir and the result of Utils.writeScript
+ // to SupervisorUtils.processLauncher; otherwise the command list is launched directly.
+ if (runWorkerAsUser) {
+ List<String> args = new ArrayList<>();
+ args.add("worker");
+ args.add(workerDir);
+ args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
- SupervisorUtils.processLauncher(conf, user, args, null, logPrefix, workerExitCallback, new File(workerDir));
++ List<String> commandPrefix = null;
++ if (resourceIsolationManager != null)
++ commandPrefix = resourceIsolationManager.getLaunchCommandPrefix(workerId);
++ SupervisorUtils.processLauncher(conf, user, commandPrefix, args, null, logPrefix, workerExitCallback, new File(workerDir));
+ } else {
+ Utils.launchProcess(commandList, topEnvironment, logPrefix, workerExitCallback, new File(workerDir));
+ }
+ } catch (IOException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ return null;
+ }
+
+ /**
+  * Terminates the processes recorded for a worker and removes their pid files.
+  *
+  * Every pid listed under the worker's pids directory is first sent signal 15; if any pid
+  * was found, the method sleeps for the configured number of seconds, then sends signal 9
+  * to each pid and deletes its pid file. Any exception is rethrown through
+  * Utils.wrapInRuntime.
+  *
+  * @param supervisorId     used only in log messages
+  * @param workerId         worker whose processes are shut down
+  * @param workerThreadPids map consulted for an entry under workerId; a non-blank value is
+  *                         passed to ProcessSimulator.killProcess
+  * @return always null
+  */
+ @Override
+ public IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) {
+     try {
+         LOG.info("Shutting down {}:{}", supervisorId, workerId);
+         Collection<String> workerPids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
+         Integer sleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
+         String workerUser = ConfigUtils.getWorkerUser(conf, workerId);
+
+         String simulatedPid = workerThreadPids.get(workerId);
+         if (StringUtils.isNotBlank(simulatedPid)) {
+             ProcessSimulator.killProcess(simulatedPid);
+         }
+
+         // First pass: signal 15 for every recorded pid.
+         for (String pid : workerPids) {
+             if (!runWorkerAsUser) {
+                 Utils.killProcessWithSigTerm(pid);
+                 continue;
+             }
+             List<String> termArgs = new ArrayList<>(Arrays.asList("signal", pid, "15"));
+             SupervisorUtils.processLauncherAndWait(conf, workerUser, termArgs, null, "kill -15 " + pid);
+         }
+
+         if (!workerPids.isEmpty()) {
+             LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", sleepSecs);
+             Time.sleepSecs(sleepSecs);
+         }
+
+         // Second pass: signal 9, then drop the pid file.
+         for (String pid : workerPids) {
+             if (runWorkerAsUser) {
+                 List<String> killArgs = new ArrayList<>(Arrays.asList("signal", pid, "9"));
+                 SupervisorUtils.processLauncherAndWait(conf, workerUser, killArgs, null, "kill -9 " + pid);
+                 String pidFile = ConfigUtils.workerPidPath(conf, workerId, pid);
+                 SupervisorUtils.rmrAsUser(conf, workerId, pidFile);
+             } else {
+                 Utils.forceKillProcess(pid);
+                 String pidFile = ConfigUtils.workerPidPath(conf, workerId, pid);
+                 try {
+                     LOG.debug("Removing path {}", pidFile);
+                     new File(pidFile).delete();
+                 } catch (Exception ignored) {
+                     // On Windows the supervisor may still hold a lock on the worker directory,
+                     // so a failure here is deliberately ignored.
+                 }
+             }
+         }
+         LOG.info("Shut down {}:{}", supervisorId, workerId);
+     } catch (Exception e) {
+         throw Utils.wrapInRuntime(e);
+     }
+     return null;
+ }
+
+ /**
+ * Releases the worker's isolation resources (when a resource isolation manager is set)
+ * and deletes the worker's directories.
+ *
+ * If the worker root exists it is removed either through SupervisorUtils.rmrAsUser
+ * (when runWorkerAsUser is set) or by force-deleting the heartbeats, pids, tmp and root
+ * directories in that order; ConfigUtils.removeWorkerUserWSE is then called.
+ *
+ * @param workerId worker to clean up
+ * @return true when cleanup completed; false when an IOException or RuntimeException
+ * was caught, in which case a warning is logged
+ */
+ @Override
+ public boolean cleanupWorker(String workerId) {
+ try {
++ //clean up for resource isolation if enabled
++ if (resourceIsolationManager != null) {
++ resourceIsolationManager.releaseResourcesForWorker(workerId);
++ }
++ //Always make sure to clean up everything else before worker directory
++ //is removed since that is what is going to trigger the retry for cleanup
+ String workerRoot = ConfigUtils.workerRoot(conf, workerId);
+ if (Utils.checkFileExists(workerRoot)) {
+ if (runWorkerAsUser) {
+ SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
+ } else {
+ // The worker root is deleted last, after its heartbeats, pids and tmp directories.
+ Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
+ Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
+ Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId));
+ Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
+ }
+ ConfigUtils.removeWorkerUserWSE(conf, workerId);
+ }
- if (resourceIsolationManager != null) {
- resourceIsolationManager.releaseResourcesForWorker(workerId);
- }
+ return true;
+ } catch (IOException e) {
+ LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+ }
+ return false;
+ }
+
+ /**
+ * Stub implementation: ignores every argument and performs no work.
+ *
+ * @return always null
+ */
+ @Override
+ public IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources) {
+ return null;
+ }
+
+ /**
+  * Builds the library path string used for the worker.
+  *
+  * The result has three entries joined by Utils.CLASS_PATH_SEPARATOR: the
+  * {@code <os.name>-<os.arch>} directory under the topology's resources directory (whitespace
+  * runs in os.name replaced by "_"), the resources directory itself, and the value of
+  * Config.JAVA_LIBRARY_PATH from conf.
+  *
+  * @param stormRoot root directory under which the resources subdirectory is located
+  * @param conf      configuration map providing Config.JAVA_LIBRARY_PATH
+  * @return the assembled path string
+  */
+ protected String jlp(String stormRoot, Map conf) {
+     String resourcesDir = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
+     String osName = System.getProperty("os.name").replaceAll("\\s+", "_");
+     String osArch = System.getProperty("os.arch");
+
+     StringBuilder path = new StringBuilder();
+     // Platform-specific directory first, e.g. <resources>/<os>-<arch>.
+     path.append(resourcesDir).append(Utils.FILE_PATH_SEPARATOR).append(osName).append("-").append(osArch);
+     path.append(Utils.CLASS_PATH_SEPARATOR).append(resourcesDir);
+     path.append(Utils.CLASS_PATH_SEPARATOR).append(conf.get(Config.JAVA_LIBRARY_PATH));
+     return path.toString();
+ }
+
+ /**
+  * Builds the classpath for a worker: Utils.workerClasspath(), then the topology jar, then
+  * any entries configured under Config.TOPOLOGY_CLASSPATH.
+  *
+  * @param stormJar  path of the topology jar added to the classpath
+  * @param stormConf topology configuration; Config.TOPOLOGY_CLASSPATH may be absent, a String,
+  *                  or a List of Strings
+  * @return the assembled classpath string
+  */
+ protected String getWorkerClassPath(String stormJar, Map stormConf) {
+     List<String> topoClasspath = new ArrayList<>();
+     Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
+
+     if (object instanceof List) {
+         topoClasspath.addAll((List<String>) object);
+     } else if (object instanceof String) {
+         topoClasspath.add((String) object);
+     } else if (object != null) {
+         // An unset topology classpath is normal; only a value of an unexpected type is an error.
+         LOG.error("topology specific classpath is invalid: {}", object);
+     }
+     String classPath = Utils.workerClasspath();
+     String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
+     return Utils.addToClasspath(classAddPath, topoClasspath);
+ }
+
+ /**
+  * Generates runtime childopts by replacing the placeholders %ID%, %WORKER-ID%, %TOPOLOGY-ID%,
+  * %WORKER-PORT% and %HEAP-MEM% in the configured value.
+  *
+  * A String value is substituted and then split on whitespace into separate options; empty
+  * tokens (for example from leading whitespace) are dropped so that no empty argument reaches
+  * the command line. A List value is substituted element by element without splitting, and
+  * blank elements are skipped. Any other value, including null, yields an empty list.
+  *
+  * @param value     configured childopts: a String, a List of Strings, or anything else
+  * @param workerId  replacement for %WORKER-ID%
+  * @param stormId   replacement for %TOPOLOGY-ID%
+  * @param port      replacement for both %ID% and %WORKER-PORT%
+  * @param memOnheap replacement for %HEAP-MEM%
+  * @return the substituted options, never null
+  */
+ public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
+     List<String> rets = new ArrayList<>();
+     if (value instanceof String) {
+         String string = (String) value;
+         if (StringUtils.isNotBlank(string)) {
+             String substituted = substitutePlaceholders(string, workerId, stormId, port, memOnheap);
+             for (String opt : substituted.split("\\s+")) {
+                 // split() yields an empty first token when the string starts with whitespace.
+                 if (!opt.isEmpty()) {
+                     rets.add(opt);
+                 }
+             }
+         }
+     } else if (value instanceof List) {
+         List<Object> objects = (List<Object>) value;
+         for (Object object : objects) {
+             String str = (String) object;
+             if (StringUtils.isNotBlank(str)) {
+                 rets.add(substitutePlaceholders(str, workerId, stormId, port, memOnheap));
+             }
+         }
+     }
+     return rets;
+ }
+
+ /** Replaces the childopts placeholders in a single string with the given runtime values. */
+ private String substitutePlaceholders(String opts, String workerId, String stormId, Long port, int memOnheap) {
+     String portString = String.valueOf(port);
+     return opts.replace("%ID%", portString)
+             .replace("%WORKER-ID%", workerId)
+             .replace("%TOPOLOGY-ID%", stormId)
+             .replace("%WORKER-PORT%", portString)
+             .replace("%HEAP-MEM%", String.valueOf(memOnheap));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 8f11f8a,ade1c2f..0d6603d
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -728,7 -689,7 +728,7 @@@
(launchProcessImpl [& _] nil))]
(with-open [_ (UtilsInstaller. utils-proxy)]
(is (try
- (SupervisorUtils/processLauncher {} nil (ArrayList.) {} nil nil nil)
- (supervisor/worker-launcher {} nil "")
++ (SupervisorUtils/processLauncher {} nil nil (ArrayList.) {} nil nil nil)
false
(catch Throwable t
(and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))