You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/09/19 21:01:36 UTC
[7/8] storm git commit: STORM-2018: Supervisor V2
STORM-2018: Supervisor V2
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5a320461
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5a320461
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5a320461
Branch: refs/heads/master
Commit: 5a3204610734871f18ba38a29b271cfb814ff1bc
Parents: 4ce6f04
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Aug 5 16:15:21 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 19 15:57:57 2016 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 4 -
log4j2/cluster.xml | 2 +-
.../apache/storm/daemon/local_supervisor.clj | 48 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 5 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 9 +-
storm-core/src/clj/org/apache/storm/testing.clj | 53 +-
storm-core/src/jvm/org/apache/storm/Config.java | 17 +-
.../org/apache/storm/cluster/IStateStorage.java | 9 +-
.../storm/cluster/IStormClusterState.java | 2 +-
.../storm/cluster/PaceMakerStateStorage.java | 2 +-
.../storm/cluster/StormClusterStateImpl.java | 18 +-
.../org/apache/storm/cluster/VersionedData.java | 36 +
.../apache/storm/cluster/ZKStateStorage.java | 2 +-
.../org/apache/storm/command/KillWorkers.java | 33 +-
.../container/ResourceIsolationInterface.java | 21 +-
.../storm/container/cgroup/CgroupCommon.java | 6 +-
.../container/cgroup/CgroupCommonOperation.java | 2 +-
.../storm/container/cgroup/CgroupManager.java | 37 +-
.../storm/daemon/supervisor/AdvancedFSOps.java | 335 +++++++
.../storm/daemon/supervisor/BasicContainer.java | 658 +++++++++++++
.../supervisor/BasicContainerLauncher.java | 62 ++
.../storm/daemon/supervisor/Container.java | 549 +++++++++++
.../daemon/supervisor/ContainerLauncher.java | 104 +++
.../supervisor/ContainerRecoveryException.java | 29 +
.../daemon/supervisor/ExitCodeCallback.java | 30 +
.../storm/daemon/supervisor/Killable.java | 50 +
.../storm/daemon/supervisor/LocalContainer.java | 85 ++
.../supervisor/LocalContainerLauncher.java | 60 ++
.../daemon/supervisor/ReadClusterState.java | 327 +++++++
.../daemon/supervisor/RunAsUserContainer.java | 100 ++
.../supervisor/RunAsUserContainerLauncher.java | 60 ++
.../apache/storm/daemon/supervisor/Slot.java | 785 ++++++++++++++++
.../apache/storm/daemon/supervisor/State.java | 22 -
.../storm/daemon/supervisor/StateHeartbeat.java | 45 -
.../storm/daemon/supervisor/Supervisor.java | 362 ++++++--
.../daemon/supervisor/SupervisorDaemon.java | 28 -
.../storm/daemon/supervisor/SupervisorData.java | 234 -----
.../daemon/supervisor/SupervisorManager.java | 92 --
.../daemon/supervisor/SupervisorUtils.java | 170 ++--
.../daemon/supervisor/SyncProcessEvent.java | 448 ---------
.../daemon/supervisor/SyncSupervisorEvent.java | 612 ------------
.../supervisor/timer/RunProfilerActions.java | 211 -----
.../supervisor/timer/SupervisorHealthCheck.java | 27 +-
.../supervisor/timer/SupervisorHeartbeat.java | 30 +-
.../daemon/supervisor/timer/UpdateBlobs.java | 16 +-
.../workermanager/DefaultWorkerManager.java | 429 ---------
.../workermanager/IWorkerManager.java | 35 -
.../apache/storm/localizer/AsyncLocalizer.java | 432 +++++++++
.../org/apache/storm/localizer/ILocalizer.java | 70 ++
.../localizer/LocalDownloadedResource.java | 146 +++
.../LocalizedResourceRetentionSet.java | 2 +-
.../storm/localizer/LocalizedResourceSet.java | 8 +-
.../org/apache/storm/localizer/Localizer.java | 38 +-
.../jvm/org/apache/storm/task/ShellBolt.java | 7 +-
.../org/apache/storm/testing/FeederSpout.java | 29 +-
.../apache/storm/trident/util/TridentUtils.java | 2 +-
.../jvm/org/apache/storm/utils/ConfigUtils.java | 86 +-
.../org/apache/storm/utils/InprocMessaging.java | 69 +-
.../src/jvm/org/apache/storm/utils/Time.java | 26 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 219 +----
.../org/apache/storm/zookeeper/Zookeeper.java | 17 +-
.../org/apache/storm/integration_test.clj | 67 --
.../test/clj/org/apache/storm/metrics_test.clj | 4 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 10 +-
.../storm/security/auth/drpc_auth_test.clj | 36 +-
.../clj/org/apache/storm/supervisor_test.clj | 926 -------------------
.../test/jvm/org/apache/storm/TestCgroups.java | 3 +-
.../daemon/supervisor/BasicContainerTest.java | 484 ++++++++++
.../storm/daemon/supervisor/ContainerTest.java | 269 ++++++
.../storm/daemon/supervisor/SlotTest.java | 515 +++++++++++
.../storm/executor/error/ReportErrorTest.java | 76 ++
.../storm/localizer/AsyncLocalizerTest.java | 187 ++++
.../LocalizedResourceRetentionSetTest.java | 10 +-
.../localizer/LocalizedResourceSetTest.java | 12 +-
74 files changed, 6163 insertions(+), 3888 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8392936..2a6f18d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -296,10 +296,6 @@ storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManage
# If storm.resource.isolation.plugin.enable is set to false the unit tests for cgroups will not run
storm.resource.isolation.plugin.enable: false
-
-# Default plugin to use for manager worker
-storm.supervisor.worker.manager.plugin: org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager
-
# Configs for CGroup support
storm.cgroup.hierarchy.dir: "/cgroup/storm_resources"
storm.cgroup.resources:
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/log4j2/cluster.xml
----------------------------------------------------------------------
diff --git a/log4j2/cluster.xml b/log4j2/cluster.xml
index e911823..a0ba3d1 100644
--- a/log4j2/cluster.xml
+++ b/log4j2/cluster.xml
@@ -18,7 +18,7 @@
<configuration monitorInterval="60">
<properties>
- <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} [%p] %msg%n</property>
+ <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %t %c{1.} [%p] %msg%n</property>
</properties>
<appenders>
<RollingFile name="A1" immediateFlush="false"
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
index 560ae3e..a02a8e2 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
@@ -14,51 +14,15 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.daemon.local-supervisor
- (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils]
- [org.apache.storm.utils Utils ConfigUtils]
- [org.apache.storm ProcessSimulator])
- (:use [org.apache.storm.daemon common]
- [org.apache.storm log])
- (:require [org.apache.storm.daemon [worker :as worker] ])
- (:require [clojure.string :as str])
+ (:import [org.apache.storm.daemon.supervisor Supervisor]
+ [org.apache.storm.utils ConfigUtils])
+ (:use [org.apache.storm.daemon common])
(:gen-class))
-(defn launch-local-worker [supervisorData stormId port workerId resources]
- (let [conf (.getConf supervisorData)
- pid (Utils/uuid)
- worker (worker/mk-worker conf
- (.getSharedContext supervisorData)
- stormId
- (.getAssignmentId supervisorData)
- (int port)
- workerId)]
- (ConfigUtils/setWorkerUserWSE conf workerId "")
- (ProcessSimulator/registerProcess pid worker)
- (.put (.getWorkerThreadPids supervisorData) workerId pid)))
-
-(defn shutdown-local-worker [supervisorData worker-manager workerId]
- (log-message "shutdown-local-worker")
- (let [supervisor-id (.getSupervisorId supervisorData)
- worker-pids (.getWorkerThreadPids supervisorData)
- dead-workers (.getDeadWorkers supervisorData)]
- (.shutdownWorker worker-manager supervisor-id workerId worker-pids)
- (if (.cleanupWorker worker-manager workerId)
- (.remove dead-workers workerId))))
-
-(defn local-process []
- "Create a local process event"
- (proxy [SyncProcessEvent] []
- (launchLocalWorker [supervisorData stormId port workerId resources]
- (launch-local-worker supervisorData stormId port workerId resources))
- (killWorker [supervisorData worker-manager workerId] (shutdown-local-worker supervisorData worker-manager workerId))))
-
-
(defserverfn mk-local-supervisor [conf shared-context isupervisor]
- (log-message "Starting local Supervisor with conf " conf)
(if (not (ConfigUtils/isLocalMode conf))
(throw
(IllegalArgumentException. "Cannot start server in distrubuted mode!")))
- (let [local-process (local-process)
- supervisor-server (Supervisor.)]
- (.setLocalSyncProcess supervisor-server local-process)
- (.mkSupervisor supervisor-server conf shared-context isupervisor)))
\ No newline at end of file
+ (let [supervisor-server (Supervisor. conf shared-context isupervisor)]
+ (.launch supervisor-server)
+ supervisor-server))
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 76b3917..1a83c0f 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1616,7 +1616,7 @@
:all-components all-components
:launch-time-secs launch-time-secs
:assignment assignment
- :beats beats
+ :beats (or beats {})
:topology topology
:task->component task->component
:base base}))
@@ -1995,7 +1995,8 @@
executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
(let [host (-> assignment :node->host (get node))
heartbeat (.get beats (StatsUtil/convertExecutor executor))
- hb (if heartbeat (.get heartbeat "heartbeat"))
+ heartbeat (or heartbeat {})
+ hb (.get heartbeat "heartbeat")
excutorstats (if hb (.get hb "stats"))
excutorstats (if excutorstats
(StatsUtil/thriftifyExecutorStats excutorstats))]
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 222de36..6b165b2 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -36,13 +36,14 @@
(:import [org.apache.storm.messaging TransportFactory])
(:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status DeserializingConnectionCallback])
(:import [org.apache.storm.daemon Shutdownable StormCommon DaemonCommon])
+ (:import [org.apache.storm.daemon.supervisor AdvancedFSOps])
(:import [org.apache.storm.serialization KryoTupleSerializer])
(:import [org.apache.storm.generated StormTopology LSWorkerHeartbeat])
(:import [org.apache.storm.tuple AddressedTuple Fields])
(:import [org.apache.storm.task WorkerTopologyContext])
(:import [org.apache.storm Constants])
(:import [org.apache.storm.security.auth AuthUtils])
- (:import [org.apache.storm.cluster ClusterStateContext DaemonType ZKStateStorage StormClusterStateImpl ClusterUtils IStateStorage])
+ (:import [org.apache.storm.cluster ClusterStateContext DaemonType ZKStateStorage StormClusterStateImpl ClusterUtils])
(:import [javax.security.auth Subject])
(:import [java.security PrivilegedExceptionAction])
(:import [org.apache.logging.log4j LogManager])
@@ -272,8 +273,8 @@
receive-queue-map (->> executor-receive-queue-map
(mapcat (fn [[e queue]] (for [t (executor->tasks e)] [t queue])))
(into {}))
-
- topology (ConfigUtils/readSupervisorTopology conf storm-id)
+ ops (AdvancedFSOps/make conf)
+ topology (ConfigUtils/readSupervisorTopology conf storm-id ops)
mq-context (if mq-context
mq-context
(TransportFactory/makeContext storm-conf))]
@@ -404,7 +405,7 @@
assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
(:data (get @(:assignment-versions worker) storm-id))
(let [thriftify-assignment-version (.assignmentInfoWithVersion storm-cluster-state storm-id callback)
- new-assignment {:data (clojurify-assignment (.get thriftify-assignment-version (IStateStorage/DATA))) :version version}]
+ new-assignment {:data (clojurify-assignment (.getData thriftify-assignment-version)) :version version}]
(swap! (:assignment-versions worker) assoc storm-id new-assignment)
(:data new-assignment)))
my-assignment (-> assignment
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 801f9bd..0b5f9f7 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -25,7 +25,7 @@
[org.apache.storm.utils]
[org.apache.storm.zookeeper Zookeeper]
[org.apache.storm ProcessSimulator]
- [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager]
+ [org.apache.storm.daemon.supervisor Supervisor StandaloneSupervisor SupervisorUtils]
[org.apache.storm.executor Executor]
[java.util.concurrent.atomic AtomicBoolean])
(:import [java.io File])
@@ -139,8 +139,7 @@
supervisor-conf (merge (:daemon-conf cluster-map)
conf
{STORM-LOCAL-DIR tmp-dir
- SUPERVISOR-SLOTS-PORTS port-ids
- STORM-SUPERVISOR-WORKER-MANAGER-PLUGIN "org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager"})
+ SUPERVISOR-SLOTS-PORTS port-ids})
id-fn (if id id (Utils/uuid))
isupervisor (proxy [StandaloneSupervisor] []
(generateSupervisorId [] id-fn))
@@ -178,18 +177,19 @@
(let [zk-tmp (local-temp-path)
[zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
(Zookeeper/mkInprocessZookeeper zk-tmp nil))
+ nimbus-tmp (local-temp-path)
daemon-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
ZMQ-LINGER-MILLIS 0
TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
STORM-CLUSTER-MODE "local"
- BLOBSTORE-SUPERUSER (System/getProperty "user.name")}
+ BLOBSTORE-SUPERUSER (System/getProperty "user.name")
+ BLOBSTORE-DIR nimbus-tmp}
(if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
{STORM-ZOOKEEPER-PORT zk-port
STORM-ZOOKEEPER-SERVERS ["localhost"]})
daemon-conf)
- nimbus-tmp (local-temp-path)
port-counter (mk-counter supervisor-slot-port-min)
nimbus (nimbus/service-handler
(assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
@@ -233,7 +233,7 @@
supervisors)]
;; tmp-dir will be taken care of by shutdown
(reset! (:supervisors cluster-map) (remove-first finder-fn supervisors))
- (.shutdown sup)))
+ (.close sup)))
(defn kill-local-storm-cluster [cluster-map]
(.shutdown (:nimbus cluster-map))
@@ -249,7 +249,7 @@
(doseq [s @(:supervisors cluster-map)]
(.shutdownAllWorkers s)
;; race condition here? will it launch the workers again?
- (.shutdown s))
+ (.close s))
(ProcessSimulator/killAllProcesses)
(if (not-nil? (:zookeeper cluster-map))
(do
@@ -285,7 +285,7 @@
([timeout-ms apredicate]
(while-timeout timeout-ms (not (apredicate))
(Time/sleep 100))))
-(defn is-supervisor-waiting [^SupervisorManager supervisor]
+(defn is-supervisor-waiting [^Supervisor supervisor]
(.isWaiting supervisor))
(defn wait-until-cluster-waiting
@@ -395,15 +395,6 @@
executor->node+port)]
(submit-local-topology nimbus storm-name conf topology)))))
-(defn mk-capture-launch-fn [capture-atom]
- (fn [supervisorData stormId port workerId resources]
- (let [conf (.getConf supervisorData)
- supervisorId (.getSupervisorId supervisorData)
- existing (get @capture-atom [supervisorId port] [])]
- (log-message "mk-capture-launch-fn")
- (ConfigUtils/setWorkerUserWSE conf workerId "")
- (swap! capture-atom assoc [supervisorId port] (conj existing stormId)))))
-
(defn find-worker-id
[supervisor-conf port]
(let [supervisor-state (ConfigUtils/supervisorState supervisor-conf)
@@ -430,23 +421,6 @@
(.shutdownWorker worker-manager supervisor-id workerId worker-pids)
(if (.cleanupWorker worker-manager workerId)
(.remove dead-workers workerId)))))
-(defmacro capture-changed-workers
- [& body]
- `(let [launch-captured# (atom {})
- shutdown-captured# (atom {})]
- (with-var-roots [local-supervisor/launch-local-worker (mk-capture-launch-fn launch-captured#)
- local-supervisor/shutdown-local-worker (mk-capture-shutdown-fn shutdown-captured#)]
- ~@body
- {:launched @launch-captured#
- :shutdown @shutdown-captured#})))
-
-(defmacro capture-launched-workers
- [& body]
- `(:launched (capture-changed-workers ~@body)))
-
-(defmacro capture-shutdown-workers
- [& body]
- `(:shutdown (capture-changed-workers ~@body)))
(defnk aggregated-stat
[cluster-map storm-name stat-key :component-ids nil]
@@ -723,13 +697,18 @@
(let [target (+ amt @(:last-spout-emit tracked-topology))
track-id (-> tracked-topology :cluster ::track-id)
waiting? (fn []
- (or (not= target (global-amt track-id "spout-emitted"))
- (not= (global-amt track-id "transferred")
- (global-amt track-id "processed"))))]
+ (let [spout-emitted (global-amt track-id "spout-emitted")
+ transferred (global-amt track-id "transferred")
+ processed (global-amt track-id "processed")]
+ (log-message "emitted " spout-emitted " target " target " transferred " transferred " processed " processed)
+ (or (not= target spout-emitted)
+ (not= transferred
+ processed))))]
(while-timeout timeout-ms (waiting?)
;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
;; (println "Processed: " (global-amt track-id "processed"))
;; (println "Transferred: " (global-amt track-id "transferred"))
+ (advance-time-secs! 1)
(Thread/sleep (rand-int 200)))
(reset! (:last-spout-emit tracked-topology) target))))
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 43369bd..f38ca0f 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -18,7 +18,6 @@
package org.apache.storm;
import org.apache.storm.container.ResourceIsolationInterface;
-import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
@@ -2264,20 +2263,14 @@ public class Config extends HashMap<String, Object> {
* future releases.
*/
@isString
- public static final Object CLIENT_JAR_TRANSFORMER = "client.jartransformer.class";
+ public static final String CLIENT_JAR_TRANSFORMER = "client.jartransformer.class";
/**
* The plugin to be used for resource isolation
*/
@isImplementationOfClass(implementsClass = ResourceIsolationInterface.class)
- public static final Object STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin";
-
- /**
- * The plugin to be used for manager worker
- */
- @isImplementationOfClass(implementsClass = IWorkerManager.class)
- public static final Object STORM_SUPERVISOR_WORKER_MANAGER_PLUGIN = "storm.supervisor.worker.manager.plugin";
+ public static final String STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin";
/**
* CGroup Setting below
@@ -2287,19 +2280,19 @@ public class Config extends HashMap<String, Object> {
* root directory of the storm cgroup hierarchy
*/
@isString
- public static final Object STORM_CGROUP_HIERARCHY_DIR = "storm.cgroup.hierarchy.dir";
+ public static final String STORM_CGROUP_HIERARCHY_DIR = "storm.cgroup.hierarchy.dir";
/**
* resources to to be controlled by cgroups
*/
@isStringList
- public static final Object STORM_CGROUP_RESOURCES = "storm.cgroup.resources";
+ public static final String STORM_CGROUP_RESOURCES = "storm.cgroup.resources";
/**
* name for the cgroup hierarchy
*/
@isString
- public static final Object STORM_CGROUP_HIERARCHY_NAME = "storm.cgroup.hierarchy.name";
+ public static final String STORM_CGROUP_HIERARCHY_NAME = "storm.cgroup.hierarchy.name";
/**
* flag to determine whether to use a resource isolation plugin
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
index 0b6f043..aa731ff 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
@@ -41,10 +41,7 @@ import org.apache.zookeeper.data.ACL;
* Never use the same paths with the *_hb* methods as you do with the others.
*/
public interface IStateStorage {
-
- public static final String DATA = "data";
- public static final String VERSION = "version";
-
+
/**
* Registers a callback function that gets called when CuratorEvents happen.
* @param callback is a clojure IFn that accepts the type - translated to
@@ -157,9 +154,9 @@ public interface IStateStorage {
* @param watch Whether or not to set a watch on the path. Watched paths
* emit events which are consumed by functions registered with the
* register method. Very useful for catching updates to nodes.
- * @return An Map in the form {:data data :version version}
+ * @return the data with a version
*/
- Map get_data_with_version(String path, boolean watch);
+ VersionedData<byte[]> get_data_with_version(String path, boolean watch);
/**
* Write a worker heartbeat at the path.
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index b016997..a6f07ed 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -29,7 +29,7 @@ public interface IStormClusterState {
public Assignment assignmentInfo(String stormId, Runnable callback);
- public Map assignmentInfoWithVersion(String stormId, Runnable callback);
+ public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
index c42bd38..80a398e 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
@@ -104,7 +104,7 @@ public class PaceMakerStateStorage implements IStateStorage {
}
@Override
- public Map get_data_with_version(String path, boolean watch) {
+ public VersionedData<byte[]> get_data_with_version(String path, boolean watch) {
return stateStorage.get_data_with_version(path, watch);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 4f02beb..972d778 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -17,7 +17,6 @@
*/
package org.apache.storm.cluster;
-import clojure.lang.*;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.*;
@@ -164,21 +163,18 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
- public Map assignmentInfoWithVersion(String stormId, Runnable callback) {
- Map map = new HashMap();
+ public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback) {
if (callback != null) {
assignmentInfoWithVersionCallback.put(stormId, callback);
}
Assignment assignment = null;
Integer version = 0;
- Map dataWithVersionMap = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
- if (dataWithVersionMap != null) {
- assignment = ClusterUtils.maybeDeserialize((byte[]) dataWithVersionMap.get(IStateStorage.DATA), Assignment.class);
- version = (Integer) dataWithVersionMap.get(IStateStorage.VERSION);
- }
- map.put(IStateStorage.DATA, assignment);
- map.put(IStateStorage.VERSION, version);
- return map;
+ VersionedData<byte[]> dataWithVersion = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
+ if (dataWithVersion != null) {
+ assignment = ClusterUtils.maybeDeserialize(dataWithVersion.getData(), Assignment.class);
+ version = dataWithVersion.getVersion();
+ }
+ return new VersionedData<Assignment>(version, assignment);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java b/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java
new file mode 100644
index 0000000..3de2a88
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+public class VersionedData<D> {
+ private final int version;
+ private final D data;
+
+ public VersionedData(int version, D data) {
+ this.version = version;
+ this.data = data;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public D getData() {
+ return data;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
index 87e7dfc..e337b1f 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -211,7 +211,7 @@ public class ZKStateStorage implements IStateStorage {
}
@Override
- public Map get_data_with_version(String path, boolean watch) {
+ public VersionedData<byte[]> get_data_with_version(String path, boolean watch) {
return Zookeeper.getDataWithVersion(zkReader, path, watch);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
index 87a1459..c59efd7 100644
--- a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
+++ b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
@@ -17,39 +17,20 @@
*/
package org.apache.storm.command;
+import java.io.File;
+import java.util.Map;
+
import org.apache.storm.Config;
import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
-import org.apache.storm.daemon.supervisor.SupervisorData;
-import org.apache.storm.daemon.supervisor.SupervisorUtils;
-import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
+import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.utils.ConfigUtils;
-import org.eclipse.jetty.util.ConcurrentHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Collection;
-import java.util.Map;
-
public class KillWorkers {
- private static final Logger LOG = LoggerFactory.getLogger(KillWorkers.class);
-
public static void main(String [] args) throws Exception {
- Map conf = ConfigUtils.readStormConfig();
+ Map<String, Object> conf = ConfigUtils.readStormConfig();
conf.put(Config.STORM_LOCAL_DIR, new File((String)conf.get(Config.STORM_LOCAL_DIR)).getCanonicalPath());
- SupervisorData supervisorData = new SupervisorData(conf, null, new StandaloneSupervisor());
- IWorkerManager workerManager = supervisorData.getWorkerManager();
- Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
- String supervisorId = supervisorData.getSupervisorId();
- Map<String, String> workerToThreadPids = supervisorData.getWorkerThreadPids();
- ConcurrentHashSet deadWorkers = supervisorData.getDeadWorkers();
- for (String workerId : workerIds) {
- LOG.info("Killing worker: {} through CLI.", workerId);
- workerManager.shutdownWorker(supervisorId, workerId, workerToThreadPids);
- if (workerManager.cleanupWorker(workerId)) {
- deadWorkers.remove(workerId);
- }
+ try (Supervisor supervisor = new Supervisor(conf, null, new StandaloneSupervisor())) {
+ supervisor.shutdownAllWorkers();
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
index c5cad02..7bf0249 100644
--- a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
+++ b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
@@ -18,20 +18,29 @@
package org.apache.storm.container;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* A plugin to support resource isolation and limitation within Storm
*/
public interface ResourceIsolationInterface {
+
+ /**
+ * Called when starting up
+ * @param conf the cluster config
+ * @throws IOException on any error.
+ */
+ void prepare(Map<String, Object> conf) throws IOException;
/**
* This function should be used prior to starting the worker to reserve resources for the worker
* @param workerId worker id of the worker to start
* @param resources set of resources to limit
*/
- void reserveResourcesForWorker(String workerId, Map resources);
+ void reserveResourcesForWorker(String workerId, Map<String, Number> resources);
/**
* This function will be called when the worker needs to shutdown. This function should include logic to clean up after a worker is shutdown
@@ -39,7 +48,6 @@ public interface ResourceIsolationInterface {
*/
void releaseResourcesForWorker(String workerId);
-
/**
* After reserving resources for the worker (i.e. calling reserveResourcesForWorker). This function can be used
* to get the modified command line to launch the worker with resource isolation
@@ -56,4 +64,13 @@ public interface ResourceIsolationInterface {
*/
List<String> getLaunchCommandPrefix(String workerId);
+ /**
+ * Get the list of PIDs currently in an isolated container
+ * @param workerId the id of the worker to get these for
+ * @return the set of PIDs, this will be combined with
+ * other ways of getting PIDs. An Empty set if
+ * no PIDs are found.
+ * @throws IOException on any error
+ */
+ Set<Long> getRunningPIDs(String workerId) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
index b12fcc0..c8bb304 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
@@ -89,11 +89,11 @@ public class CgroupCommon implements CgroupCommonOperation {
}
@Override
- public Set<Integer> getPids() throws IOException {
+ public Set<Long> getPids() throws IOException {
List<String> stringPids = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CGROUP_PROCS));
- Set<Integer> pids = new HashSet<Integer>();
+ Set<Long> pids = new HashSet<>();
for (String task : stringPids) {
- pids.add(Integer.valueOf(task));
+ pids.add(Long.valueOf(task));
}
return pids;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
index 54368b6..eecba69 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
@@ -42,7 +42,7 @@ public interface CgroupCommonOperation {
/**
* get the PIDs of processes running in cgroup
*/
- public Set<Integer> getPids() throws IOException;
+ public Set<Long> getPids() throws IOException;
/**
* to set notify_on_release config in cgroup
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
index 80093b3..9856595 100644
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
@@ -18,19 +18,11 @@
package org.apache.storm.container.cgroup;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.storm.Config;
-import org.apache.storm.container.ResourceIsolationInterface;
-import org.apache.storm.container.cgroup.core.CpuCore;
-import org.apache.storm.container.cgroup.core.MemoryCore;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -38,6 +30,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.container.cgroup.core.CpuCore;
+import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Class that implements ResourceIsolationInterface that manages cgroups
*/
@@ -51,15 +50,15 @@ public class CgroupManager implements ResourceIsolationInterface {
private CgroupCommon rootCgroup;
- private static String rootDir;
+ private String rootDir;
- private Map conf;
+ private Map<String, Object> conf;
/**
* initialize intial data structures
* @param conf storm confs
*/
- public void prepare(Map conf) throws IOException {
+ public void prepare(Map<String, Object> conf) throws IOException {
this.conf = conf;
this.rootDir = Config.getCgroupRootDir(this.conf);
if (this.rootDir == null) {
@@ -81,7 +80,7 @@ public class CgroupManager implements ResourceIsolationInterface {
/**
* initalize subsystems
*/
- private void prepareSubSystem(Map conf) throws IOException {
+ private void prepareSubSystem(Map<String, Object> conf) throws IOException {
List<SubSystemType> subSystemTypes = new LinkedList<>();
for (String resource : Config.getCgroupStormResources(conf)) {
subSystemTypes.add(SubSystemType.getSubSystem(resource));
@@ -118,7 +117,7 @@ public class CgroupManager implements ResourceIsolationInterface {
}
}
- public void reserveResourcesForWorker(String workerId, Map resourcesMap) throws SecurityException {
+ public void reserveResourcesForWorker(String workerId, Map<String, Number> resourcesMap) throws SecurityException {
Number cpuNum = null;
// The manually set STORM_WORKER_CGROUP_CPU_LIMIT config on supervisor will overwrite resources assigned by RAS (Resource Aware Scheduler)
if (this.conf.get(Config.STORM_WORKER_CGROUP_CPU_LIMIT) != null) {
@@ -211,4 +210,14 @@ public class CgroupManager implements ResourceIsolationInterface {
public void close() throws IOException {
this.center.deleteCgroup(this.rootCgroup);
}
+
+ @Override
+ public Set<Long> getRunningPIDs(String workerId) throws IOException {
+ CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup);
+ if (!this.rootCgroup.getChildren().contains(workerGroup)) {
+ LOG.warn("cgroup {} doesn't exist!", workerGroup);
+ return Collections.emptySet();
+ }
+ return workerGroup.getPids();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
new file mode 100644
index 0000000..361328e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AdvancedFSOps {
+ private static final Logger LOG = LoggerFactory.getLogger(AdvancedFSOps.class);
+
+ /**
+ * Factory to create a new AdvancedFSOps
+ * @param conf the configuration of the process
+ * @return the appropriate instance of the class for this config and environment.
+ */
+ public static AdvancedFSOps make(Map<String, Object> conf) {
+ if (Utils.isOnWindows()) {
+ return new AdvancedWindowsFSOps(conf);
+ }
+ if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ return new AdvancedRunAsUserFSOps(conf);
+ }
+ return new AdvancedFSOps();
+ }
+
+ private static class AdvancedRunAsUserFSOps extends AdvancedFSOps {
+ private final Map<String, Object> _conf;
+
+ public AdvancedRunAsUserFSOps(Map<String, Object> conf) {
+ if (Utils.isOnWindows()) {
+ throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
+ }
+ _conf = conf;
+ }
+
+ @Override
+ public void setupBlobPermissions(File path, String user) throws IOException {
+ String logPrefix = "setup blob permissions for " + path;
+ SupervisorUtils.processLauncherAndWait(_conf, user, Arrays.asList("blob", path.toString()), null, logPrefix);
+ }
+
+ @Override
+ public void deleteIfExists(File path, String user, String logPrefix) throws IOException {
+ String absolutePath = path.getAbsolutePath();
+ LOG.info("Deleting path {}", absolutePath);
+ if (user == null) {
+ user = Files.getOwner(path.toPath()).getName();
+ }
+ List<String> commands = new ArrayList<>();
+ commands.add("rmr");
+ commands.add(absolutePath);
+ SupervisorUtils.processLauncherAndWait(_conf, user, commands, null, logPrefix);
+ if (Utils.checkFileExists(absolutePath)) {
+ throw new RuntimeException(path + " was not deleted.");
+ }
+ }
+
+ @Override
+ public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException {
+ SupervisorUtils.setupStormCodeDir(_conf, topologyConf, path.getCanonicalPath());
+ }
+ }
+
+ /**
+ * Operations that need to override the default ones when running on Windows
+ *
+ */
+ private static class AdvancedWindowsFSOps extends AdvancedFSOps {
+
+ public AdvancedWindowsFSOps(Map<String, Object> conf) {
+ if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ throw new RuntimeException("ERROR: Windows doesn't support running workers as different users yet");
+ }
+ }
+
+ @Override
+ public void restrictDirectoryPermissions(File dir) throws IOException {
+ //NOOP, if windows gets support for run as user we will need to find a way to support this
+ }
+
+ @Override
+ public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException {
+ // Files/move with non-empty directory doesn't work well on Windows
+ // This is not atomic but it does work
+ FileUtils.moveDirectory(fromDir, toDir);
+ }
+
+ @Override
+ public boolean supportsAtomicDirectoryMove() {
+ // Files/move with non-empty directory doesn't work well on Windows
+ // FileUtils.moveDirectory is not atomic
+ return false;
+ }
+ }
+
+
+ protected AdvancedFSOps() {
+ //NOOP, but restricted permissions
+ }
+
+ /**
+ * Set directory permissions to (OWNER)RWX (GROUP)R-X (OTHER)---
+ * On some systems that do not support this, it may become a noop
+ * @param dir the directory to change permissions on
+ * @throws IOException on any error
+ */
+ public void restrictDirectoryPermissions(File dir) throws IOException {
+ Set<PosixFilePermission> perms = new HashSet<>(
+ Arrays.asList(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE,
+ PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ,
+ PosixFilePermission.GROUP_EXECUTE));
+ Files.setPosixFilePermissions(dir.toPath(), perms);
+ }
+
+ /**
+ * Move fromDir to toDir, and try to make it an atomic move if possible
+ * @param fromDir what to move
+ * @param toDir where to move it to
+ * @throws IOException on any error
+ */
+ public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException {
+ FileUtils.forceMkdir(toDir);
+ Files.move(fromDir.toPath(), toDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
+ }
+
+ /**
+ * @return true if an atomic directory move works, else false.
+ */
+ public boolean supportsAtomicDirectoryMove() {
+ return true;
+ }
+
+ /**
+ * Copy a directory
+ * @param fromDir from where
+ * @param toDir to where
+ * @throws IOException on any error
+ */
+ public void copyDirectory(File fromDir, File toDir) throws IOException {
+ FileUtils.copyDirectory(fromDir, toDir);
+ }
+
+ /**
+ * Setup permissions properly for an internal blob store path
+ * @param path the path to set the permissions on
+ * @param user the user to change the permissions for
+ * @throws IOException on any error
+ */
+ public void setupBlobPermissions(File path, String user) throws IOException {
+ //Normally this is a NOOP
+ }
+
+ /**
+ * Delete a file or a directory and all of its children, if it exists.
+ * @param path what to delete
+ * @param user who to delete it as if doing it as someone else is supported
+ * @param logPrefix if an external process needs to be launched to delete
+ * the object what prefix to include in the logs
+ * @throws IOException on any error.
+ */
+ public void deleteIfExists(File path, String user, String logPrefix) throws IOException {
+ //by default no need to do this as a different user
+ deleteIfExists(path);
+ }
+
+ /**
+ * Delete a file or a directory and all of its children, if it exists.
+ * @param path what to delete
+ * @throws IOException on any error.
+ */
+ public void deleteIfExists(File path) throws IOException {
+ LOG.info("Deleting path {}", path);
+ Path p = path.toPath();
+ if (Files.exists(p)) {
+ try {
+ FileUtils.forceDelete(path);
+ } catch (FileNotFoundException ignored) {}
+ }
+ }
+
+ /**
+ * Setup the permissions for the storm code dir
+ * @param topologyConf the config of the Topology
+ * @param path the directory to set the permissions on
+ * @throws IOException on any error
+ */
+ public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException {
+ //By default this is a NOOP
+ }
+
+ /**
+ * Sanity check if everything the topology needs is there for it to run.
+ * @param conf the config of the supervisor
+ * @param topologyId the ID of the topology
+ * @return true if everything is there, else false
+ * @throws IOException on any error
+ */
+ public boolean doRequiredTopoFilesExist(Map<String, Object> conf, String topologyId) throws IOException {
+ return SupervisorUtils.doRequiredTopoFilesExist(conf, topologyId);
+ }
+
+ /**
+ * Makes a directory, including any necessary but nonexistent parent
+ * directories.
+ *
+ * @param path the directory to create
+ * @throws IOException on any error
+ */
+ public void forceMkdir(File path) throws IOException {
+ FileUtils.forceMkdir(path);
+ }
+
+ /**
+ * Check if a file exists or not
+ * @param path the path to check
+ * @return true if it exists else false
+ * @throws IOException on any error.
+ */
+ public boolean fileExists(File path) throws IOException {
+ return path.exists();
+ }
+
+ /**
+ * Get a writer for the given location
+ * @param file the file to write to
+ * @return the Writer to use.
+ * @throws IOException on any error
+ */
+ public Writer getWriter(File file) throws IOException {
+ return new FileWriter(file);
+ }
+
+ /**
+ * Get an output stream to write to a given file
+ * @param file the file to write to
+ * @return an OutputStream for that file
+ * @throws IOException on any error
+ */
+ public OutputStream getOutputStream(File file) throws IOException {
+ return new FileOutputStream(file);
+ }
+
+ /**
+ * Dump a string to a file
+ * @param location where to write to
+ * @param data the data to write
+ * @throws IOException on any error
+ */
+ public void dump(File location, String data) throws IOException {
+ File parent = location.getParentFile();
+ if (!parent.exists()) {
+ forceMkdir(parent);
+ }
+ try (Writer w = getWriter(location)) {
+ w.write(data);
+ }
+ }
+
+ /**
+ * Read the contents of a file into a String
+ * @param location the file to read
+ * @return the contents of the file
+ * @throws IOException on any error
+ */
+ public String slurpString(File location) throws IOException {
+ return FileUtils.readFileToString(location, "UTF-8");
+ }
+
+ /**
+ * Read the contents of a file into a byte array.
+ * @param location the file to read
+ * @return the contents of the file
+ * @throws IOException on any error
+ */
+ public byte[] slurp(File location) throws IOException {
+ return FileUtils.readFileToByteArray(location);
+ }
+
+ /**
+ * Create a symbolic link pointing at target
+ * @param link the link to create
+ * @param target where it should point to
+ * @throws IOException on any error.
+ */
+ public void createSymlink(File link, File target) throws IOException {
+ Path plink = link.toPath().toAbsolutePath();
+ Path ptarget = target.toPath().toAbsolutePath();
+ LOG.debug("Creating symlink [{}] to [{}]", plink, ptarget);
+ if (Files.exists(plink)) {
+ if (Files.isSameFile(plink, ptarget)) {
+ //It already points where we want it to
+ return;
+ }
+ FileUtils.forceDelete(link);
+ }
+ Files.createSymbolicLink(plink, ptarget);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
new file mode 100644
index 0000000..efaa352
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -0,0 +1,658 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+/**
+ * A container that runs processes on the local box.
+ */
+public class BasicContainer extends Container {
+ private static final Logger LOG = LoggerFactory.getLogger(BasicContainer.class);
+ private static final FilenameFilter jarFilter = (dir, name) -> name.endsWith(".jar");
+ private static final Joiner CPJ =
+ Joiner.on(Utils.CLASS_PATH_SEPARATOR).skipNulls();
+
+ protected final LocalState _localState;
+ protected final String _profileCmd;
+ protected final String _stormHome = System.getProperty("storm.home");
+ protected volatile boolean _exitedEarly = false;
+
+ private class ProcessExitCallback implements ExitCodeCallback {
+ private final String _logPrefix;
+
+ public ProcessExitCallback(String logPrefix) {
+ _logPrefix = logPrefix;
+ }
+
+ @Override
+ public void call(int exitCode) {
+ LOG.info("{} exited with code: {}", _logPrefix, exitCode);
+ _exitedEarly = true;
+ }
+ }
+
+ /**
+ * Create a new BasicContainer
+ * @param type the type of container being made.
+ * @param conf the supervisor config
+ * @param supervisorId the ID of the supervisor this is a part of.
+ * @param port the port the container is on. Should be <= 0 if only a partial recovery
+ * @param assignment the assignment for this container. Should be null if only a partial recovery.
+ * @param resourceIsolationManager used to isolate resources for a container; can be null if no isolation is used.
+ * @param localState the local state of the supervisor. May be null if partial recovery
+ * @param workerId the id of the worker to use. Must not be null if doing a partial recovery.
+ */
+ public BasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
+ LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
+ LocalState localState, String workerId) throws IOException {
+ this(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, null, null, null);
+ }
+
+ /**
+ * Create a new BasicContainer
+ * @param type the type of container being made.
+ * @param conf the supervisor config
+ * @param supervisorId the ID of the supervisor this is a part of.
+ * @param port the port the container is on. Should be <= 0 if only a partial recovery
+ * @param assignment the assignment for this container. Should be null if only a partial recovery.
+ * @param resourceIsolationManager used to isolate resources for a container; can be null if no isolation is used.
+ * @param localState the local state of the supervisor. May be null if partial recovery
+ * @param workerId the id of the worker to use. Must not be null if doing a partial recovery.
+ * @param ops file system operations (mostly for testing) if null a new one is made
+ * @param topoConf the config of the topology (mostly for testing) if null
+ * and not a partial recovery the real conf is read.
+ * @param profileCmd the command to use when profiling (used for testing)
+ * @throws IOException on any error
+ * @throws ContainerRecoveryException if the Container could not be recovered.
+ */
+ BasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
+ LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
+ LocalState localState, String workerId, Map<String, Object> topoConf,
+ AdvancedFSOps ops, String profileCmd) throws IOException {
+ super(type, conf, supervisorId, port, assignment, resourceIsolationManager, workerId, topoConf, ops);
+ assert(localState != null);
+ _localState = localState;
+
+ if (type.isRecovery() && !type.isOnlyKillable()) {
+ synchronized (localState) {
+ String wid = null;
+ Map<String, Integer> workerToPort = localState.getApprovedWorkers();
+ for (Map.Entry<String, Integer> entry : workerToPort.entrySet()) {
+ if (port == entry.getValue().intValue()) {
+ wid = entry.getKey();
+ }
+ }
+ if (wid == null) {
+ throw new ContainerRecoveryException("Could not find worker id for " + port + " " + assignment);
+ }
+ LOG.info("Recovered Worker {}", wid);
+ _workerId = wid;
+ }
+ } else if (_workerId == null){
+ createNewWorkerId();
+ }
+
+ if (profileCmd == null) {
+ profileCmd = _stormHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR
+ + conf.get(Config.WORKER_PROFILER_COMMAND);
+ }
+ _profileCmd = profileCmd;
+ }
+
+ /**
+ * Create a new worker ID for this process and store it in this object and
+ * in the local state. Never call this if a worker is currently up and running.
+ * We will lose track of the process.
+ */
+ protected void createNewWorkerId() {
+ _type.assertFull();
+ assert(_workerId == null);
+ synchronized (_localState) {
+ _workerId = Utils.uuid();
+ Map<String, Integer> workerToPort = _localState.getApprovedWorkers();
+ if (workerToPort == null) {
+ workerToPort = new HashMap<>(1);
+ }
+ removeWorkersOn(workerToPort, _port);
+ workerToPort.put(_workerId, _port);
+ _localState.setApprovedWorkers(workerToPort);
+ LOG.info("Created Worker ID {}", _workerId);
+ }
+ }
+
+ private static void removeWorkersOn(Map<String, Integer> workerToPort, int _port) {
+ for (Iterator<Entry<String, Integer>> i = workerToPort.entrySet().iterator(); i.hasNext();) {
+ Entry<String, Integer> found = i.next();
+ if (_port == found.getValue().intValue()) {
+ LOG.warn("Deleting worker {} from state", found.getKey());
+ i.remove();
+ }
+ }
+ }
+
+ @Override
+ public void cleanUpForRestart() throws IOException {
+ String origWorkerId = _workerId;
+ super.cleanUpForRestart();
+ synchronized (_localState) {
+ Map<String, Integer> workersToPort = _localState.getApprovedWorkers();
+ workersToPort.remove(origWorkerId);
+ removeWorkersOn(workersToPort, _port);
+ _localState.setApprovedWorkers(workersToPort);
+ LOG.info("Removed Worker ID {}", origWorkerId);
+ }
+ }
+
+ @Override
+ public void relaunch() throws IOException {
+ _type.assertFull();
+ //We are launching it now...
+ _type = ContainerType.LAUNCH;
+ createNewWorkerId();
+ setup();
+ launch();
+ }
+
+ @Override
+ public boolean didMainProcessExit() {
+ return _exitedEarly;
+ }
+
+ /**
+ * Run the given command for profiling
+ *
+ * @param command
+ * the command to run
+ * @param env
+ * the environment to run the command
+ * @param logPrefix
+ * the prefix to include in the logs
+ * @param targetDir
+ * the working directory to run the command in
+ * @return true if it ran successfully, else false
+ * @throws IOException
+ * on any error
+ * @throws InterruptedException
+ * if interrupted while waiting for the process to exit.
+ */
+ protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix,
+ File targetDir) throws IOException, InterruptedException {
+ _type.assertFull();
+ Process p = SupervisorUtils.launchProcess(command, env, logPrefix, null, targetDir);
+ int ret = p.waitFor();
+ return ret == 0;
+ }
+
+ @Override
+ public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException {
+ _type.assertFull();
+ String targetDir = ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port);
+
+ @SuppressWarnings("unchecked")
+ Map<String, String> env = (Map<String, String>) _topoConf.get(Config.TOPOLOGY_ENVIRONMENT);
+ if (env == null) {
+ env = new HashMap<String, String>();
+ }
+
+ String str = ConfigUtils.workerArtifactsPidPath(_conf, _topologyId, _port);
+
+ String workerPid = _ops.slurpString(new File(str)).trim();
+
+ ProfileAction profileAction = request.get_action();
+ String logPrefix = "ProfilerAction process " + _topologyId + ":" + _port + " PROFILER_ACTION: " + profileAction
+ + " ";
+
+ List<String> command = mkProfileCommand(profileAction, stop, workerPid, targetDir);
+
+ File targetFile = new File(targetDir);
+ if (command.size() > 0) {
+ return runProfilingCommand(command, env, logPrefix, targetFile);
+ }
+ LOG.warn("PROFILING REQUEST NOT SUPPORTED {} IGNORED...", request);
+ return true;
+ }
+
+ /**
+ * Get the command to run when doing profiling
+ * @param action the profiling action to perform
+ * @param stop if this is meant to stop the profiling or start it
+ * @param workerPid the PID of the process to profile
+ * @param targetDir the current working directory of the worker process
+ * @return the command to run for profiling.
+ */
+ private List<String> mkProfileCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
+ switch(action) {
+ case JMAP_DUMP:
+ return jmapDumpCmd(workerPid, targetDir);
+ case JSTACK_DUMP:
+ return jstackDumpCmd(workerPid, targetDir);
+ case JPROFILE_DUMP:
+ return jprofileDump(workerPid, targetDir);
+ case JVM_RESTART:
+ return jprofileJvmRestart(workerPid);
+ case JPROFILE_STOP:
+ if (stop) {
+ return jprofileStop(workerPid, targetDir);
+ }
+ return jprofileStart(workerPid);
+ default:
+ return Lists.newArrayList();
+ }
+ }
+
+ private List<String> jmapDumpCmd(String pid, String targetDir) {
+ return Lists.newArrayList(_profileCmd, pid, "jmap", targetDir);
+ }
+
+ private List<String> jstackDumpCmd(String pid, String targetDir) {
+ return Lists.newArrayList(_profileCmd, pid, "jstack", targetDir);
+ }
+
+ private List<String> jprofileStart(String pid) {
+ return Lists.newArrayList(_profileCmd, pid, "start");
+ }
+
+ private List<String> jprofileStop(String pid, String targetDir) {
+ return Lists.newArrayList(_profileCmd, pid, "stop", targetDir);
+ }
+
+ private List<String> jprofileDump(String pid, String targetDir) {
+ return Lists.newArrayList(_profileCmd, pid, "dump", targetDir);
+ }
+
+ private List<String> jprofileJvmRestart(String pid) {
+ return Lists.newArrayList(_profileCmd, pid, "kill");
+ }
+
+ /**
+ * Compute the java.library.path that should be used for the worker.
+ * This helps it to load JNI libraries that are packaged in the uber jar.
+ * @param stormRoot the root directory of the worker process
+ * @param conf the config for the supervisor.
+ * @return the java.library.path/LD_LIBRARY_PATH to use so native libraries load correctly.
+ */
+ protected String javaLibraryPath(String stormRoot, Map<String, Object> conf) {
+ String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
+ String os = System.getProperty("os.name").replaceAll("\\s+", "_");
+ String arch = System.getProperty("os.arch");
+ String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
+ String ret = CPJ.join(archResourceRoot, resourceRoot,
+ conf.get(Config.JAVA_LIBRARY_PATH));
+ return ret;
+ }
+
+ /**
+ * Returns the absolute paths of the jar files found directly in the given directory.
+ * @param dir the directory to search
+ * @return the absolute paths of the jar files, or an empty list if the directory cannot be listed
+ */
+ protected List<String> getFullJars(File dir) {
+ File[] files = dir.listFiles(jarFilter);
+
+ if (files == null) {
+ return Collections.emptyList();
+ }
+
+ return Arrays.stream(files).map(f -> f.getAbsolutePath())
+ .collect(Collectors.toList());
+ }
+
+ protected List<String> frameworkClasspath() {
+ File stormLibDir = new File(_stormHome, "lib");
+ String stormConfDir =
+ System.getenv("STORM_CONF_DIR") != null ?
+ System.getenv("STORM_CONF_DIR") :
+ new File(_stormHome, "conf").getAbsolutePath();
+ File stormExtlibDir = new File(_stormHome, "extlib");
+ String extcp = System.getenv("STORM_EXT_CLASSPATH");
+ List<String> pathElements = new LinkedList<>();
+ pathElements.addAll(getFullJars(stormLibDir));
+ pathElements.addAll(getFullJars(stormExtlibDir));
+ pathElements.add(extcp);
+ pathElements.add(stormConfDir);
+
+ return pathElements;
+ }
+
+    /**
+     * Coerce a config value into a list of strings.
+     * @param o the config value, expected to be a String or a List
+     * @return a one element list for a String, the list itself for a List
+     *     (its elements are not checked), and an empty list for anything
+     *     else, including null
+     */
+    private List<String> asStringList(Object o) {
+        if (o instanceof String) {
+            return Arrays.asList((String) o);
+        }
+        if (o instanceof List) {
+            // Unchecked: the element type cannot be verified at runtime.
+            @SuppressWarnings("unchecked")
+            List<String> list = (List<String>) o;
+            return list;
+        }
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        return Collections.emptyList();
+    }
+
+ /**
+ * Compute the classpath for the worker process
+ * @param stormJar the topology jar
+ * @param dependencyLocations any dependencies from the topology
+ * @return the full classpath
+ */
+ protected String getWorkerClassPath(String stormJar, List<String> dependencyLocations) {
+ List<String> workercp = new ArrayList<>();
+ workercp.addAll(asStringList(_topoConf.get(Config.TOPOLOGY_CLASSPATH_BEGINNING)));
+ workercp.addAll(frameworkClasspath());
+ workercp.add(stormJar);
+ workercp.addAll(dependencyLocations);
+ workercp.addAll(asStringList(_topoConf.get(Config.TOPOLOGY_CLASSPATH)));
+ return CPJ.join(workercp);
+ }
+
+ private String substituteChildOptsInternal(String string, int memOnheap) {
+ if (StringUtils.isNotBlank(string)) {
+ String p = String.valueOf(_port);
+ string = string.replace("%ID%", p);
+ string = string.replace("%WORKER-ID%", _workerId);
+ string = string.replace("%TOPOLOGY-ID%", _topologyId);
+ string = string.replace("%WORKER-PORT%", p);
+ if (memOnheap > 0) {
+ string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
+ }
+ }
+ return string;
+ }
+
+ protected List<String> substituteChildopts(Object value) {
+ return substituteChildopts(value, -1);
+ }
+
+ protected List<String> substituteChildopts(Object value, int memOnheap) {
+ List<String> rets = new ArrayList<>();
+ if (value instanceof String) {
+ String string = substituteChildOptsInternal((String) value, memOnheap);
+ if (StringUtils.isNotBlank(string)) {
+ String[] strings = string.split("\\s+");
+ rets.addAll(Arrays.asList(strings));
+ }
+ } else if (value instanceof List) {
+ @SuppressWarnings("unchecked")
+ List<String> objects = (List<String>) value;
+ for (String object : objects) {
+ String str = substituteChildOptsInternal(object, memOnheap);
+ if (StringUtils.isNotBlank(str)) {
+ rets.add(str);
+ }
+ }
+ }
+ return rets;
+ }
+
+    /**
+     * Launch the worker process (non-blocking).
+     * When a resource isolation manager is configured, the command is first
+     * handed to it to obtain the command that is actually launched.
+     *
+     * @param command
+     *            the command to run
+     * @param env
+     *            the environment to run the command
+     * @param logPrefix
+     *            the prefix to include in the logs
+     * @param processExitCallback
+     *            a callback for when the process exits
+     * @param targetDir
+     *            the working directory to run the command in
+     * @throws IOException
+     *             on any error
+     */
+    protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix,
+            ExitCodeCallback processExitCallback, File targetDir) throws IOException {
+        // Keep the caller's command untouched and track the effective one separately.
+        List<String> launchCommand = command;
+        if (_resourceIsolationManager != null) {
+            launchCommand = _resourceIsolationManager.getLaunchCommand(_workerId, command);
+        }
+        SupervisorUtils.launchProcess(launchCommand, env, logPrefix, processExitCallback, targetDir);
+    }
+
+ private String getWorkerLoggingConfigFile() {
+ String log4jConfigurationDir = (String) (_conf.get(Config.STORM_LOG4J2_CONF_DIR));
+
+ if (StringUtils.isNotBlank(log4jConfigurationDir)) {
+ if (!Utils.isAbsolutePath(log4jConfigurationDir)) {
+ log4jConfigurationDir = _stormHome + Utils.FILE_PATH_SEPARATOR + log4jConfigurationDir;
+ }
+ } else {
+ log4jConfigurationDir = _stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
+ }
+
+ if (Utils.IS_ON_WINDOWS && !log4jConfigurationDir.startsWith("file:")) {
+ log4jConfigurationDir = "file:///" + log4jConfigurationDir;
+ }
+ return log4jConfigurationDir + Utils.FILE_PATH_SEPARATOR + "worker.xml";
+ }
+
+    /**
+     * Get parameters for the class path of the worker process. Also used by the
+     * log Writer. The topology is read from the supervisor's copy so that its
+     * dependency jars and dependency artifacts, resolved against stormRoot,
+     * can be included in the class path.
+     * @param stormRoot the root dist dir for the topology
+     * @return the classpath for the topology as command line arguments
+     *     ("-cp" followed by the class path).
+     * @throws IOException on any error.
+     */
+    private List<String> getClassPathParams(final String stormRoot) throws IOException {
+        final String topologyJar = ConfigUtils.supervisorStormJarPath(stormRoot);
+        final StormTopology topology = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops);
+
+        final List<String> dependencyPaths = new ArrayList<>();
+        if (topology.get_dependency_jars() != null) {
+            for (String jar : topology.get_dependency_jars()) {
+                dependencyPaths.add(new File(stormRoot, jar).getAbsolutePath());
+            }
+        }
+        if (topology.get_dependency_artifacts() != null) {
+            for (String artifact : topology.get_dependency_artifacts()) {
+                dependencyPaths.add(new File(stormRoot, artifact).getAbsolutePath());
+            }
+        }
+
+        return Lists.newArrayList("-cp", getWorkerClassPath(topologyJar, dependencyPaths));
+    }
+
+    /**
+     * Get a set of java properties that are common to both the log writer and the worker processes.
+     * These are mostly system properties that are used by logging. The logging
+     * sensitivity defaults to "S3" when the topology does not configure one.
+     * @return a list of command line options
+     */
+    private List<String> getCommonParams() {
+        final String artifactsRoot = ConfigUtils.workerArtifactsRoot(_conf);
+        final String logDir = ConfigUtils.getLogDir();
+        final String log4jConfig = getWorkerLoggingConfigFile();
+        final String sensitivity = OR((String) _topoConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY), "S3");
+
+        return new ArrayList<>(Arrays.asList(
+                "-Dlogging.sensitivity=" + sensitivity,
+                "-Dlogfile.name=worker.log",
+                "-Dstorm.home=" + OR(_stormHome, ""),
+                "-Dworkers.artifacts=" + artifactsRoot,
+                "-Dstorm.id=" + _topologyId,
+                "-Dworker.id=" + _workerId,
+                "-Dworker.port=" + _port,
+                "-Dstorm.log.dir=" + logDir,
+                "-Dlog4j.configurationFile=" + log4jConfig,
+                "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                "-Dstorm.local.dir=" + _conf.get(Config.STORM_LOCAL_DIR)));
+    }
+
+ private int getMemOnHeap(WorkerResources resources) {
+ int memOnheap = 0;
+ if (resources != null && resources.is_set_mem_on_heap() &&
+ resources.get_mem_on_heap() > 0) {
+ memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
+ } else {
+ // set the default heap memory size for supervisor-test
+ memOnheap = Utils.getInt(_topoConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
+ }
+ return memOnheap;
+ }
+
+ private List<String> getWorkerProfilerChildOpts(int memOnheap) {
+ List<String> workerProfilerChildopts = new ArrayList<>();
+ if (Utils.getBoolean(_conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
+ workerProfilerChildopts = substituteChildopts(_conf.get(Config.WORKER_PROFILER_CHILDOPTS), memOnheap);
+ }
+ return workerProfilerChildopts;
+ }
+
+    /**
+     * Pick the first of two values that is not null.
+     * @param a the preferred value
+     * @param b the fallback value
+     * @return a when it is not null, otherwise b (which may itself be null)
+     */
+    private <V> V OR(V a, V b) {
+        if (a != null) {
+            return a;
+        }
+        return b;
+    }
+
+ protected String javaCmd(String cmd) {
+ String ret = null;
+ String javaHome = System.getenv().get("JAVA_HOME");
+ if (StringUtils.isNotBlank(javaHome)) {
+ ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
+ } else {
+ ret = cmd;
+ }
+ return ret;
+ }
+
+ /**
+ * Create the command to launch the worker process.
+ * The returned list is two command lines back to back: first the
+ * org.apache.storm.LogWriter command, then the worker command
+ * (org.apache.storm.daemon.worker) that is passed to it as arguments.
+ * The order of the entries is the order of the arguments on the command line.
+ * @param memOnheap the on heap memory for the worker
+ * @param stormRoot the root dist dir for the topology
+ * @param jlp java library path for the topology
+ * @return the command to run
+ * @throws IOException on any error.
+ */
+ private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot,
+ final String jlp) throws IOException {
+ final String javaCmd = javaCmd("java");
+ final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
+ final String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
+ final String workerTmpDir = ConfigUtils.workerTmpRoot(_conf, _workerId);
+
+ // Both commands share the same class path and logging related properties.
+ List<String> classPathParams = getClassPathParams(stormRoot);
+ List<String> commonParams = getCommonParams();
+
+ List<String> commandList = new ArrayList<>();
+ // Log writer command: java, class path, log writer child opts, common properties, main class.
+ commandList.add(javaCmd);
+ commandList.addAll(classPathParams);
+ // No heap size is passed here, so %HEAP-MEM% is not substituted in the log writer opts.
+ commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)));
+ commandList.addAll(commonParams);
+ commandList.add("org.apache.storm.LogWriter"); //The LogWriter in turn launches the actual worker.
+
+ // Worker command: everything from here on follows the LogWriter main class.
+ commandList.add(javaCmd);
+ commandList.add("-server");
+ commandList.addAll(commonParams);
+ // Supervisor level child opts come first, then the topology level ones.
+ commandList.addAll(substituteChildopts(_conf.get(Config.WORKER_CHILDOPTS), memOnheap));
+ commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap));
+ // The topology's GC opts replace the supervisor's GC opts when they are set.
+ commandList.addAll(substituteChildopts(OR(
+ _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
+ _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
+ commandList.addAll(getWorkerProfilerChildOpts(memOnheap));
+ commandList.add("-Djava.library.path=" + jlp);
+ commandList.add("-Dstorm.conf.file=" + stormConfFile);
+ commandList.add("-Dstorm.options=" + stormOptions);
+ commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
+ commandList.addAll(classPathParams);
+ // Worker main class followed by its arguments: topology id, supervisor id, port, worker id.
+ commandList.add("org.apache.storm.daemon.worker");
+ commandList.add(_topologyId);
+ commandList.add(_supervisorId);
+ commandList.add(String.valueOf(_port));
+ commandList.add(_workerId);
+
+ return commandList;
+ }
+
+    /**
+     * Launch the worker process for this container.
+     * Builds the launch command and the worker environment (the topology
+     * environment plus LD_LIBRARY_PATH), reserves cpu and memory with the
+     * resource isolation manager when one is configured, and then starts the
+     * process in the worker's root directory.
+     * @throws IOException on any error
+     */
+    @Override
+    public void launch() throws IOException {
+        _type.assertFull();
+        LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
+                _supervisorId, _port, _workerId);
+        String logPrefix = "Worker Process " + _workerId;
+        ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix);
+        _exitedEarly = false;
+
+        final WorkerResources resources = _assignment.get_resources();
+        final int memOnheap = getMemOnHeap(resources);
+        final String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
+        final String jlp = javaLibraryPath(stormRoot, _conf);
+
+        List<String> commandList = mkLaunchCommand(memOnheap, stormRoot, jlp);
+
+        Map<String, String> topEnvironment = new HashMap<>();
+        @SuppressWarnings("unchecked")
+        Map<String, String> environment = (Map<String, String>) _topoConf.get(Config.TOPOLOGY_ENVIRONMENT);
+        if (environment != null) {
+            topEnvironment.putAll(environment);
+        }
+        // The native library path is always set last so the topology cannot override it.
+        topEnvironment.put("LD_LIBRARY_PATH", jlp);
+
+        if (_resourceIsolationManager != null) {
+            // NOTE(review): getMemOnHeap tolerates null resources but this branch does not - confirm
+            // an assignment always carries resources when resource isolation is enabled.
+            int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
+            int cpu = (int) Math.ceil(resources.get_cpu());
+
+            // Read the margin as a Number: a direct (double) cast only works for a Double and
+            // fails for an Integer config value (e.g. "128" in yaml) or a missing key.
+            Object marginConf = _conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB);
+            double marginMb = (marginConf == null) ? 0.0 : ((Number) marginConf).doubleValue();
+            int cGroupMem = (int) Math.ceil(marginMb);
+
+            int memoryValue = memoffheap + memOnheap + cGroupMem;
+            int cpuValue = cpu;
+            Map<String, Number> map = new HashMap<>();
+            map.put("cpu", cpuValue);
+            map.put("memory", memoryValue);
+            _resourceIsolationManager.reserveResourcesForWorker(_workerId, map);
+        }
+
+        LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
+
+        String workerDir = ConfigUtils.workerRoot(_conf, _workerId);
+
+        launchWorkerProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
new file mode 100644
index 0000000..4915650
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.daemon.supervisor.Container.ContainerType;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+
+/**
+ * Launch containers with no security using standard java commands.
+ * Every container handed out by this launcher is a {@link BasicContainer}.
+ */
+public class BasicContainerLauncher extends ContainerLauncher {
+    /** The supervisor config handed to every container. */
+    private final Map<String, Object> _conf;
+    /** The id of the supervisor that owns the containers. */
+    private final String _supervisorId;
+    /** Handed to every container as is; may be null. */
+    protected final ResourceIsolationInterface _resourceIsolationManager;
+
+    /**
+     * Create a launcher.
+     * @param conf the supervisor config
+     * @param supervisorId the id of the supervisor
+     * @param resourceIsolationManager the resource isolation manager passed on to the containers
+     * @throws IOException declared for callers; nothing in this constructor throws it
+     */
+    public BasicContainerLauncher(Map<String, Object> conf, String supervisorId, ResourceIsolationInterface resourceIsolationManager) throws IOException {
+        this._conf = conf;
+        this._supervisorId = supervisorId;
+        this._resourceIsolationManager = resourceIsolationManager;
+    }
+
+    /**
+     * Create a container of type LAUNCH, set it up and launch it.
+     * @param port the port of the worker
+     * @param assignment the assignment for the worker
+     * @param state the local state passed on to the container
+     * @return the launched container
+     * @throws IOException on any error
+     */
+    @Override
+    public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        final Container launched = new BasicContainer(ContainerType.LAUNCH, _conf, _supervisorId, port,
+                assignment, _resourceIsolationManager, state, null);
+        launched.setup();
+        launched.launch();
+        return launched;
+    }
+
+    /**
+     * Create a container of type RECOVER_FULL for a port and assignment.
+     * Neither setup nor launch is called on it here.
+     * @param port the port of the worker
+     * @param assignment the assignment for the worker
+     * @param state the local state passed on to the container
+     * @return the recovered container
+     * @throws IOException on any error
+     */
+    @Override
+    public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        final Container recovered = new BasicContainer(ContainerType.RECOVER_FULL, _conf, _supervisorId, port,
+                assignment, _resourceIsolationManager, state, null);
+        return recovered;
+    }
+
+    /**
+     * Create a container of type RECOVER_PARTIAL from a worker id only.
+     * The container is created with a port of -1 and no assignment.
+     * @param workerId the id of the worker
+     * @param localState the local state passed on to the container
+     * @return the recovered container, exposed as a Killable
+     * @throws IOException on any error
+     */
+    @Override
+    public Killable recoverContainer(String workerId, LocalState localState) throws IOException {
+        final Killable recovered = new BasicContainer(ContainerType.RECOVER_PARTIAL, _conf, _supervisorId, -1,
+                null, _resourceIsolationManager, localState, workerId);
+        return recovered;
+    }
+}