You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:32 UTC
[09/35] storm git commit: xxxx
xxxx
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/465a4b89
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/465a4b89
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/465a4b89
Branch: refs/heads/master
Commit: 465a4b89521a4ac15b81969009133bdfa12d0655
Parents: 42bacde
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Thu Mar 10 20:12:18 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Thu Mar 10 20:12:18 2016 +0800
----------------------------------------------------------------------
.../org/apache/storm/command/kill_workers.clj | 5 +-
.../apache/storm/daemon/local_supervisor.clj | 5 +-
.../storm/daemon/supervisor/ShutdownWork.java | 7 +-
.../daemon/supervisor/StandaloneSupervisor.java | 2 -
.../apache/storm/daemon/supervisor/State.java | 2 +-
.../storm/daemon/supervisor/Supervisor.java | 9 +-
.../storm/daemon/supervisor/SupervisorData.java | 112 ++++---------------
.../daemon/supervisor/SupervisorManger.java | 5 +-
.../daemon/supervisor/SupervisorUtils.java | 101 +++++++++++++++--
.../daemon/supervisor/SyncProcessEvent.java | 33 +++---
.../daemon/supervisor/SyncSupervisorEvent.java | 17 ++-
.../supervisor/timer/RunProfilerActions.java | 2 +-
.../supervisor/timer/SupervisorHealthCheck.java | 4 +-
.../supervisor/timer/SupervisorHeartbeat.java | 14 +--
.../daemon/supervisor/timer/UpdateBlobs.java | 5 +-
15 files changed, 168 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
index a7de176..4ddc993 100644
--- a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
+++ b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
@@ -28,7 +28,6 @@
conf (assoc conf STORM-LOCAL-DIR (. (File. (conf STORM-LOCAL-DIR)) getCanonicalPath))
isupervisor (StandaloneSupervisor.)
supervisor-data (SupervisorData. conf nil isupervisor)
- ids (SupervisorUtils/myWorkerIds conf)
- shut-workers (ShutdownWork.)]
+ ids (SupervisorUtils/supervisorWorkerIds conf)]
(doseq [id ids]
- (.shutWorker shut-workers supervisor-data id))))
+ (SupervisorUtils/shutWorker supervisor-data id))))
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
index 3dfed6f..70c280a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
@@ -14,7 +14,7 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.daemon.local-supervisor
- (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData ShutdownWork Supervisor]
+ (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData ShutdownWork Supervisor SupervisorUtils]
[org.apache.storm.utils Utils ConfigUtils]
[org.apache.storm ProcessSimulator])
(:use [org.apache.storm.daemon common]
@@ -38,9 +38,8 @@
))
(defn shutdown-local-worker [supervisorData workerId]
- (let [shut-workers (ShutdownWork.)]
(log-message "shutdown-local-worker")
- (.shutWorker shut-workers supervisorData workerId)))
+ (SupervisorUtils/shutWorker supervisorData workerId))
(defn local-process []
"Create a local process event"
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
index 5018ce1..ec69980 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
@@ -26,7 +26,6 @@ import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.IOException;
import java.util.*;
@@ -42,7 +41,7 @@ public class ShutdownWork implements Shutdownable {
Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
String user = ConfigUtils.getWorkerUser(conf, workerId);
- String threadPid = supervisorData.getWorkerThreadPidsAtom().get(workerId);
+ String threadPid = supervisorData.getWorkerThreadPids().get(workerId);
if (StringUtils.isNotBlank(threadPid)) {
ProcessSimulator.killProcess(threadPid);
}
@@ -53,7 +52,7 @@ public class ShutdownWork implements Shutdownable {
commands.add("signal");
commands.add(pid);
commands.add("15");
- String logPrefix = "kill - 15 " + pid;
+ String logPrefix = "kill -15 " + pid;
SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
} else {
Utils.killProcessWithSigTerm(pid);
@@ -71,7 +70,7 @@ public class ShutdownWork implements Shutdownable {
commands.add("signal");
commands.add(pid);
commands.add("9");
- String logPrefix = "kill - 9 " + pid;
+ String logPrefix = "kill -9 " + pid;
SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
} else {
Utils.forceKillProcess(pid);
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
index c13df8b..d4ce623 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
@@ -28,9 +28,7 @@ import java.util.Map;
import java.util.UUID;
public class StandaloneSupervisor implements ISupervisor {
-
private String supervisorId;
-
private Map conf;
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
index 1913c91..28dffd7 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
@@ -18,5 +18,5 @@
package org.apache.storm.daemon.supervisor;
public enum State {
- valid, disallowed, notStarted, timedOut;
+ VALID, DISALLOWED, NOT_STARTED, TIMED_OUT;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index 2c7810d..847b38d 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -140,7 +140,7 @@ public class Supervisor {
/**
* start distribute supervisor
*/
- private void launch() {
+ private void launch(ISupervisor iSupervisor) {
LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
SupervisorManger supervisorManager;
try {
@@ -148,11 +148,10 @@ public class Supervisor {
if (ConfigUtils.isLocalMode(conf)) {
throw new IllegalArgumentException("Cannot start server in local mode!");
}
- ISupervisor iSupervisor = new StandaloneSupervisor();
supervisorManager = mkSupervisor(conf, null, iSupervisor);
if (supervisorManager != null)
Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
- registerWorkerNumGauge("drpc:num-execute-http-requests", conf);
+ registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
startMetricsReporters(conf);
} catch (Exception e) {
LOG.error("Failed to start supervisor\n", e);
@@ -167,7 +166,7 @@ public class Supervisor {
metricRegistry.register(name, new Gauge<Integer>() {
@Override
public Integer getValue() {
- Collection<String> pids = SupervisorUtils.myWorkerIds(conf);
+ Collection<String> pids = SupervisorUtils.supervisorWorkerIds(conf);
return pids.size();
}
});
@@ -191,6 +190,6 @@ public class Supervisor {
public static void main(String[] args) {
Utils.setupDefaultUncaughtExceptionHandler();
Supervisor instance = new Supervisor();
- instance.launch();
+ instance.launch(new StandaloneSupervisor());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
index 039fe30..be39b4e 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
@@ -42,23 +42,25 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
public class SupervisorData {
private static final Logger LOG = LoggerFactory.getLogger(SupervisorData.class);
- private Map conf;
- private IContext sharedContext;
+ private final Map conf;
+ private final IContext sharedContext;
private volatile boolean active;
private ISupervisor iSupervisor;
private Utils.UptimeComputer upTime;
private String stormVersion;
- private ConcurrentHashMap<String, String> workerThreadPidsAtom; // for local mode
+ private ConcurrentHashMap<String, String> workerThreadPids; // for local mode
private IStormClusterState stormClusterState;
@@ -71,7 +73,7 @@ public class SupervisorData {
private String hostName;
// used for reporting used ports when heartbeating
- private ConcurrentHashMap<Long, LocalAssignment> currAssignment;
+ private AtomicReference<Map<Long, LocalAssignment>> currAssignment;
private StormTimer heartbeatTimer;
@@ -81,13 +83,13 @@ public class SupervisorData {
private Localizer localizer;
- private ConcurrentHashMap<String, Map<String, Object>> assignmentVersions;
+ private AtomicReference<Map<String, Map<String, Object>>> assignmentVersions;
private AtomicInteger syncRetry;
private final Object downloadLock = new Object();
- private ConcurrentHashMap<String, List<ProfileRequest>> stormIdToProfileActions;
+ private AtomicReference<Map<String, List<ProfileRequest>>> stormIdToProfileActions;
private CgroupManager resourceIsolationManager;
@@ -100,7 +102,7 @@ public class SupervisorData {
this.active = true;
this.upTime = Utils.makeUptimeComputer();
this.stormVersion = VersionInfo.getVersion();
- this.workerThreadPidsAtom = new ConcurrentHashMap<String, String>();
+ this.workerThreadPids = new ConcurrentHashMap<String, String>();
this.deadWorkers = new ConcurrentHashSet();
List<ACL> acls = null;
@@ -130,7 +132,7 @@ public class SupervisorData {
throw Utils.wrapInRuntime(e);
}
- this.currAssignment = new ConcurrentHashMap<>();
+ this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>());
this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
@@ -138,9 +140,9 @@ public class SupervisorData {
this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
- this.assignmentVersions = new ConcurrentHashMap<>();
+ this.assignmentVersions = new AtomicReference<Map<String, Map<String, Object>>>(new HashMap<String, Map<String, Object>>());
this.syncRetry = new AtomicInteger(0);
- this.stormIdToProfileActions = new ConcurrentHashMap<>();
+ this.stormIdToProfileActions = new AtomicReference<Map<String, List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>());
if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
try {
this.resourceIsolationManager = (CgroupManager) Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
@@ -154,31 +156,22 @@ public class SupervisorData {
}
}
- public ConcurrentHashMap<String, List<ProfileRequest>> getStormIdToProfileActions() {
+ public AtomicReference<Map<String, List<ProfileRequest>>> getStormIdToProfileActions() {
return stormIdToProfileActions;
}
public void setStormIdToProfileActions(Map<String, List<ProfileRequest>> stormIdToProfileActions) {
- this.stormIdToProfileActions.clear();
- this.stormIdToProfileActions.putAll(stormIdToProfileActions);
+ this.stormIdToProfileActions.set(stormIdToProfileActions);
}
public Map getConf() {
return conf;
}
- public void setConf(Map conf) {
- this.conf = conf;
- }
-
public IContext getSharedContext() {
return sharedContext;
}
- public void setSharedContext(IContext sharedContext) {
- this.sharedContext = sharedContext;
- }
-
public boolean isActive() {
return active;
}
@@ -191,107 +184,58 @@ public class SupervisorData {
return iSupervisor;
}
- public void setiSupervisor(ISupervisor iSupervisor) {
- this.iSupervisor = iSupervisor;
- }
-
public Utils.UptimeComputer getUpTime() {
return upTime;
}
- public void setUpTime(Utils.UptimeComputer upTime) {
- this.upTime = upTime;
- }
-
public String getStormVersion() {
return stormVersion;
}
- public void setStormVersion(String stormVersion) {
- this.stormVersion = stormVersion;
- }
-
- public ConcurrentHashMap<String, String> getWorkerThreadPidsAtom() {
- return workerThreadPidsAtom;
- }
-
- public void setWorkerThreadPidsAtom(ConcurrentHashMap<String, String> workerThreadPidsAtom) {
- this.workerThreadPidsAtom = workerThreadPidsAtom;
+ public ConcurrentHashMap<String, String> getWorkerThreadPids() {
+ return workerThreadPids;
}
public IStormClusterState getStormClusterState() {
return stormClusterState;
}
- public void setStormClusterState(IStormClusterState stormClusterState) {
- this.stormClusterState = stormClusterState;
- }
-
public LocalState getLocalState() {
return localState;
}
- public void setLocalState(LocalState localState) {
- this.localState = localState;
- }
-
public String getSupervisorId() {
return supervisorId;
}
- public void setSupervisorId(String supervisorId) {
- this.supervisorId = supervisorId;
- }
-
public String getAssignmentId() {
return assignmentId;
}
- public void setAssignmentId(String assignmentId) {
- this.assignmentId = assignmentId;
- }
-
public String getHostName() {
return hostName;
}
- public void setHostName(String hostName) {
- this.hostName = hostName;
- }
-
- public ConcurrentHashMap<Long, LocalAssignment> getCurrAssignment() {
+ public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
return currAssignment;
}
public void setCurrAssignment(Map<Long, LocalAssignment> currAssignment) {
- this.currAssignment.clear();
- this.currAssignment.putAll(currAssignment);
+ this.currAssignment.set(currAssignment);
}
public StormTimer getHeartbeatTimer() {
return heartbeatTimer;
}
- public void setHeartbeatTimer(StormTimer heartbeatTimer) {
- this.heartbeatTimer = heartbeatTimer;
- }
-
public StormTimer getEventTimer() {
return eventTimer;
}
- public void setEventTimer(StormTimer eventTimer) {
- this.eventTimer = eventTimer;
- }
-
public StormTimer getBlobUpdateTimer() {
return blobUpdateTimer;
}
- public void setBlobUpdateTimer(StormTimer blobUpdateTimer) {
- this.blobUpdateTimer = blobUpdateTimer;
- }
-
public Localizer getLocalizer() {
return localizer;
}
@@ -304,36 +248,20 @@ public class SupervisorData {
return syncRetry;
}
- public void setSyncRetry(AtomicInteger syncRetry) {
- this.syncRetry = syncRetry;
- }
-
- public ConcurrentHashMap<String, Map<String, Object>> getAssignmentVersions() {
+ public AtomicReference<Map<String, Map<String, Object>>> getAssignmentVersions() {
return assignmentVersions;
}
public void setAssignmentVersions(Map<String, Map<String, Object>> assignmentVersions) {
- this.assignmentVersions.clear();
- this.assignmentVersions.putAll(assignmentVersions);
+ this.assignmentVersions.set(assignmentVersions);
}
public CgroupManager getResourceIsolationManager() {
return resourceIsolationManager;
}
- public void setResourceIsolationManager(CgroupManager resourceIsolationManager) {
- this.resourceIsolationManager = resourceIsolationManager;
- }
-
- public Object getDownloadLock() {
- return downloadLock;
- }
-
public ConcurrentHashSet getDeadWorkers() {
return deadWorkers;
}
- public void setDeadWorkers(ConcurrentHashSet deadWorkers) {
- this.deadWorkers = deadWorkers;
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
index acc2cb8..6578529 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
-public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, DaemonCommon, Runnable {
+public class SupervisorManger implements SupervisorDaemon, DaemonCommon, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class);
@@ -41,7 +41,6 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon,
this.processesEventManager = processesEventManager;
}
- @Override
public void shutdown() {
LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId());
supervisorData.setActive(false);
@@ -63,7 +62,7 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon,
Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
try {
for (String workerId : workerIds) {
- shutWorker(supervisorData, workerId);
+ SupervisorUtils.shutWorker(supervisorData, workerId);
}
} catch (Exception e) {
LOG.error("shutWorker failed");
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 9d0b343..dd2a538 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -20,11 +20,13 @@ package org.apache.storm.daemon.supervisor;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.utils.PathUtils;
import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.localizer.LocalResource;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
@@ -68,6 +70,7 @@ public class SupervisorUtils {
commands.add(wl);
commands.add(user);
commands.addAll(args);
+ LOG.info("Running as user: {} command: {}", user, commands);
return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
}
@@ -115,7 +118,7 @@ public class SupervisorUtils {
* @param blobInfo
* @return
*/
- public static Boolean isShouldUncompressBlob(Map<String, Object> blobInfo) {
+ public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) {
return new Boolean((String) blobInfo.get("uncompress"));
}
@@ -129,7 +132,7 @@ public class SupervisorUtils {
List<LocalResource> localResourceList = new ArrayList<>();
if (blobstoreMap != null) {
for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) {
- LocalResource localResource = new LocalResource(map.getKey(), isShouldUncompressBlob(map.getValue()));
+ LocalResource localResource = new LocalResource(map.getKey(), shouldUncompressBlob(map.getValue()));
localResourceList.add(localResource);
}
}
@@ -169,7 +172,7 @@ public class SupervisorUtils {
return Utils.readDirContents(workerRoot);
}
- public static boolean checkTopoFilesExist(Map conf, String stormId) throws IOException {
+ public static boolean doRequiredTopoFilesExist(Map conf, String stormId) throws IOException {
String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
@@ -185,10 +188,6 @@ public class SupervisorUtils {
return false;
}
- public static Collection<String> myWorkerIds(Map conf){
- return Utils.readDirContents(ConfigUtils.workerRoot(conf));
- }
-
/**
* Returns map from worr id to heartbeat
*
@@ -263,11 +262,95 @@ public class SupervisorUtils {
return ret;
}
- public static List<ACL> supervisorZkAcls() {
- List<ACL> acls = new ArrayList<>();
+ public final static List<ACL> supervisorZkAcls() {
+ final List<ACL> acls = new ArrayList<>();
acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
return acls;
}
+ public static void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException {
+ LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), workerId);
+ Map conf = supervisorData.getConf();
+ Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
+ Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
+ Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+ String user = ConfigUtils.getWorkerUser(conf, workerId);
+ String threadPid = supervisorData.getWorkerThreadPids().get(workerId);
+ if (StringUtils.isNotBlank(threadPid)) {
+ ProcessSimulator.killProcess(threadPid);
+ }
+
+ for (String pid : pids) {
+ if (asUser) {
+ List<String> commands = new ArrayList<>();
+ commands.add("signal");
+ commands.add(pid);
+ commands.add("15");
+ String logPrefix = "kill -15 " + pid;
+ SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
+ } else {
+ Utils.killProcessWithSigTerm(pid);
+ }
+ }
+
+ if (pids.size() > 0) {
+ LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs);
+ Time.sleepSecs(shutdownSleepSecs);
+ }
+
+ for (String pid : pids) {
+ if (asUser) {
+ List<String> commands = new ArrayList<>();
+ commands.add("signal");
+ commands.add(pid);
+ commands.add("9");
+ String logPrefix = "kill -9 " + pid;
+ SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
+ } else {
+ Utils.forceKillProcess(pid);
+ }
+ String path = ConfigUtils.workerPidPath(conf, workerId, pid);
+ if (asUser) {
+ SupervisorUtils.rmrAsUser(conf, workerId, path);
+ } else {
+ try {
+ LOG.debug("Removing path {}", path);
+ new File(path).delete();
+ } catch (Exception e) {
+ // on Windows, the supervisor may still hold the lock on the worker directory
+ // ignore
+ }
+ }
+ }
+ tryCleanupWorker(conf, supervisorData, workerId);
+ LOG.info("Shut down {}:{}", supervisorData.getSupervisorId(), workerId);
+
+ }
+
+ public static void tryCleanupWorker(Map conf, SupervisorData supervisorData, String workerId) {
+ try {
+ String workerRoot = ConfigUtils.workerRoot(conf, workerId);
+ if (Utils.checkFileExists(workerRoot)) {
+ if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
+ } else {
+ Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
+ Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
+ Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId));
+ Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
+ }
+ ConfigUtils.removeWorkerUserWSE(conf, workerId);
+ supervisorData.getDeadWorkers().remove(workerId);
+ }
+ if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)){
+ supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
index 172d223..cf26896 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@ -45,7 +45,7 @@ import java.util.*;
* ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for workers
* launch
*/
-public class SyncProcessEvent extends ShutdownWork implements Runnable {
+public class SyncProcessEvent implements Runnable {
private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
@@ -53,6 +53,8 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
private SupervisorData supervisorData;
+ public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1);
+
private class ProcessExitCallback implements Utils.ExitCodeCallable {
private final String logPrefix;
private final String workerId;
@@ -113,7 +115,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
Set<Integer> keepPorts = new HashSet<>();
for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
StateHeartbeat stateHeartbeat = entry.getValue();
- if (stateHeartbeat.getState() == State.valid) {
+ if (stateHeartbeat.getState() == State.VALID) {
keeperWorkerIds.add(entry.getKey());
keepPorts.add(stateHeartbeat.getHeartbeat().get_port());
}
@@ -129,7 +131,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
StateHeartbeat stateHeartbeat = entry.getValue();
- if (stateHeartbeat.getState() != State.valid) {
+ if (stateHeartbeat.getState() != State.VALID) {
LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now,
stateHeartbeat.getState(), stateHeartbeat.getHeartbeat());
shutWorker(supervisorData, entry.getKey());
@@ -180,9 +182,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
}
return reassignExecutors;
}
-
-
-
+
/**
* Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead
*
@@ -205,16 +205,16 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
LSWorkerHeartbeat whb = entry.getValue();
State state;
if (whb == null) {
- state = State.notStarted;
+ state = State.NOT_STARTED;
} else if (!approvedIds.contains(workerId) || !matchesAssignment(whb, assignedExecutors)) {
- state = State.disallowed;
+ state = State.DISALLOWED;
} else if (supervisorData.getDeadWorkers().contains(workerId)) {
- LOG.info("Worker Process {}as died", workerId);
- state = State.timedOut;
+ LOG.info("Worker Process {} has died", workerId);
+ state = State.TIMED_OUT;
} else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) {
- state = State.timedOut;
+ state = State.TIMED_OUT;
} else {
- state = State.valid;
+ state = State.VALID;
}
LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now);
workerIdHbstate.put(workerId, new StateHeartbeat(state, whb));
@@ -230,7 +230,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
List<ExecutorInfo> executorInfos = new ArrayList<>();
executorInfos.addAll(whb.get_executors());
// remove SYSTEM_EXECUTOR_ID
- executorInfos.remove(new ExecutorInfo(-1, -1));
+ executorInfos.remove(SYSTEM_EXECUTOR_INFO);
List<ExecutorInfo> localExecuorInfos = localAssignment.get_executors();
if (localExecuorInfos.size() != executorInfos.size())
@@ -518,7 +518,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
WorkerResources resources = assignment.get_resources();
// This condition checks for required files exist before launching the worker
- if (SupervisorUtils.checkTopoFilesExist(conf, stormId)) {
+ if (SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) {
String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId);
String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId);
@@ -666,4 +666,9 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
Utils.createSymlink(workerRoot, stormRoot, fileName, fileName);
}
}
+
+ //for supervisor-test
+ public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException{
+ SupervisorUtils.shutWorker(supervisorData, workerId);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
index 29aad12..e96395f 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
@@ -75,7 +75,7 @@ public class SyncSupervisorEvent implements Runnable {
Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager);
List<String> stormIds = stormClusterState.assignments(syncCallback);
Map<String, Map<String, Object>> assignmentsSnapshot =
- getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions(), syncCallback);
+ getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback);
Map<String, List<ProfileRequest>> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds);
Set<String> allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf);
@@ -191,7 +191,7 @@ public class SyncSupervisorEvent implements Runnable {
for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()) {
String workerId = entry.getKey();
StateHeartbeat stateHeartbeat = entry.getValue();
- if (stateHeartbeat != null && stateHeartbeat.getState() == State.valid) {
+ if (stateHeartbeat != null && stateHeartbeat.getState() == State.VALID) {
vaildPortToWorkerIds.put(stateHeartbeat.getHeartbeat().get_port(), workerId);
}
}
@@ -277,7 +277,7 @@ public class SyncSupervisorEvent implements Runnable {
for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
String key = entry.getKey();
Map<String, Object> blobInfo = entry.getValue();
- localizer.removeBlobReference(key, user, topoName, SupervisorUtils.isShouldUncompressBlob(blobInfo));
+ localizer.removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo));
}
}
}
@@ -312,7 +312,7 @@ public class SyncSupervisorEvent implements Runnable {
Set<String> srashStormIds = new HashSet<>();
for (String stormId : allDownloadedTopologyIds) {
if (assignedStormIds.contains(stormId)) {
- if (!SupervisorUtils.checkTopoFilesExist(conf, stormId)) {
+ if (!SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) {
LOG.debug("Files not present in topology directory");
rmTopoFiles(conf, stormId, localizer, false);
srashStormIds.add(stormId);
@@ -357,7 +357,12 @@ public class SyncSupervisorEvent implements Runnable {
blobStore.shutdown();
}
- FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+ try {
+ FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+ }catch (Exception e){
+ ;
+ }
+
SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
@@ -627,7 +632,7 @@ public class SyncSupervisorEvent implements Runnable {
for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()){
String workerId = entry.getKey();
StateHeartbeat stateHeartbeat = entry.getValue();
- if (stateHeartbeat.getState() == State.disallowed){
+ if (stateHeartbeat.getState() == State.DISALLOWED){
syncProcesses.shutWorker(supervisorData, workerId);
LOG.debug("{}'s state disallowed, so shutdown this worker");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
index 91044cc..d39a679 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@ -84,7 +84,7 @@ public class RunProfilerActions implements Runnable {
@Override
public void run() {
- Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions();
+ Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions().get();
try {
for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
String stormId = entry.getKey();
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index 36ee6b6..49f48ef 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
-public class SupervisorHealthCheck extends ShutdownWork implements Runnable {
+public class SupervisorHealthCheck implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class);
@@ -47,7 +47,7 @@ public class SupervisorHealthCheck extends ShutdownWork implements Runnable {
if (healthCode != 0) {
for (String workerId : workerIds) {
try {
- shutWorker(supervisorData, workerId);
+ SupervisorUtils.shutWorker(supervisorData, workerId);
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index e158dbc..4137e94 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -31,12 +31,10 @@ import java.util.Map;
public class SupervisorHeartbeat implements Runnable {
- private IStormClusterState stormClusterState;
- private String supervisorId;
- private Map conf;
- private SupervisorInfo supervisorInfo;
-
- private SupervisorData supervisorData;
+ private final IStormClusterState stormClusterState;
+ private final String supervisorId;
+ private final Map conf;
+ private final SupervisorData supervisorData;
public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
this.stormClusterState = supervisorData.getStormClusterState();
@@ -46,13 +44,13 @@ public class SupervisorHeartbeat implements Runnable {
}
private SupervisorInfo update(Map conf, SupervisorData supervisorData) {
- supervisorInfo = new SupervisorInfo();
+ SupervisorInfo supervisorInfo = new SupervisorInfo();
supervisorInfo.set_time_secs(Time.currentTimeSecs());
supervisorInfo.set_hostname(supervisorData.getHostName());
supervisorInfo.set_assignment_id(supervisorData.getAssignmentId());
List<Long> usedPorts = new ArrayList<>();
- usedPorts.addAll(supervisorData.getCurrAssignment().keySet());
+ usedPorts.addAll(supervisorData.getCurrAssignment().get().keySet());
supervisorInfo.set_used_ports(usedPorts);
List metaDatas = (List)supervisorData.getiSupervisor().getMetadata();
List<Long> portList = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
index 623afa5..ebb1d5f 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
/**
* downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The
@@ -58,9 +59,9 @@ public class UpdateBlobs implements Runnable {
try {
Map conf = supervisorData.getConf();
Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
- ConcurrentHashMap<Long, LocalAssignment> newAssignment = supervisorData.getCurrAssignment();
+ AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisorData.getCurrAssignment();
Set<String> assignedStormIds = new HashSet<>();
- for (LocalAssignment localAssignment : newAssignment.values()) {
+ for (LocalAssignment localAssignment : newAssignment.get().values()) {
assignedStormIds.add(localAssignment.get_topology_id());
}
for (String stormId : downloadedStormIds) {