You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:41 UTC
[18/35] storm git commit: add the plugin to use for manager worker
add the plugin to use for manager worker
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1e47352
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1e47352
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1e47352
Branch: refs/heads/master
Commit: a1e473526b5d9074ae1f9ff98162ddc78e426a73
Parents: cc95d4f
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Mon Mar 14 16:54:36 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Mon Mar 14 18:55:57 2016 +0800
----------------------------------------------------------------------
conf/defaults.yaml | 4 +
.../org/apache/storm/command/kill_workers.clj | 11 +-
.../apache/storm/daemon/local_supervisor.clj | 16 +-
storm-core/src/clj/org/apache/storm/testing.clj | 16 +-
storm-core/src/jvm/org/apache/storm/Config.java | 7 +
.../storm/daemon/supervisor/DaemonCommon.java | 22 -
.../daemon/supervisor/StandaloneSupervisor.java | 1 -
.../storm/daemon/supervisor/Supervisor.java | 14 +-
.../storm/daemon/supervisor/SupervisorData.java | 24 +-
.../daemon/supervisor/SupervisorManager.java | 103 +++++
.../daemon/supervisor/SupervisorManger.java | 97 -----
.../daemon/supervisor/SupervisorUtils.java | 105 +----
.../daemon/supervisor/SyncProcessEvent.java | 274 +------------
.../daemon/supervisor/SyncSupervisorEvent.java | 16 +-
.../supervisor/timer/RunProfilerActions.java | 2 +-
.../supervisor/timer/SupervisorHealthCheck.java | 8 +-
.../workermanager/DefaultWorkerManager.java | 397 +++++++++++++++++++
.../workermanager/IWorkerManager.java | 38 ++
.../supervisor/workermanager/IWorkerResult.java | 21 +
.../clj/org/apache/storm/supervisor_test.clj | 84 ++--
20 files changed, 706 insertions(+), 554 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 9817161..da25ef8 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -289,6 +289,10 @@ storm.daemon.metrics.reporter.plugins:
storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManager"
storm.resource.isolation.plugin.enable: false
+
+# Default plugin the supervisor uses to manage workers
+storm.supervisor.worker.manager.plugin: org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager
+
# Configs for CGroup support
storm.cgroup.hierarchy.dir: "/cgroup/storm_resources"
storm.cgroup.resources:
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
index aadc9fd..08de3ed 100644
--- a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
+++ b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
@@ -28,6 +28,13 @@
conf (assoc conf STORM-LOCAL-DIR (. (File. (conf STORM-LOCAL-DIR)) getCanonicalPath))
isupervisor (StandaloneSupervisor.)
supervisor-data (SupervisorData. conf nil isupervisor)
- ids (SupervisorUtils/supervisorWorkerIds conf)]
+ worker-manager (.getWorkerManager supervisor-data)
+ ids (SupervisorUtils/supervisorWorkerIds conf)
+ supervisor-id (.getSupervisorId supervisor-data)
+ worker-pids (.getWorkerThreadPids supervisor-data)
+ dead-workers (.getDeadWorkers supervisor-data)]
(doseq [id ids]
- (SupervisorUtils/shutWorker supervisor-data id))))
+ (.shutdownWorker worker-manager supervisor-id id worker-pids)
+ (if (.cleanupWorker worker-manager id)
+ (.remove dead-workers id))
+ )))
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
index c8ae2d6..b28ae08 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
@@ -36,17 +36,21 @@
(ProcessSimulator/registerProcess pid worker)
(.put (.getWorkerThreadPids supervisorData) workerId pid)
))
-
-(defn shutdown-local-worker [supervisorData workerId]
- (log-message "shutdown-local-worker")
- (SupervisorUtils/shutWorker supervisorData workerId))
+(defn shutdown-local-worker [supervisorData worker-manager workerId]
+ (log-message "shutdown-local-worker")
+ (let [supervisor-id (.getSupervisorId supervisorData)
+ worker-pids (.getWorkerThreadPids supervisorData)
+ dead-workers (.getDeadWorkers supervisorData)]
+ (.shutdownWorker worker-manager supervisor-id workerId worker-pids)
+ (if (.cleanupWorker worker-manager workerId)
+ (.remove dead-workers workerId))))
(defn local-process []
"Create a local process event"
(proxy [SyncProcessEvent] []
- (launchWorker [supervisorData stormId port workerId resources]
+ (launchLocalWorker [supervisorData stormId port workerId resources]
(launch-local-worker supervisorData stormId port workerId resources))
- (shutWorker [supervisorData workerId] (shutdown-local-worker supervisorData workerId))))
+ (shutWorker [supervisorData worker-manager workerId] (shutdown-local-worker supervisorData worker-manager workerId))))
(defserverfn mk-local-supervisor [conf shared-context isupervisor]
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 7804747..5000fd3 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -25,7 +25,7 @@
[org.apache.storm.utils]
[org.apache.storm.zookeeper Zookeeper]
[org.apache.storm ProcessSimulator]
- [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManger SupervisorUtils])
+ [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager])
(:import [java.io File])
(:import [java.util HashMap ArrayList])
(:import [java.util.concurrent.atomic AtomicInteger])
@@ -137,7 +137,8 @@
supervisor-conf (merge (:daemon-conf cluster-map)
conf
{STORM-LOCAL-DIR tmp-dir
- SUPERVISOR-SLOTS-PORTS port-ids})
+ SUPERVISOR-SLOTS-PORTS port-ids
+ STORM-SUPERVISOR-WORKER-MANAGER-PLUGIN "org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager"})
id-fn (if id id (Utils/uuid))
isupervisor (proxy [StandaloneSupervisor] []
(generateSupervisorId [] id-fn))
@@ -282,7 +283,7 @@
([timeout-ms apredicate]
(while-timeout timeout-ms (not (apredicate))
(Time/sleep 100))))
-(defn is-supervisor-waiting [^SupervisorManger supervisor]
+(defn is-supervisor-waiting [^SupervisorManager supervisor]
(.isWaiting supervisor))
(defn wait-until-cluster-waiting
@@ -415,15 +416,18 @@
(defn mk-capture-shutdown-fn
[capture-atom]
- (fn [supervisorData workerId]
+ (fn [supervisorData worker-manager workerId]
(let [conf (.getConf supervisorData)
supervisor-id (.getSupervisorId supervisorData)
port (find-worker-port conf workerId)
+ worker-pids (.getWorkerThreadPids supervisorData)
+ dead-workers (.getDeadWorkers supervisorData)
existing (get @capture-atom [supervisor-id port] 0)]
(log-message "mk-capture-shutdown-fn")
(swap! capture-atom assoc [supervisor-id port] (inc existing))
- (SupervisorUtils/shutWorker supervisorData workerId))))
-
+ (.shutdownWorker worker-manager supervisor-id workerId worker-pids)
+ (if (.cleanupWorker worker-manager workerId)
+ (.remove dead-workers workerId)))))
(defmacro capture-changed-workers
[& body]
`(let [launch-captured# (atom {})
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 6ea8b0f..103e585 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -18,6 +18,7 @@
package org.apache.storm;
import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
@@ -2212,6 +2213,12 @@ public class Config extends HashMap<String, Object> {
public static final Object STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin";
/**
+ * The plugin the supervisor uses to manage workers; must implement IWorkerManager
+ */
+ @isImplementationOfClass(implementsClass = IWorkerManager.class)
+ public static final Object STORM_SUPERVISOR_WORKER_MANAGER_PLUGIN = "storm.supervisor.worker.manager.plugin";
+
+ /**
* CGroup Setting below
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java
deleted file mode 100644
index 3b7a18e..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-public interface DaemonCommon {
- boolean isWaiting();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
index 4947c6f..a1fa798 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
@@ -57,7 +57,6 @@ public class StandaloneSupervisor implements ISupervisor {
}
@Override
- // @return is vector which need be converted to be int
public Object getMetadata() {
Object ports = conf.get(Config.SUPERVISOR_SLOTS_PORTS);
return ports;
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index 6124aef..1dd44a9 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -61,8 +61,8 @@ public class Supervisor {
* @return
* @throws Exception
*/
- public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
- SupervisorManger supervisorManger = null;
+ public SupervisorManager mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
+ SupervisorManager supervisorManager = null;
try {
LOG.info("Starting Supervisor with conf {}", conf);
iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
@@ -78,8 +78,8 @@ public class Supervisor {
Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);
- Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf);
- for (String stormId : downdedStormId) {
+ Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
+ for (String stormId : downloadedStormIds) {
SupervisorUtils.addBlobReferences(localizer, stormId, conf);
}
// do this after adding the references so we don't try to clean things being used
@@ -119,7 +119,7 @@ public class Supervisor {
eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
}
LOG.info("Starting supervisor with id {} at host {}.", supervisorData.getSupervisorId(), supervisorData.getHostName());
- supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager);
+ supervisorManager = new SupervisorManager(supervisorData, syncSupEventManager, syncProcessManager);
} catch (Throwable t) {
if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
throw t;
@@ -130,7 +130,7 @@ public class Supervisor {
Utils.exitProcess(13, "Error on initialization");
}
}
- return supervisorManger;
+ return supervisorManager;
}
/**
@@ -138,7 +138,7 @@ public class Supervisor {
*/
private void launch(ISupervisor iSupervisor) {
LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
- SupervisorManger supervisorManager;
+ SupervisorManager supervisorManager;
try {
Map<Object, Object> conf = Utils.readStormConfig();
if (ConfigUtils.isLocalMode(conf)) {
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
index 8c17edc..213457d 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
@@ -23,7 +23,7 @@ import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.localizer.Localizer;
@@ -73,8 +73,8 @@ public class SupervisorData {
private AtomicInteger syncRetry;
private final Object downloadLock = new Object();
private AtomicReference<Map<String, List<ProfileRequest>>> stormIdToProfileActions;
- private CgroupManager resourceIsolationManager;
private ConcurrentHashSet<String> deadWorkers;
+ private final IWorkerManager workerManager;
public SupervisorData(Map conf, IContext sharedContext, ISupervisor iSupervisor) {
this.conf = conf;
@@ -124,17 +124,8 @@ public class SupervisorData {
this.assignmentVersions = new AtomicReference<Map<String, Map<String, Object>>>(new HashMap<String, Map<String, Object>>());
this.syncRetry = new AtomicInteger(0);
this.stormIdToProfileActions = new AtomicReference<Map<String, List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>());
- if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
- try {
- this.resourceIsolationManager = (CgroupManager) Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
- this.resourceIsolationManager.prepare(conf);
- LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
- } catch (IOException e) {
- throw Utils.wrapInRuntime(e);
- }
- } else {
- this.resourceIsolationManager = null;
- }
+ this.workerManager = Utils.newInstance((String) conf.get(Config.STORM_SUPERVISOR_WORKER_MANAGER_PLUGIN));
+ this.workerManager.prepareWorker(conf, localizer);
}
public AtomicReference<Map<String, List<ProfileRequest>>> getStormIdToProfileActions() {
@@ -233,12 +224,11 @@ public class SupervisorData {
this.assignmentVersions.set(assignmentVersions);
}
- public CgroupManager getResourceIsolationManager() {
- return resourceIsolationManager;
- }
-
public ConcurrentHashSet getDeadWorkers() {
return deadWorkers;
}
+ public IWorkerManager getWorkerManager() {
+ return workerManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
new file mode 100644
index 0000000..d593d3c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.daemon.DaemonCommon;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class SupervisorManager implements SupervisorDaemon, DaemonCommon, Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SupervisorManager.class);
+ private final EventManager eventManager;
+ private final EventManager processesEventManager;
+ private SupervisorData supervisorData;
+
+ public SupervisorManager(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) {
+ this.eventManager = eventManager;
+ this.supervisorData = supervisorData;
+ this.processesEventManager = processesEventManager;
+ }
+
+ public void shutdown() {
+ LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId());
+ supervisorData.setActive(false);
+ try {
+ supervisorData.getHeartbeatTimer().close();
+ supervisorData.getEventTimer().close();
+ supervisorData.getBlobUpdateTimer().close();
+ eventManager.close();
+ processesEventManager.close();
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ supervisorData.getStormClusterState().disconnect();
+ }
+
+ @Override
+ public void shutdownAllWorkers() {
+ Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
+ IWorkerManager workerManager = supervisorData.getWorkerManager();
+ try {
+ for (String workerId : workerIds) {
+ workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids());
+ boolean success = workerManager.cleanupWorker(workerId);
+ if (success){
+ supervisorData.getDeadWorkers().remove(workerId);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("shutWorker failed");
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ @Override
+ public Map getConf() {
+ return supervisorData.getConf();
+ }
+
+ @Override
+ public String getId() {
+ return supervisorData.getSupervisorId();
+ }
+
+ @Override
+ public boolean isWaiting() {
+ if (!supervisorData.isActive()) {
+ return true;
+ }
+
+ if (supervisorData.getHeartbeatTimer().isTimerWaiting() && supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting()
+ && processesEventManager.waiting()) {
+ return true;
+ }
+ return false;
+ }
+
+ public void run() {
+ shutdown();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
deleted file mode 100644
index 26f0aae..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.storm.event.EventManager;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Map;
-
-public class SupervisorManger implements SupervisorDaemon, DaemonCommon, Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class);
- private final EventManager eventManager;
- private final EventManager processesEventManager;
- private SupervisorData supervisorData;
-
- public SupervisorManger(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) {
- this.eventManager = eventManager;
- this.supervisorData = supervisorData;
- this.processesEventManager = processesEventManager;
- }
-
- public void shutdown() {
- LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId());
- supervisorData.setActive(false);
- try {
- supervisorData.getHeartbeatTimer().close();
- supervisorData.getEventTimer().close();
- supervisorData.getBlobUpdateTimer().close();
- eventManager.close();
- processesEventManager.close();
- } catch (Exception e) {
- throw Utils.wrapInRuntime(e);
- }
- supervisorData.getStormClusterState().disconnect();
- }
-
- @Override
- public void shutdownAllWorkers() {
-
- Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
- try {
- for (String workerId : workerIds) {
- SupervisorUtils.shutWorker(supervisorData, workerId);
- }
- } catch (Exception e) {
- LOG.error("shutWorker failed");
- throw Utils.wrapInRuntime(e);
- }
- }
-
- @Override
- public Map getConf() {
- return supervisorData.getConf();
- }
-
- @Override
- public String getId() {
- return supervisorData.getSupervisorId();
- }
-
- @Override
- public boolean isWaiting() {
- if (!supervisorData.isActive()) {
- return true;
- }
-
- if (supervisorData.getHeartbeatTimer().isTimerWaiting() && supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting()
- && processesEventManager.waiting()) {
- return true;
- }
- return false;
- }
-
- public void run() {
- shutdown();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index ae3422e..bb2525a 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -50,10 +50,10 @@ public class SupervisorUtils {
_instance = INSTANCE;
}
- public static Process workerLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix,
- final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
+ public static Process processLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix,
+ final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
if (StringUtils.isBlank(user)) {
- throw new IllegalArgumentException("User cannot be blank when calling workerLauncher.");
+ throw new IllegalArgumentException("User cannot be blank when calling processLauncher.");
}
String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
@@ -71,10 +71,10 @@ public class SupervisorUtils {
return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
}
- public static int workerLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
+ public static int processLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
throws IOException {
int ret = 0;
- Process process = workerLauncher(conf, user, args, environment, logPreFix, null, null);
+ Process process = processLauncher(conf, user, args, environment, logPreFix, null, null);
if (StringUtils.isNotBlank(logPreFix))
Utils.readAndLogStream(logPreFix, process.getInputStream());
try {
@@ -92,7 +92,7 @@ public class SupervisorUtils {
List<String> commands = new ArrayList<>();
commands.add("code-dir");
commands.add(dir);
- workerLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+ processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
}
}
@@ -102,7 +102,7 @@ public class SupervisorUtils {
List<String> commands = new ArrayList<>();
commands.add("rmr");
commands.add(path);
- SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPreFix);
+ SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPreFix);
if (Utils.checkFileExists(path)) {
throw new RuntimeException(path + " was not deleted.");
}
@@ -116,11 +116,11 @@ public class SupervisorUtils {
* @return
*/
public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) {
- return new Boolean((String) blobInfo.get("uncompress"));
+ return Utils.getBoolean(blobInfo.get("uncompress"), false);
}
/**
- * Remove a reference to a blob when its no longer needed
+ * Returns a list of LocalResources based on the blobstore-map passed in
*
* @param blobstoreMap
* @return
@@ -186,7 +186,7 @@ public class SupervisorUtils {
}
/**
- * Returns map from worr id to heartbeat
+ * Returns a map from worker id to heartbeat
*
* @param conf
* @return
@@ -265,89 +265,4 @@ public class SupervisorUtils {
acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
return acls;
}
-
- public static void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException {
- LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), workerId);
- Map conf = supervisorData.getConf();
- Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
- Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
- Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
- String user = ConfigUtils.getWorkerUser(conf, workerId);
- String threadPid = supervisorData.getWorkerThreadPids().get(workerId);
- if (StringUtils.isNotBlank(threadPid)) {
- ProcessSimulator.killProcess(threadPid);
- }
-
- for (String pid : pids) {
- if (asUser) {
- List<String> commands = new ArrayList<>();
- commands.add("signal");
- commands.add(pid);
- commands.add("15");
- String logPrefix = "kill -15 " + pid;
- SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
- } else {
- Utils.killProcessWithSigTerm(pid);
- }
- }
-
- if (pids.size() > 0) {
- LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs);
- Time.sleepSecs(shutdownSleepSecs);
- }
-
- for (String pid : pids) {
- if (asUser) {
- List<String> commands = new ArrayList<>();
- commands.add("signal");
- commands.add(pid);
- commands.add("9");
- String logPrefix = "kill -9 " + pid;
- SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
- } else {
- Utils.forceKillProcess(pid);
- }
- String path = ConfigUtils.workerPidPath(conf, workerId, pid);
- if (asUser) {
- SupervisorUtils.rmrAsUser(conf, workerId, path);
- } else {
- try {
- LOG.debug("Removing path {}", path);
- new File(path).delete();
- } catch (Exception e) {
- // on windows, the supervisor may still holds the lock on the worker directory
- // ignore
- }
- }
- }
- tryCleanupWorker(conf, supervisorData, workerId);
- LOG.info("Shut down {}:{}", supervisorData.getSupervisorId(), workerId);
-
- }
-
- public static void tryCleanupWorker(Map conf, SupervisorData supervisorData, String workerId) {
- try {
- String workerRoot = ConfigUtils.workerRoot(conf, workerId);
- if (Utils.checkFileExists(workerRoot)) {
- if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
- SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
- } else {
- Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
- Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
- Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId));
- Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
- }
- ConfigUtils.removeWorkerUserWSE(conf, workerId);
- supervisorData.getDeadWorkers().remove(workerId);
- }
- if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)){
- supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId);
- }
- } catch (IOException e) {
- LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
- } catch (RuntimeException e) {
- LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
index 068c442..41fa01d 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LocalAssignment;
@@ -88,13 +89,6 @@ public class SyncProcessEvent implements Runnable {
this.localState = supervisorData.getLocalState();
}
-
- /**
- * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file -
- * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new
- * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait
- * for workers launch
- */
@Override
public void run() {
LOG.debug("Syncing processes");
@@ -132,7 +126,7 @@ public class SyncProcessEvent implements Runnable {
if (stateHeartbeat.getState() != State.VALID) {
LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now,
stateHeartbeat.getState(), stateHeartbeat.getHeartbeat());
- shutWorker(supervisorData, entry.getKey());
+ shutWorker(supervisorData, supervisorData.getWorkerManager(), entry.getKey());
}
}
// start new workers
@@ -244,261 +238,24 @@ public class SyncProcessEvent implements Runnable {
/**
* launch a worker in local mode.
*/
- protected void launchWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException {
+ protected void launchLocalWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException {
// port this function after porting worker to java
}
- protected String getWorkerClassPath(String stormJar, Map stormConf) {
- List<String> topoClasspath = new ArrayList<>();
- Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
-
- if (object instanceof List) {
- topoClasspath.addAll((List<String>) object);
- } else if (object instanceof String){
- topoClasspath.add((String)object);
- }else {
- //ignore
- }
- String classPath = Utils.workerClasspath();
- String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
- return Utils.addToClasspath(classAddPath, topoClasspath);
- }
-
- /**
- * "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap"
- *
- * @param value
- * @param workerId
- * @param stormId
- * @param port
- * @param memOnheap
- */
- public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
- List<String> rets = new ArrayList<>();
- if (value instanceof String) {
- String string = (String) value;
- string = string.replace("%ID%", String.valueOf(port));
- string = string.replace("%WORKER-ID%", workerId);
- string = string.replace("%TOPOLOGY-ID%", stormId);
- string = string.replace("%WORKER-PORT%", String.valueOf(port));
- string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
- String[] strings = string.split("\\s+");
- rets.addAll(Arrays.asList(strings));
- } else if (value instanceof List) {
- List<Object> objects = (List<Object>) value;
- for (Object object : objects) {
- String str = (String)object;
- str = str.replace("%ID%", String.valueOf(port));
- str = str.replace("%WORKER-ID%", workerId);
- str = str.replace("%TOPOLOGY-ID%", stormId);
- str = str.replace("%WORKER-PORT%", String.valueOf(port));
- str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap));
- rets.add(str);
- }
- }
- return rets;
- }
-
-
-
- /**
- * launch a worker in distributed mode
- * supervisorId for testing
- * @throws IOException
- */
- protected void launchWorker(Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId,
- WorkerResources resources, CgroupManager cgroupManager, ConcurrentHashSet deadWorkers) throws IOException {
-
- Boolean runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
- String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
- String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
- String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
- String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId);
-
- String stormLogDir = ConfigUtils.getLogDir();
- String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
-
- String stormLog4j2ConfDir;
- if (StringUtils.isNotBlank(stormLogConfDir)) {
- if (Utils.isAbsolutePath(stormLogConfDir)) {
- stormLog4j2ConfDir = stormLogConfDir;
- } else {
- stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir;
- }
- } else {
- stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
- }
-
- String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
-
- String jlp = jlp(stormRoot, conf);
-
- String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
-
+ protected void launchDistributedWorker(IWorkerManager workerManager, Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId,
+ WorkerResources resources, ConcurrentHashSet deadWorkers) throws IOException {
Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-
- String workerClassPath = getWorkerClassPath(stormJar, stormConf);
-
- Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
- List<String> topGcOpts = new ArrayList<>();
- if (topGcOptsObject instanceof String) {
- topGcOpts.add((String) topGcOptsObject);
- } else if (topGcOptsObject instanceof List) {
- topGcOpts.addAll((List<String>) topGcOptsObject);
- }
-
- int memOnheap = 0;
- if (resources.get_mem_on_heap() > 0) {
- memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
- } else {
- //set the default heap memory size for supervisor-test
- memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
- }
-
- int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
-
- int cpu = (int) Math.ceil(resources.get_cpu());
-
- List<String> gcOpts = null;
-
- if (topGcOpts != null) {
- gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap);
- } else {
- gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap);
- }
-
- Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
- List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
- if (topoWorkerLogwriterObject instanceof String) {
- topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject);
- } else if (topoWorkerLogwriterObject instanceof List) {
- topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject);
- }
-
String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-
- String logfileName = "worker.log";
-
- String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
-
- String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
- if (loggingSensitivity == null) {
- loggingSensitivity = "S3";
- }
-
- List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
-
- List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
-
- List<String> workerProfilerChildopts = null;
- if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
- workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
- }else {
- workerProfilerChildopts = new ArrayList<>();
- }
-
- Map<String, String> topEnvironment = new HashMap<String, String>();
- Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
- if (environment != null) {
- topEnvironment.putAll(environment);
- }
- topEnvironment.put("LD_LIBRARY_PATH", jlp);
-
- String log4jConfigurationFile = null;
- if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
- log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
- } else {
- log4jConfigurationFile = stormLog4j2ConfDir;
- }
- log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
-
- List<String> commandList = new ArrayList<>();
- commandList.add(SupervisorUtils.javaCmd("java"));
- commandList.add("-cp");
- commandList.add(workerClassPath);
- commandList.addAll(topoWorkerLogwriterChildopts);
- commandList.add("-Dlogfile.name=" + logfileName);
- commandList.add("-Dstorm.home=" + stormHome);
- commandList.add("-Dworkers.artifacts=" + workersArtifacets);
- commandList.add("-Dstorm.id=" + stormId);
- commandList.add("-Dworker.id=" + workerId);
- commandList.add("-Dworker.port=" + port);
- commandList.add("-Dstorm.log.dir=" + stormLogDir);
- commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
- commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
- commandList.add("org.apache.storm.LogWriter");
-
- commandList.add(SupervisorUtils.javaCmd("java"));
- commandList.add("-server");
- commandList.addAll(workerChildopts);
- commandList.addAll(topWorkerChildopts);
- commandList.addAll(gcOpts);
- commandList.addAll(workerProfilerChildopts);
- commandList.add("-Djava.library.path=" + jlp);
- commandList.add("-Dlogfile.name=" + logfileName);
- commandList.add("-Dstorm.home=" + stormHome);
- commandList.add("-Dworkers.artifacts=" + workersArtifacets);
- commandList.add("-Dstorm.conf.file=" + stormConfFile);
- commandList.add("-Dstorm.options=" + stormOptions);
- commandList.add("-Dstorm.log.dir=" + stormLogDir);
- commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
- commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
- commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
- commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
- commandList.add("-Dstorm.id=" + stormId);
- commandList.add("-Dworker.id=" + workerId);
- commandList.add("-Dworker.port=" + port);
- commandList.add("-cp");
- commandList.add(workerClassPath);
- commandList.add("org.apache.storm.daemon.worker");
- commandList.add(stormId);
- commandList.add(assignmentId);
- commandList.add(String.valueOf(port));
- commandList.add(workerId);
-
- // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
- if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
- int cgRoupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)));
- int memoryValue = memoffheap + memOnheap + cgRoupMem;
- int cpuValue = cpu;
- Map<String, Number> map = new HashMap<>();
- map.put("cpu", cpuValue);
- map.put("memory", memoryValue);
- cgroupManager.reserveResourcesForWorker(workerId, map);
- commandList = cgroupManager.getLaunchCommand(workerId, commandList);
- }
-
- LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
writeLogMetadata(stormConf, user, workerId, stormId, port, conf);
ConfigUtils.setWorkerUserWSE(conf, workerId, user);
createArtifactsLink(conf, stormId, port, workerId);
String logPrefix = "Worker Process " + workerId;
- String workerDir = ConfigUtils.workerRoot(conf, workerId);
-
if (deadWorkers != null)
deadWorkers.remove(workerId);
createBlobstoreLinks(conf, stormId, workerId);
-
ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix, workerId);
- if (runWorkerAsUser) {
- List<String> args = new ArrayList<>();
- args.add("worker");
- args.add(workerDir);
- args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
- SupervisorUtils.workerLauncher(conf, user, args, null, logPrefix, processExitCallback, new File(workerDir));
- } else {
- Utils.launchProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
- }
- }
-
- protected String jlp(String stormRoot, Map conf) {
- String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
- String os = System.getProperty("os.name").replaceAll("\\s+", "_");
- String arch = System.getProperty("os.arch");
- String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
- String ret = archResourceRoot + Utils.FILE_PATH_SEPARATOR + resourceRoot + Utils.FILE_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH);
- return ret;
+ workerManager.launchWorker(supervisorId, assignmentId, stormId, port, workerId, resources, processExitCallback);
}
protected Map<String, Integer> startNewWorkers(Map<Integer, String> newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws IOException {
@@ -528,10 +285,9 @@ public class SyncProcessEvent implements Runnable {
FileUtils.forceMkdir(new File(hbPath));
if (clusterMode.endsWith("distributed")) {
- launchWorker(conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources,
- supervisorData.getResourceIsolationManager(), supervisorData.getDeadWorkers());
+ launchDistributedWorker(supervisorData.getWorkerManager(), conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources, supervisorData.getDeadWorkers());
} else if (clusterMode.endsWith("local")) {
- launchWorker(supervisorData, stormId, port.longValue(), workerId, resources);
+ launchLocalWorker(supervisorData, stormId, port.longValue(), workerId, resources);
}
newValidWorkerIds.put(workerId, port);
@@ -559,9 +315,7 @@ public class SyncProcessEvent implements Runnable {
}
if (stormconf.get(Config.TOPOLOGY_GROUPS) != null) {
List<String> topGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS);
- for (String group : topGroups){
- logsGroups.add(group);
- }
+ logsGroups.addAll(topGroups);
}
data.put(Config.LOGS_GROUPS, logsGroups.toArray());
@@ -609,7 +363,6 @@ public class SyncProcessEvent implements Runnable {
}finally {
writer.close();
}
-
}
/**
@@ -665,8 +418,11 @@ public class SyncProcessEvent implements Runnable {
}
}
- //for supervisor-test
- public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException{
- SupervisorUtils.shutWorker(supervisorData, workerId);
+ public void shutWorker(SupervisorData supervisorData, IWorkerManager workerManager, String workerId) throws IOException, InterruptedException{
+ workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids());
+ boolean success = workerManager.cleanupWorker(workerId);
+ if (success){
+ supervisorData.getDeadWorkers().remove(workerId);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
index 4c08014..47cf440 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
@@ -109,6 +109,7 @@ public class SyncSupervisorEvent implements Runnable {
LOG.debug("Checked Downloaded Ids {}", srashStormIds);
LOG.debug("Downloaded Ids {}", downloadedStormIds);
LOG.debug("Storm Ids Profiler Actions {}", stormIdToProfilerActions);
+
// download code first
// This might take awhile
// - should this be done separately from usual monitoring?
@@ -204,12 +205,12 @@ public class SyncSupervisorEvent implements Runnable {
List<ExecutorInfo> existExecutors = existingAssignment.get(port).get_executors();
List<ExecutorInfo> newExecutors = newAssignment.get(port).get_executors();
if (newExecutors.size() != existExecutors.size()) {
- syncProcesses.shutWorker(supervisorData, vaildPortToWorkerIds.get(port));
+ syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), vaildPortToWorkerIds.get(port));
continue;
}
for (ExecutorInfo executorInfo : newExecutors) {
if (!existExecutors.contains(executorInfo)) {
- syncProcesses.shutWorker(supervisorData, vaildPortToWorkerIds.get(port));
+ syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), vaildPortToWorkerIds.get(port));
break;
}
}
@@ -353,7 +354,12 @@ public class SyncSupervisorEvent implements Runnable {
} finally {
blobStore.shutdown();
}
- FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+ try {
+ FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+ }catch (Exception e){
+ // ignore
+ }
+
SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
@@ -503,7 +509,7 @@ public class SyncSupervisorEvent implements Runnable {
protected void setupBlobPermission(Map conf, String user, String path) throws IOException {
if (Utils.getBoolean(Config.SUPERVISOR_RUN_WORKER_AS_USER, false)) {
String logPrefix = "setup blob permissions for " + path;
- SupervisorUtils.workerLauncherAndWait(conf, user, Arrays.asList("blob", path), null, logPrefix);
+ SupervisorUtils.processLauncherAndWait(conf, user, Arrays.asList("blob", path), null, logPrefix);
}
}
@@ -623,7 +629,7 @@ public class SyncSupervisorEvent implements Runnable {
String workerId = entry.getKey();
StateHeartbeat stateHeartbeat = entry.getValue();
if (stateHeartbeat.getState() == State.DISALLOWED) {
- syncProcesses.shutWorker(supervisorData, workerId);
+ syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), workerId);
LOG.debug("{}'s state disallowed, so shutdown this worker");
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
index d39a679..ec29855 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@ -171,7 +171,7 @@ public class RunProfilerActions implements Runnable {
newCommands.add("profiler");
newCommands.add(targetDir);
newCommands.add(script);
- SupervisorUtils.workerLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile);
+ SupervisorUtils.processLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile);
} else {
Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index f6b3ed6..5e7b6d3 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -21,6 +21,7 @@ package org.apache.storm.daemon.supervisor.timer;
import org.apache.storm.command.HealthCheck;
import org.apache.storm.daemon.supervisor.SupervisorData;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +42,17 @@ public class SupervisorHealthCheck implements Runnable {
@Override
public void run() {
Map conf = supervisorData.getConf();
+ IWorkerManager workerManager = supervisorData.getWorkerManager();
int healthCode = HealthCheck.healthCheck(conf);
Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
if (healthCode != 0) {
for (String workerId : workerIds) {
try {
- SupervisorUtils.shutWorker(supervisorData, workerId);
+ workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids());
+ boolean success = workerManager.cleanupWorker(workerId);
+ if (success){
+ supervisorData.getDeadWorkers().remove(workerId);
+ }
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
new file mode 100644
index 0000000..b19fd89
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.workermanager;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+public class DefaultWorkerManager implements IWorkerManager {
+
+ private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class);
+
+ private Map conf;
+ private CgroupManager resourceIsolationManager;
+ private boolean runWorkerAsUser;
+
+ @Override
+ public void prepareWorker(Map conf, Localizer localizer) {
+ this.conf = conf;
+ if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
+ try {
+ this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
+ this.resourceIsolationManager.prepare(conf);
+ LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
+ } catch (IOException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ } else {
+ this.resourceIsolationManager = null;
+ }
+ this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+ }
+
+ @Override
+ public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
+ Utils.ExitCodeCallable workerExitCallback) {
+ try {
+
+ String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+ String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
+ String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
+ String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId);
+
+ String stormLogDir = ConfigUtils.getLogDir();
+ String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
+
+ String stormLog4j2ConfDir;
+ if (StringUtils.isNotBlank(stormLogConfDir)) {
+ if (Utils.isAbsolutePath(stormLogConfDir)) {
+ stormLog4j2ConfDir = stormLogConfDir;
+ } else {
+ stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir;
+ }
+ } else {
+ stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
+ }
+
+ String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+
+ String jlp = jlp(stormRoot, conf);
+
+ String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
+
+ Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+ String workerClassPath = getWorkerClassPath(stormJar, stormConf);
+
+ Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
+ List<String> topGcOpts = new ArrayList<>();
+ if (topGcOptsObject instanceof String) {
+ topGcOpts.add((String) topGcOptsObject);
+ } else if (topGcOptsObject instanceof List) {
+ topGcOpts.addAll((List<String>) topGcOptsObject);
+ }
+
+ int memOnheap = 0;
+ if (resources.get_mem_on_heap() > 0) {
+ memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
+ } else {
+ // set the default heap memory size for supervisor-test
+ memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
+ }
+
+ int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
+
+ int cpu = (int) Math.ceil(resources.get_cpu());
+
+ List<String> gcOpts = null;
+
+ if (topGcOpts.size() > 0) {
+ gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap);
+ } else {
+ gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap);
+ }
+
+ Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
+ List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
+ if (topoWorkerLogwriterObject instanceof String) {
+ topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject);
+ } else if (topoWorkerLogwriterObject instanceof List) {
+ topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject);
+ }
+
+ String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+
+ String logfileName = "worker.log";
+
+ String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
+
+ String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
+ if (loggingSensitivity == null) {
+ loggingSensitivity = "S3";
+ }
+
+ List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
+
+ List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
+
+ List<String> workerProfilerChildopts = null;
+ if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
+ workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
+ } else {
+ workerProfilerChildopts = new ArrayList<>();
+ }
+
+ Map<String, String> topEnvironment = new HashMap<String, String>();
+ Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+ if (environment != null) {
+ topEnvironment.putAll(environment);
+ }
+ topEnvironment.put("LD_LIBRARY_PATH", jlp);
+
+ String log4jConfigurationFile = null;
+ if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
+ log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
+ } else {
+ log4jConfigurationFile = stormLog4j2ConfDir;
+ }
+ log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
+
+ List<String> commandList = new ArrayList<>();
+ commandList.add(SupervisorUtils.javaCmd("java"));
+ commandList.add("-cp");
+ commandList.add(workerClassPath);
+ commandList.addAll(topoWorkerLogwriterChildopts);
+ commandList.add("-Dlogfile.name=" + logfileName);
+ commandList.add("-Dstorm.home=" + stormHome);
+ commandList.add("-Dworkers.artifacts=" + workersArtifacets);
+ commandList.add("-Dstorm.id=" + stormId);
+ commandList.add("-Dworker.id=" + workerId);
+ commandList.add("-Dworker.port=" + port);
+ commandList.add("-Dstorm.log.dir=" + stormLogDir);
+ commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
+ commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+ commandList.add("org.apache.storm.LogWriter");
+
+ commandList.add(SupervisorUtils.javaCmd("java"));
+ commandList.add("-server");
+ commandList.addAll(workerChildopts);
+ commandList.addAll(topWorkerChildopts);
+ commandList.addAll(gcOpts);
+ commandList.addAll(workerProfilerChildopts);
+ commandList.add("-Djava.library.path=" + jlp);
+ commandList.add("-Dlogfile.name=" + logfileName);
+ commandList.add("-Dstorm.home=" + stormHome);
+ commandList.add("-Dworkers.artifacts=" + workersArtifacets);
+ commandList.add("-Dstorm.conf.file=" + stormConfFile);
+ commandList.add("-Dstorm.options=" + stormOptions);
+ commandList.add("-Dstorm.log.dir=" + stormLogDir);
+ commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
+ commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
+ commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
+ commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+ commandList.add("-Dstorm.id=" + stormId);
+ commandList.add("-Dworker.id=" + workerId);
+ commandList.add("-Dworker.port=" + port);
+ commandList.add("-cp");
+ commandList.add(workerClassPath);
+ commandList.add("org.apache.storm.daemon.worker");
+ commandList.add(stormId);
+ commandList.add(assignmentId);
+ commandList.add(String.valueOf(port));
+ commandList.add(workerId);
+
+ // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
+ if (resourceIsolationManager != null) {
+ int cGroupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)));
+ int memoryValue = memoffheap + memOnheap + cGroupMem;
+ int cpuValue = cpu;
+ Map<String, Number> map = new HashMap<>();
+ map.put("cpu", cpuValue);
+ map.put("memory", memoryValue);
+ resourceIsolationManager.reserveResourcesForWorker(workerId, map);
+ commandList = resourceIsolationManager.getLaunchCommand(workerId, commandList);
+ }
+
+ LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
+
+ String logPrefix = "Worker Process " + workerId;
+ String workerDir = ConfigUtils.workerRoot(conf, workerId);
+
+ if (runWorkerAsUser) {
+ List<String> args = new ArrayList<>();
+ args.add("worker");
+ args.add(workerDir);
+ args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
+ SupervisorUtils.processLauncher(conf, user, args, null, logPrefix, workerExitCallback, new File(workerDir));
+ } else {
+ Utils.launchProcess(commandList, topEnvironment, logPrefix, workerExitCallback, new File(workerDir));
+ }
+ } catch (IOException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ return null;
+ }
+
+ /**
+  * Stops the worker identified by {@code workerId}.
+  * <p>
+  * Sequence: kill the entry registered for this worker in {@code workerThreadPids} (if any) through
+  * {@link ProcessSimulator}, send signal 15 to every pid listed in the worker's pids directory, sleep for
+  * {@code Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS} seconds when at least one pid was found, then send
+  * signal 9 to every pid and remove its pid file. When {@code runWorkerAsUser} is set, signals and removals
+  * go through the {@link SupervisorUtils} launcher helpers on behalf of the worker's user.
+  *
+  * @param supervisorId     id of the supervisor, used for logging only
+  * @param workerId         id of the worker to stop
+  * @param workerThreadPids map looked up by worker id for a pid to hand to {@link ProcessSimulator}
+  * @return always {@code null}
+  * @throws RuntimeException any exception raised while shutting down, rethrown via {@code Utils.wrapInRuntime}
+  */
+ @Override
+ public IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) {
+     try {
+         LOG.info("Shutting down {}:{}", supervisorId, workerId);
+         Collection<String> workerPids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
+         Integer sleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
+         String workerUser = ConfigUtils.getWorkerUser(conf, workerId);
+
+         String simulatedPid = workerThreadPids.get(workerId);
+         if (StringUtils.isNotBlank(simulatedPid)) {
+             ProcessSimulator.killProcess(simulatedPid);
+         }
+
+         // First pass: ask every process to terminate (signal 15).
+         for (String pid : workerPids) {
+             if (!runWorkerAsUser) {
+                 Utils.killProcessWithSigTerm(pid);
+                 continue;
+             }
+             List<String> termArgs = new ArrayList<>(Arrays.asList("signal", pid, "15"));
+             SupervisorUtils.processLauncherAndWait(conf, workerUser, termArgs, null, "kill -15 " + pid);
+         }
+
+         // Give the worker time to run its cleanup before the forced kill below.
+         if (!workerPids.isEmpty()) {
+             LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", sleepSecs);
+             Time.sleepSecs(sleepSecs);
+         }
+
+         // Second pass: force-kill (signal 9) whatever is left and drop the pid file.
+         for (String pid : workerPids) {
+             if (runWorkerAsUser) {
+                 List<String> killArgs = new ArrayList<>(Arrays.asList("signal", pid, "9"));
+                 SupervisorUtils.processLauncherAndWait(conf, workerUser, killArgs, null, "kill -9 " + pid);
+                 String pidFile = ConfigUtils.workerPidPath(conf, workerId, pid);
+                 SupervisorUtils.rmrAsUser(conf, workerId, pidFile);
+             } else {
+                 Utils.forceKillProcess(pid);
+                 String pidFile = ConfigUtils.workerPidPath(conf, workerId, pid);
+                 try {
+                     LOG.debug("Removing path {}", pidFile);
+                     new File(pidFile).delete();
+                 } catch (Exception ignored) {
+                     // On Windows the supervisor may still hold the lock on the worker directory;
+                     // failing to remove the pid file here is deliberately ignored.
+                 }
+             }
+         }
+         LOG.info("Shut down {}:{}", supervisorId, workerId);
+     } catch (Exception e) {
+         throw Utils.wrapInRuntime(e);
+     }
+     return null;
+ }
+
+ /**
+  * Removes the on-disk state of a worker and releases its isolated resources.
+  * <p>
+  * If the worker root exists it is removed (through {@code SupervisorUtils.rmrAsUser} when
+  * {@code runWorkerAsUser} is set, otherwise by force-deleting the heartbeats, pids, tmp and root
+  * directories in that order) and the worker-user entry is removed. If a resource isolation manager
+  * is configured, the worker's resources are released afterwards.
+  *
+  * @param workerId id of the worker to clean up
+  * @return {@code true} when every step completed; {@code false} when an {@link IOException} or
+  *         {@link RuntimeException} was raised (it is logged as a warning, not rethrown)
+  */
+ @Override
+ public boolean cleanupWorker(String workerId) {
+     try {
+         String workerRoot = ConfigUtils.workerRoot(conf, workerId);
+         if (Utils.checkFileExists(workerRoot)) {
+             if (!runWorkerAsUser) {
+                 Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
+                 Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
+                 Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId));
+                 Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
+             } else {
+                 SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
+             }
+             ConfigUtils.removeWorkerUserWSE(conf, workerId);
+         }
+         if (resourceIsolationManager != null) {
+             resourceIsolationManager.releaseResourcesForWorker(workerId);
+         }
+         return true;
+     } catch (IOException | RuntimeException e) {
+         LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+         return false;
+     }
+ }
+
+ /**
+ * Resizing a worker is not implemented by this manager: every argument is ignored.
+ *
+ * @return always {@code null}
+ */
+ @Override
+ public IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources) {
+ return null;
+ }
+
+ /**
+  * Builds the {@code java.library.path} value for a worker.
+  * <p>
+  * The result lists, in order: the OS/architecture specific resources directory
+  * ({@code <stormRoot>/<resources>/<os.name>-<os.arch>}, whitespace in {@code os.name} replaced by
+  * {@code _}), the plain resources directory, and the configured {@code Config.JAVA_LIBRARY_PATH}.
+  * <p>
+  * Directory components are joined with {@code Utils.FILE_PATH_SEPARATOR}; the three search-path
+  * entries are joined with {@link File#pathSeparator}, which is what the JVM expects between entries
+  * of a library path. Joining the entries with the directory separator would produce a single
+  * non-existent nested path instead of a list.
+  *
+  * @param stormRoot root directory of the topology's distribution on this supervisor
+  * @param conf      configuration map that is read for {@code Config.JAVA_LIBRARY_PATH}
+  * @return the library search path to pass to the worker JVM
+  */
+ protected String jlp(String stormRoot, Map conf) {
+     String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
+     String os = System.getProperty("os.name").replaceAll("\\s+", "_");
+     String arch = System.getProperty("os.arch");
+     String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
+     return archResourceRoot + File.pathSeparator + resourceRoot + File.pathSeparator + conf.get(Config.JAVA_LIBRARY_PATH);
+ }
+
+ /**
+  * Computes the classpath for a worker JVM: the base worker classpath from
+  * {@code Utils.workerClasspath()}, then the topology jar, then any entries configured under
+  * {@code Config.TOPOLOGY_CLASSPATH}.
+  * <p>
+  * {@code Config.TOPOLOGY_CLASSPATH} may be a single {@code String}, a {@code List} of strings, or
+  * unset. An unset value simply contributes nothing; only a value of an unexpected type is reported
+  * as an error (and then ignored).
+  *
+  * @param stormJar  path of the topology jar
+  * @param stormConf topology configuration that is read for {@code Config.TOPOLOGY_CLASSPATH}
+  * @return the assembled classpath string
+  */
+ protected String getWorkerClassPath(String stormJar, Map stormConf) {
+     List<String> topoClasspath = new ArrayList<>();
+     Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
+
+     if (object instanceof List) {
+         // The configuration map is untyped; entries are expected to be strings.
+         @SuppressWarnings("unchecked")
+         List<String> entries = (List<String>) object;
+         topoClasspath.addAll(entries);
+     } else if (object instanceof String) {
+         topoClasspath.add((String) object);
+     } else if (object != null) {
+         // null just means "no topology specific classpath" and is not an error.
+         LOG.error("topology specific classpath is invalid: expected a String or a List but got {}",
+                 object.getClass().getName());
+     }
+     String classPath = Utils.workerClasspath();
+     String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
+     return Utils.addToClasspath(classAddPath, topoClasspath);
+ }
+
+ /**
+  * Generates runtime childopts by replacing the placeholder keys in {@code value}.
+  * <p>
+  * Placeholders: {@code %ID%} and {@code %WORKER-PORT%} become the port, {@code %WORKER-ID%} the
+  * worker id, {@code %TOPOLOGY-ID%} the topology id and {@code %HEAP-MEM%} the on-heap memory.
+  * <p>
+  * A {@code String} value is split on whitespace into individual options; a {@code List} value
+  * contributes one option per element. Empty tokens (from an empty string or leading whitespace) and
+  * null or blank list elements are dropped so that no empty argument ends up on the command line.
+  * Any other value, including {@code null}, yields an empty list.
+  *
+  * @param value     childopts as a {@code String} or a {@code List} of strings; may be {@code null}
+  * @param workerId  worker id substituted for {@code %WORKER-ID%}
+  * @param stormId   topology id substituted for {@code %TOPOLOGY-ID%}
+  * @param port      worker port substituted for {@code %ID%} and {@code %WORKER-PORT%}
+  * @param memOnheap on-heap memory substituted for {@code %HEAP-MEM%}
+  * @return the substituted options, never {@code null}
+  * @throws ClassCastException if {@code value} is a list holding a non-null element that is not a string
+  */
+ public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
+     List<String> rets = new ArrayList<>();
+     if (value instanceof String) {
+         String substituted = substituteChildoptsKeys((String) value, workerId, stormId, port, memOnheap);
+         for (String option : substituted.split("\\s+")) {
+             // split() keeps an empty leading token for "" and for input starting with whitespace.
+             if (!option.isEmpty()) {
+                 rets.add(option);
+             }
+         }
+     } else if (value instanceof List) {
+         for (Object object : (List<?>) value) {
+             if (object == null) {
+                 continue;
+             }
+             String option = substituteChildoptsKeys((String) object, workerId, stormId, port, memOnheap);
+             if (!option.trim().isEmpty()) {
+                 rets.add(option);
+             }
+         }
+     }
+     return rets;
+ }
+
+ /** Replaces every childopts placeholder key in {@code template} with its runtime value. */
+ private static String substituteChildoptsKeys(String template, String workerId, String stormId, Long port, int memOnheap) {
+     String portText = String.valueOf(port);
+     return template.replace("%ID%", portText)
+             .replace("%WORKER-ID%", workerId)
+             .replace("%TOPOLOGY-ID%", stormId)
+             .replace("%WORKER-PORT%", portText)
+             .replace("%HEAP-MEM%", String.valueOf(memOnheap));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
new file mode 100644
index 0000000..3b0912a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.workermanager;
+
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.Utils;
+
+import java.util.List;
+import java.util.Map;
+
+ /**
+  * Plugin interface through which worker processes are launched, shut down, resized and cleaned up.
+  */
+ public interface IWorkerManager {
+
+     /**
+      * Hands the manager the configuration and localizer before any other method is used.
+      * NOTE(review): the exact call order is decided by the caller, which is not shown here - confirm.
+      *
+      * @param conf      configuration map
+      * @param localizer localizer instance made available to the manager
+      */
+     void prepareWorker(Map conf, Localizer localizer);
+
+     /**
+      * Launches a worker.
+      *
+      * @param supervisorId       id of the supervisor
+      * @param assignmentId       assignment id of the supervisor
+      * @param stormId            id of the topology the worker belongs to
+      * @param port               port assigned to the worker
+      * @param workerId           id of the worker
+      * @param resources          resources assigned to the worker
+      * @param workerExitCallback callback for the worker's exit code
+      * @return implementation-defined result; may be {@code null}
+      */
+     IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
+                                Utils.ExitCodeCallable workerExitCallback);
+
+     /**
+      * Shuts a worker down.
+      *
+      * @param supervisorId     id of the supervisor
+      * @param workerId         id of the worker
+      * @param workerThreadPids map keyed by worker id
+      * @return implementation-defined result; may be {@code null}
+      */
+     IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids);
+
+     /**
+      * Resizes a worker to the given resources.
+      *
+      * @return implementation-defined result; may be {@code null}
+      */
+     IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources);
+
+     /**
+      * Cleans up what is left of a worker.
+      *
+      * @param workerId id of the worker
+      * @return {@code true} when cleanup completed; presumably {@code false} means it should be retried
+      */
+     boolean cleanupWorker(String workerId);
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerResult.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerResult.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerResult.java
new file mode 100644
index 0000000..8bf5b14
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerResult.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.workermanager;
+
+ /**
+ * Result type returned by the launch, shutdown and resize operations of {@link IWorkerManager}.
+ * It currently declares no members.
+ */
+ public interface IWorkerResult {
+ }