You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:25 UTC
[02/35] storm git commit: port Supervisor to java
port Supervisor to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/08934e29
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/08934e29
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/08934e29
Branch: refs/heads/master
Commit: 08934e29982d3936c9e247a8d7bac563053f869f
Parents: 73312ad
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Feb 26 12:38:23 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Feb 26 12:38:23 2016 +0800
----------------------------------------------------------------------
.../storm/daemon/supervisor/DaemonCommon.java | 22 +
.../DefaultUncaughtExceptionHandler.java | 31 +
.../supervisor/EventManagerPushCallback.java | 37 +
.../daemon/supervisor/RunProfilerActions.java | 221 ++++++
.../storm/daemon/supervisor/ShutdownWork.java | 125 ++++
.../daemon/supervisor/StandaloneSupervisor.java | 82 +++
.../apache/storm/daemon/supervisor/State.java | 22 +
.../storm/daemon/supervisor/StateHeartbeat.java | 45 ++
.../daemon/supervisor/SupervisorDaemon.java | 28 +
.../storm/daemon/supervisor/SupervisorData.java | 340 ++++++++++
.../daemon/supervisor/SupervisorHeartbeat.java | 84 +++
.../daemon/supervisor/SupervisorManger.java | 101 +++
.../daemon/supervisor/SupervisorServer.java | 212 ++++++
.../daemon/supervisor/SupervisorUtils.java | 173 +++++
.../daemon/supervisor/SyncProcessEvent.java | 674 +++++++++++++++++++
.../daemon/supervisor/SyncSupervisorEvent.java | 592 ++++++++++++++++
.../storm/daemon/supervisor/UpdateBlobs.java | 103 +++
17 files changed, 2892 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java
new file mode 100644
index 0000000..3b7a18e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+public interface DaemonCommon { // single-method contract: lets a caller ask whether the implementer is "waiting"
+ boolean isWaiting(); // NOTE(review): what counts as "waiting" is defined by implementers not visible here — confirm
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java
new file mode 100644
index 0000000..8785f86
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { // logs the throwable, then exits the process
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultUncaughtExceptionHandler.class);
+ @Override
+ public void uncaughtException(Thread t, Throwable e) { // t is not used; only the throwable is reported
+ LOG.error("Error when processing event", e);
+ Utils.exitProcess(20, "Error when processing an event"); // always exit code 20 with a fixed message
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java
new file mode 100644
index 0000000..177bf67
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.event.EventManager;
+
+/** Runnable that enqueues a fixed callback on an {@link EventManager}; fields are final for safe publication. */
+public class EventManagerPushCallback implements Runnable {
+    private final EventManager eventManager;
+    private final Runnable cb;
+
+    public EventManagerPushCallback(Runnable cb, EventManager eventManager) {
+        this.eventManager = eventManager;
+        this.cb = cb;
+    }
+
+    /** Adds the wrapped callback to the event manager; the callback is not invoked directly here. */
+    @Override
+    public void run() {
+        eventManager.add(cb);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java
new file mode 100644
index 0000000..209c067
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+
+public class RunProfilerActions implements Runnable {
+ private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class);
+
+ private Map conf;
+ private IStormClusterState stormClusterState;
+ private String hostName;
+ private String stormHome;
+
+ private String profileCmd;
+
+ private SupervisorData supervisorData;
+
+ private class ActionExitCallback implements Utils.ExitCodeCallable {
+ private String stormId;
+ private ProfileRequest profileRequest;
+ private String logPrefix;
+
+ public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) {
+ this.stormId = stormId;
+ this.profileRequest = profileRequest;
+ this.logPrefix = logPrefix;
+ }
+
+ @Override
+ public Object call() throws Exception {
+ return null;
+ }
+
+ @Override
+ public Object call(int exitCode) {
+ LOG.info("{} profile-action exited for {}", logPrefix, exitCode);
+ try {
+ stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
+ } catch (Exception e) {
+ LOG.warn("failed delete profileRequest: " + profileRequest);
+ }
+ return null;
+ }
+ }
+
+ public RunProfilerActions(SupervisorData supervisorData) {
+ this.conf = supervisorData.getConf();
+ this.stormClusterState = supervisorData.getStormClusterState();
+ this.hostName = supervisorData.getHostName();
+ this.stormHome = System.getProperty("storm.home");
+ this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND));
+ this.supervisorData = supervisorData;
+ }
+
+ @Override
+ public void run() {
+ Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions();
+ try {
+ for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
+ String stormId = entry.getKey();
+ List<ProfileRequest> requests = entry.getValue();
+ if (requests != null) {
+ for (ProfileRequest profileRequest : requests) {
+ if (profileRequest.get_nodeInfo().get_node().equals(hostName)) {
+ boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false;
+ Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
+ String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
+ Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+ String user = null;
+ if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
+ user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
+ }
+ Map<String, String> env = null;
+ if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) {
+ env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+ } else {
+ env = new HashMap<String, String>();
+ }
+
+ String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
+ StringBuilder stringBuilder = new StringBuilder();
+ FileReader reader = null;
+ BufferedReader br = null;
+ try {
+ reader = new FileReader(str);
+ br = new BufferedReader(reader);
+ int c;
+ while ((c = br.read()) >= 0) {
+ stringBuilder.append(c);
+ }
+ } catch (IOException e) {
+ if (reader != null)
+ reader.close();
+ if (br != null)
+ br.close();
+ }
+ String workerPid = stringBuilder.toString().trim();
+ ProfileAction profileAction = profileRequest.get_action();
+ String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
+
+ // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
+ // The profiler plugin script validates if JVM is recording before starting another recording.
+ String command = mkCommand(profileAction, stop, workerPid, targetDir);
+ List<String> listCommand = new ArrayList<>();
+ if (command != null) {
+ listCommand.addAll(Arrays.asList(command.split(" ")));
+ }
+ try {
+ ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix);
+ launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix);
+ } catch (IOException e) {
+ LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
+ } catch (RuntimeException e) {
+ LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error running profiler actions, will retry again later");
+ }
+ }
+
+ private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
+ final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
+ File targetFile = new File(targetDir);
+ if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ LOG.info("Running as user:{} command:{}", user, commands);
+ String containerFile = Utils.containerFilePath(targetDir);
+ if (Utils.checkFileExists(containerFile)) {
+ SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
+ }
+ String scriptFile = Utils.scriptFilePath(targetDir);
+ if (Utils.checkFileExists(scriptFile)) {
+ SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
+ }
+ String script = Utils.writeScript(targetDir, commands, environment);
+ List<String> newCommands = new ArrayList<>();
+ newCommands.add("profiler");
+ newCommands.add(targetDir);
+ newCommands.add(script);
+ SupervisorUtils.workerLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile);
+ } else {
+ Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
+ }
+ }
+
+ private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
+ if (action == ProfileAction.JMAP_DUMP) {
+ return jmapDumpCmd(workerPid, targetDir);
+ } else if (action == ProfileAction.JSTACK_DUMP) {
+ return jstackDumpCmd(workerPid, targetDir);
+ } else if (action == ProfileAction.JPROFILE_DUMP) {
+ return jprofileDump(workerPid, targetDir);
+ } else if (action == ProfileAction.JVM_RESTART) {
+ return jprofileJvmRestart(workerPid);
+ } else if (!stop && action == ProfileAction.JPROFILE_STOP) {
+ return jprofileStart(workerPid);
+ } else if (stop && action == ProfileAction.JPROFILE_STOP) {
+ return jprofileStop(workerPid, targetDir);
+ }
+ return null;
+ }
+
+ private String jmapDumpCmd(String pid, String targetDir) {
+ return profileCmd + " " + pid + " jmap " + targetDir;
+ }
+
+ private String jstackDumpCmd(String pid, String targetDir) {
+ return profileCmd + " " + pid + " jstack " + targetDir;
+ }
+
+ private String jprofileStart(String pid) {
+ return profileCmd + " " + pid + " start";
+ }
+
+ private String jprofileStop(String pid, String targetDir) {
+ return profileCmd + " " + pid + " stop " + targetDir;
+ }
+
+ private String jprofileDump(String pid, String targetDir) {
+ return profileCmd + " " + pid + " dump " + targetDir;
+ }
+
+ private String jprofileJvmRestart(String pid) {
+ return profileCmd + " " + pid + " kill";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
new file mode 100644
index 0000000..674454b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+public abstract class ShutdownWork implements Shutdownable {
+
+ private static Logger LOG = LoggerFactory.getLogger(ShutdownWork.class);
+
+ public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException {
+
+ LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), workerId);
+ Map conf = supervisorData.getConf();
+ Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
+ Integer shutdownSleepSecs = (Integer) conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS);
+ Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+ String user = ConfigUtils.getWorkerUser(conf, workerId);
+ String threadPid = supervisorData.getWorkerThreadPidsAtom().get(workerId);
+ if (StringUtils.isNotBlank(threadPid)) {
+ ProcessSimulator.killProcess(threadPid);
+ }
+
+ for (String pid : pids) {
+ if (asUser) {
+ List<String> commands = new ArrayList<>();
+ commands.add("signal");
+ commands.add(pid);
+ commands.add("15");
+ String logPrefix = "kill - 15 " + pid;
+ SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
+ } else {
+ Utils.killProcessWithSigTerm(pid);
+ }
+ }
+
+ if (pids.size() > 0) {
+ LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs);
+ Time.sleepSecs(shutdownSleepSecs);
+ }
+
+ for (String pid : pids) {
+ if (asUser) {
+ List<String> commands = new ArrayList<>();
+ commands.add("signal");
+ commands.add(pid);
+ commands.add("9");
+ String logPrefix = "kill - 9 " + pid;
+ SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix);
+ } else {
+ Utils.forceKillProcess(pid);
+ }
+ String path = ConfigUtils.workerPidPath(conf, workerId, pid);
+ if (asUser) {
+ SupervisorUtils.rmrAsUser(conf, workerId, path);
+ } else {
+ try {
+ LOG.debug("Removing path {}", path);
+ new File(path).delete();
+ } catch (Exception e) {
+ // on windows, the supervisor may still holds the lock on the worker directory
+ // ignore
+ }
+ }
+ }
+ tryCleanupWorker(conf, supervisorData, workerId);
+ LOG.info("Shut down {}:{}", supervisorData.getSupervisorId(), workerId);
+
+ }
+
+ protected void tryCleanupWorker(Map conf, SupervisorData supervisorData, String workerId) {
+ try {
+ String workerRoot = ConfigUtils.workerRoot(conf, workerId);
+ if (Utils.checkFileExists(workerRoot)) {
+ if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
+ } else {
+ Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
+ Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
+ Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
+ }
+ ConfigUtils.removeWorkerUserWSE(conf, workerId);
+ supervisorData.getDeadWorkers().remove(workerId);
+ }
+ if (conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE) != null) {
+ supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId);
+ }
+ } catch (IOException e) {
+ LOG.warn("{} Failed to cleanup worker {}. Will retry later", e, workerId);
+ } catch (RuntimeException e) {
+ LOG.warn("{} Failed to cleanup worker {}. Will retry later", e, workerId);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
new file mode 100644
index 0000000..da54b88
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
+/** {@link ISupervisor} that keeps a persisted supervisor id and answers every assignment query affirmatively. */
+public class StandaloneSupervisor implements ISupervisor {
+    private String supervisorId;
+    private Map conf;
+
+    /** Loads the supervisor id from the local state in {@code schedulerLocalDir}, creating and saving a UUID if absent. */
+    @Override
+    public void prepare(Map stormConf, String schedulerLocalDir) {
+        try {
+            LocalState state = new LocalState(schedulerLocalDir);
+            String id = state.getSupervisorId();
+            if (id == null) {
+                id = UUID.randomUUID().toString();
+                state.setSupervisorId(id);
+            }
+            conf = stormConf;
+            supervisorId = id;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public String getSupervisorId() {
+        return supervisorId;
+    }
+
+    /** The assignment id is the supervisor id itself. */
+    @Override
+    public String getAssignmentId() {
+        return supervisorId;
+    }
+
+    /** Returns the raw {@code Config.SUPERVISOR_SLOTS_PORTS} value; it is a vector the caller must convert to ints. */
+    @Override
+    public Object getMetadata() {
+        return conf.get(Config.SUPERVISOR_SLOTS_PORTS);
+    }
+
+    @Override
+    public boolean confirmAssigned(int port) {
+        return true;
+    }
+
+    @Override
+    public void killedWorker(int port) {
+        // no-op in this implementation
+    }
+
+    @Override
+    public void assigned(Collection<Integer> ports) {
+        // no-op in this implementation
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
new file mode 100644
index 0000000..1913c91
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+public enum State { // state tag that StateHeartbeat pairs with an LSWorkerHeartbeat
+ valid, disallowed, notStarted, timedOut; // NOTE(review): each constant's meaning is assigned by callers outside this chunk — confirm
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
new file mode 100644
index 0000000..cca3fa2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+
+/** Holder pairing a {@link State} with an {@link LSWorkerHeartbeat}; both references are fixed at construction. */
+public class StateHeartbeat {
+    private final State state;
+    private final LSWorkerHeartbeat hb;
+    public StateHeartbeat(State state, LSWorkerHeartbeat hb) {
+        this.state = state;
+        this.hb = hb;
+    }
+
+    public State getState() {
+        return this.state;
+    }
+
+    public LSWorkerHeartbeat getHeartbeat() {
+        return this.hb;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java
new file mode 100644
index 0000000..115c7c6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.Map;
+
+public interface SupervisorDaemon { // exposes an id, a configuration map and a shutdown-all-workers operation
+ String getId();
+ // NOTE(review): presumably the configuration map of the supervisor — confirm against implementers
+ Map getConf();
+ // NOTE(review): name suggests it stops every worker of this supervisor; actual behaviour is defined by implementers
+ void shutdownAllWorkers();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
new file mode 100644
index 0000000..9eec253
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Mutable bag of state shared by the supervisor's timers, event handlers and shutdown logic:
+ * configuration, cluster/local state handles, identity, timers, the localizer and the
+ * bookkeeping maps used while syncing assignments.
+ *
+ * <p>The constructor wires everything up eagerly; any failure while doing so is rethrown as a
+ * runtime exception via {@code Utils.wrapInRuntime}.
+ */
+public class SupervisorData {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorData.class);
+
+    private Map conf;
+    private IContext sharedContext;
+    // volatile: flipped by setActive() and polled by isActive(), potentially from different threads
+    private volatile boolean active;
+    private ISupervisor iSupervisor;
+    private Utils.UptimeComputer upTime;
+    private String stormVersion;
+
+    private ConcurrentHashMap<String, String> workerThreadPidsAtom; // for local mode
+
+    private IStormClusterState stormClusterState;
+
+    private LocalState localState;
+
+    private String supervisorId;
+
+    private String assignmentId;
+
+    private String hostName;
+
+    // used for reporting used ports when heartbeating
+    private ConcurrentHashMap<Long, LocalAssignment> currAssignment;
+
+    private StormTimer heartbeatTimer;
+
+    private StormTimer eventTimer;
+
+    private StormTimer blobUpdateTimer;
+
+    private Localizer localizer;
+
+    private ConcurrentHashMap<String, Map<String, Object>> assignmentVersions;
+
+    private AtomicInteger syncRetry;
+
+    private final Object downloadLock = new Object();
+
+    private ConcurrentHashMap<String, List<ProfileRequest>> stormIdToProfileActions;
+
+    private CgroupManager resourceIsolationManager;
+
+    private ConcurrentHashSet<String> deadWorkers;
+
+    /**
+     * Builds the supervisor state: connects to the cluster state store, opens the local state and
+     * localizer, resolves the host name, creates the three timers and, when enabled in the
+     * configuration, instantiates and prepares the resource isolation plugin.
+     *
+     * @param conf          supervisor configuration
+     * @param sharedContext messaging context stored for later retrieval (may be null; the launchers in this package pass null)
+     * @param iSupervisor   supplies the supervisor id and assignment id
+     * @throws RuntimeException wrapping any failure raised while creating the cluster state,
+     *                          local state, localizer, host name or isolation plugin
+     */
+    public SupervisorData(Map conf, IContext sharedContext, ISupervisor iSupervisor) {
+        this.conf = conf;
+        this.sharedContext = sharedContext;
+        this.iSupervisor = iSupervisor;
+        this.active = true;
+        this.upTime = Utils.makeUptimeComputer();
+        this.stormVersion = VersionInfo.getVersion();
+        this.workerThreadPidsAtom = new ConcurrentHashMap<>();
+        this.deadWorkers = new ConcurrentHashSet<>();
+
+        // ACLs are only supplied when ZooKeeper authentication is configured; otherwise null is passed through.
+        List<ACL> acls = null;
+        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
+            acls = new ArrayList<>();
+            acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
+            // Permission constants are bit flags, so they are combined with bitwise OR.
+            acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+        }
+        try {
+            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR));
+        } catch (Exception e) {
+            // Pass the exception to the logger so the cause shows up next to the message.
+            LOG.error("supervisor can't create stormClusterState", e);
+            throw Utils.wrapInRuntime(e);
+        }
+
+        try {
+            this.localState = ConfigUtils.supervisorState(conf);
+            this.localizer = Utils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
+        } catch (IOException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+        this.supervisorId = iSupervisor.getSupervisorId();
+        this.assignmentId = iSupervisor.getAssignmentId();
+
+        try {
+            this.hostName = Utils.hostname(conf);
+        } catch (UnknownHostException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+
+        this.currAssignment = new ConcurrentHashMap<>();
+
+        this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+
+        this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+
+        this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
+
+        this.assignmentVersions = new ConcurrentHashMap<>();
+        this.syncRetry = new AtomicInteger(0);
+        this.stormIdToProfileActions = new ConcurrentHashMap<>();
+        if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
+            try {
+                this.resourceIsolationManager = (CgroupManager) Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
+                this.resourceIsolationManager.prepare(conf);
+                LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
+            } catch (IOException e) {
+                throw Utils.wrapInRuntime(e);
+            }
+        } else {
+            this.resourceIsolationManager = null;
+        }
+    }
+
+    /** @return the live map of profile requests keyed by storm id (not a copy) */
+    public ConcurrentHashMap<String, List<ProfileRequest>> getStormIdToProfileActions() {
+        return stormIdToProfileActions;
+    }
+
+    /**
+     * Replaces the contents of the profile-action map in place. The clear and the refill are two
+     * separate steps, so a concurrent reader may briefly observe an empty or partial map.
+     */
+    public void setStormIdToProfileActions(Map<String, List<ProfileRequest>> stormIdToProfileActions) {
+        this.stormIdToProfileActions.clear();
+        this.stormIdToProfileActions.putAll(stormIdToProfileActions);
+    }
+
+    public Map getConf() {
+        return conf;
+    }
+
+    public void setConf(Map conf) {
+        this.conf = conf;
+    }
+
+    public IContext getSharedContext() {
+        return sharedContext;
+    }
+
+    public void setSharedContext(IContext sharedContext) {
+        this.sharedContext = sharedContext;
+    }
+
+    /** @return true from construction until {@link #setActive(boolean)} is called with false */
+    public boolean isActive() {
+        return active;
+    }
+
+    public void setActive(boolean active) {
+        this.active = active;
+    }
+
+    public ISupervisor getiSupervisor() {
+        return iSupervisor;
+    }
+
+    public void setiSupervisor(ISupervisor iSupervisor) {
+        this.iSupervisor = iSupervisor;
+    }
+
+    public Utils.UptimeComputer getUpTime() {
+        return upTime;
+    }
+
+    public void setUpTime(Utils.UptimeComputer upTime) {
+        this.upTime = upTime;
+    }
+
+    public String getStormVersion() {
+        return stormVersion;
+    }
+
+    public void setStormVersion(String stormVersion) {
+        this.stormVersion = stormVersion;
+    }
+
+    public ConcurrentHashMap<String, String> getWorkerThreadPidsAtom() {
+        return workerThreadPidsAtom;
+    }
+
+    public void setWorkerThreadPidsAtom(ConcurrentHashMap<String, String> workerThreadPidsAtom) {
+        this.workerThreadPidsAtom = workerThreadPidsAtom;
+    }
+
+    public IStormClusterState getStormClusterState() {
+        return stormClusterState;
+    }
+
+    public void setStormClusterState(IStormClusterState stormClusterState) {
+        this.stormClusterState = stormClusterState;
+    }
+
+    public LocalState getLocalState() {
+        return localState;
+    }
+
+    public void setLocalState(LocalState localState) {
+        this.localState = localState;
+    }
+
+    public String getSupervisorId() {
+        return supervisorId;
+    }
+
+    public void setSupervisorId(String supervisorId) {
+        this.supervisorId = supervisorId;
+    }
+
+    public String getAssignmentId() {
+        return assignmentId;
+    }
+
+    public void setAssignmentId(String assignmentId) {
+        this.assignmentId = assignmentId;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public void setHostName(String hostName) {
+        this.hostName = hostName;
+    }
+
+    /** @return the live map of current assignments keyed by port (not a copy) */
+    public ConcurrentHashMap<Long, LocalAssignment> getCurrAssignment() {
+        return currAssignment;
+    }
+
+    /**
+     * Replaces the contents of the current-assignment map in place. The clear and the refill are
+     * two separate steps, so a concurrent reader may briefly observe an empty or partial map.
+     */
+    public void setCurrAssignment(Map<Long, LocalAssignment> currAssignment) {
+        this.currAssignment.clear();
+        this.currAssignment.putAll(currAssignment);
+    }
+
+    public StormTimer getHeartbeatTimer() {
+        return heartbeatTimer;
+    }
+
+    public void setHeartbeatTimer(StormTimer heartbeatTimer) {
+        this.heartbeatTimer = heartbeatTimer;
+    }
+
+    public StormTimer getEventTimer() {
+        return eventTimer;
+    }
+
+    public void setEventTimer(StormTimer eventTimer) {
+        this.eventTimer = eventTimer;
+    }
+
+    public StormTimer getBlobUpdateTimer() {
+        return blobUpdateTimer;
+    }
+
+    public void setBlobUpdateTimer(StormTimer blobUpdateTimer) {
+        this.blobUpdateTimer = blobUpdateTimer;
+    }
+
+    public Localizer getLocalizer() {
+        return localizer;
+    }
+
+    public void setLocalizer(Localizer localizer) {
+        this.localizer = localizer;
+    }
+
+    public AtomicInteger getSyncRetry() {
+        return syncRetry;
+    }
+
+    public void setSyncRetry(AtomicInteger syncRetry) {
+        this.syncRetry = syncRetry;
+    }
+
+    /** @return the live assignment-version map (not a copy) */
+    public ConcurrentHashMap<String, Map<String, Object>> getAssignmentVersions() {
+        return assignmentVersions;
+    }
+
+    /**
+     * Replaces the contents of the assignment-version map in place. The clear and the refill are
+     * two separate steps, so a concurrent reader may briefly observe an empty or partial map.
+     */
+    public void setAssignmentVersions(Map<String, Map<String, Object>> assignmentVersions) {
+        this.assignmentVersions.clear();
+        this.assignmentVersions.putAll(assignmentVersions);
+    }
+
+    /** @return the resource isolation plugin, or null when the plugin is not enabled in the configuration */
+    public CgroupManager getResourceIsolationManager() {
+        return resourceIsolationManager;
+    }
+
+    public void setResourceIsolationManager(CgroupManager resourceIsolationManager) {
+        this.resourceIsolationManager = resourceIsolationManager;
+    }
+
+    /** @return the single lock object created with this instance; it is never replaced */
+    public Object getDownloadLock() {
+        return downloadLock;
+    }
+
+    // The raw type is kept in the accessor signatures so existing callers compile unchanged.
+    public ConcurrentHashSet getDeadWorkers() {
+        return deadWorkers;
+    }
+
+    public void setDeadWorkers(ConcurrentHashSet deadWorkers) {
+        this.deadWorkers = deadWorkers;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java
new file mode 100644
index 0000000..399dcd2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Runnable that builds a fresh {@link SupervisorInfo} snapshot from {@link SupervisorData} and
+ * the configuration, and publishes it through the cluster state as this supervisor's heartbeat.
+ */
+public class SupervisorHeartbeat implements Runnable {
+
+    private final IStormClusterState stormClusterState;
+    private final String supervisorId;
+    private final Map conf;
+    private final SupervisorData supervisorData;
+
+    /**
+     * @param conf           supervisor configuration; must contain numeric values for
+     *                       {@code Config.SUPERVISOR_MEMORY_CAPACITY_MB} and {@code Config.SUPERVISOR_CPU_CAPACITY}
+     * @param supervisorData shared supervisor state the heartbeat is derived from
+     */
+    public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
+        this.stormClusterState = supervisorData.getStormClusterState();
+        this.supervisorId = supervisorData.getSupervisorId();
+        this.supervisorData = supervisorData;
+        this.conf = conf;
+    }
+
+    /**
+     * Assembles the heartbeat payload: timestamp, host, assignment id, used ports, port metadata,
+     * scheduler metadata, uptime, version and resource capacities.
+     */
+    @SuppressWarnings("unchecked") // conf and ISupervisor metadata are untyped; element types are checked on use
+    private SupervisorInfo update(Map conf, SupervisorData supervisorData) {
+        // Built as a local so no half-populated instance is ever shared between runs.
+        SupervisorInfo info = new SupervisorInfo();
+        info.set_time_secs(Time.currentTimeSecs());
+        info.set_hostname(supervisorData.getHostName());
+        info.set_assignment_id(supervisorData.getAssignmentId());
+
+        List<Long> usedPorts = new ArrayList<>(supervisorData.getCurrAssignment().keySet());
+        info.set_used_ports(usedPorts);
+
+        List<Long> portList = new ArrayList<>();
+        Object metas = supervisorData.getiSupervisor().getMetadata();
+        if (metas != null) {
+            // Accept any integral boxed type (Integer, Long, ...) rather than failing on non-Integer ports.
+            for (Number port : (List<? extends Number>) metas) {
+                portList.add(port.longValue());
+            }
+        }
+        info.set_meta(portList);
+        info.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
+        info.set_uptime_secs(supervisorData.getUpTime().upTime());
+        info.set_version(supervisorData.getStormVersion());
+        info.set_resources_map(mkSupervisorCapacities(conf));
+        return info;
+    }
+
+    /**
+     * Reads the supervisor's memory and CPU capacities from the configuration.
+     *
+     * @return map from the two capacity config keys to their values as doubles
+     * @throws IllegalArgumentException if either value is missing or not a number
+     */
+    private Map<String, Double> mkSupervisorCapacities(Map conf) {
+        Map<String, Double> ret = new HashMap<String, Double>();
+        ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, requireDouble(conf, Config.SUPERVISOR_MEMORY_CAPACITY_MB));
+        ret.put(Config.SUPERVISOR_CPU_CAPACITY, requireDouble(conf, Config.SUPERVISOR_CPU_CAPACITY));
+        return ret;
+    }
+
+    /**
+     * Converts a numeric config value to a double. Casting an Object straight to double only works
+     * when the runtime type is exactly Double, so a capacity configured as an integer would fail
+     * with ClassCastException; going through Number accepts every numeric type.
+     */
+    private static Double requireDouble(Map conf, String key) {
+        Object value = conf.get(key);
+        if (!(value instanceof Number)) {
+            throw new IllegalArgumentException("Config " + key + " must be a number but was: " + value);
+        }
+        return ((Number) value).doubleValue();
+    }
+
+    /** Builds a new snapshot and publishes it as this supervisor's heartbeat. */
+    @Override
+    public void run() {
+        SupervisorInfo supervisorInfo = update(conf, supervisorData);
+        stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
new file mode 100644
index 0000000..acc2cb8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.event.EventManager;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, DaemonCommon, Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class);
+
+ private final EventManager eventManager;
+
+ private final EventManager processesEventManager;
+
+ private SupervisorData supervisorData;
+
+ public SupervisorManger(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) {
+ this.eventManager = eventManager;
+ this.supervisorData = supervisorData;
+ this.processesEventManager = processesEventManager;
+ }
+
+ @Override
+ public void shutdown() {
+ LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId());
+ supervisorData.setActive(false);
+ try {
+ supervisorData.getHeartbeatTimer().close();
+ supervisorData.getEventTimer().close();
+ supervisorData.getBlobUpdateTimer().close();
+ eventManager.close();
+ processesEventManager.close();
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ supervisorData.getStormClusterState().disconnect();
+ }
+
+ @Override
+ public void shutdownAllWorkers() {
+
+ Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
+ try {
+ for (String workerId : workerIds) {
+ shutWorker(supervisorData, workerId);
+ }
+ } catch (Exception e) {
+ LOG.error("shutWorker failed");
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ @Override
+ public Map getConf() {
+ return supervisorData.getConf();
+ }
+
+ @Override
+ public String getId() {
+ return supervisorData.getSupervisorId();
+ }
+
+ @Override
+ public boolean isWaiting() {
+ if (!supervisorData.isActive()) {
+ return true;
+ }
+
+ if (supervisorData.getHeartbeatTimer().isTimerWaiting() && supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting()
+ && processesEventManager.waiting()) {
+ return true;
+ }
+ return false;
+ }
+
+ public void run() {
+ shutdown();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
new file mode 100644
index 0000000..f1dfb8a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.command.HealthCheck;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class SupervisorServer extends ShutdownWork {
+ private static Logger LOG = LoggerFactory.getLogger(SupervisorServer.class);
+
+ /**
+ * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary
+ *
+ * @param conf
+ * @param sharedContext
+ * @param iSupervisor
+ * @return
+ * @throws Exception
+ */
+ private SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
+ SupervisorManger supervisorManger = null;
+ try {
+ LOG.info("Starting Supervisor with conf {}", conf);
+ iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
+ String path = ConfigUtils.supervisorTmpDir(conf);
+ FileUtils.cleanDirectory(new File(path));
+
+ final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor);
+ Localizer localizer = supervisorData.getLocalizer();
+
+ SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData);
+ hb.run();
+ // should synchronize supervisor so it doesn't launch anything after being down (optimization)
+ Integer heartbeatFrequency = (Integer) conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS);
+ supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);
+
+ Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf);
+ for (String stormId : downdedStormId) {
+ SupervisorUtils.addBlobReferences(localizer, stormId, conf);
+ }
+ // do this after adding the references so we don't try to clean things being used
+ localizer.startCleaner();
+
+ EventManagerImp syncSupEventManager = new EventManagerImp(false);
+ EventManagerImp syncProcessManager = new EventManagerImp(false);
+ SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorData);
+ SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager);
+ UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData);
+ RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData);
+
+ if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
+ StormTimer eventTimer = supervisorData.getEventTimer();
+ // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
+ // to date even if callbacks don't all work exactly right
+ eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager));
+
+ eventTimer.scheduleRecurring(0, (Integer) conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS),
+ new EventManagerPushCallback(syncProcessEvent, syncProcessManager));
+
+ // Blob update thread. Starts with 30 seconds delay, every 30 seconds
+ supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));
+
+ // supervisor health check
+ eventTimer.scheduleRecurring(300, 300, new Runnable() {
+ @Override
+ public void run() {
+ int healthCode = HealthCheck.healthCheck(conf);
+ Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
+ if (healthCode != 0) {
+ for (String workerId : workerIds) {
+ try {
+ shutWorker(supervisorData, workerId);
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+ }
+ }
+ });
+
+ // Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
+ eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
+ }
+ supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager);
+ } catch (Throwable t) {
+ if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
+ throw t;
+ } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
+ throw t;
+ } else {
+ LOG.error("Error on initialization of server supervisor");
+ Utils.exitProcess(13, "Error on initialization");
+ }
+ }
+ return supervisorManger;
+ }
+
+ /**
+ * start local supervisor
+ */
+ public void localLaunch() {
+ LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
+ SupervisorManger supervisorManager;
+ try {
+ Map<Object, Object> conf = Utils.readStormConfig();
+ if (!ConfigUtils.isLocalMode(conf)) {
+ throw new IllegalArgumentException("Cannot start server in distribute mode!");
+ }
+ ISupervisor iSupervisor = new StandaloneSupervisor();
+ supervisorManager = mkSupervisor(conf, null, iSupervisor);
+ if (supervisorManager != null)
+ Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
+ } catch (Exception e) {
+ LOG.error("Failed to start supervisor\n", e);
+ System.exit(1);
+ }
+ }
+
+ /**
+ * start distribute supervisor
+ */
+ private void distributeLaunch() {
+ LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
+ SupervisorManger supervisorManager;
+ try {
+ Map<Object, Object> conf = Utils.readStormConfig();
+ if (ConfigUtils.isLocalMode(conf)) {
+ throw new IllegalArgumentException("Cannot start server in local mode!");
+ }
+ ISupervisor iSupervisor = new StandaloneSupervisor();
+ supervisorManager = mkSupervisor(conf, null, iSupervisor);
+ if (supervisorManager != null)
+ Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
+ registerWorkerNumGauge("drpc:num-execute-http-requests", conf);
+ startMetricsReporters(conf);
+ } catch (Exception e) {
+ LOG.error("Failed to start supervisor\n", e);
+ System.exit(1);
+ }
+ }
+
+ // To be removed
+ private void registerWorkerNumGauge(String name, final Map conf) {
+ MetricRegistry metricRegistry = new MetricRegistry();
+ metricRegistry.remove(name);
+ metricRegistry.register(name, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ Collection<String> pids = Utils.readDirContents(ConfigUtils.workerRoot(conf));
+ return pids.size();
+ }
+ });
+ }
+
+ // To be removed
+ private void startMetricsReporters(Map conf) {
+ List<PreparableReporter> preparableReporters = MetricsUtils.getPreparableReporters(conf);
+ for (PreparableReporter reporter : preparableReporters) {
+ reporter.prepare(new MetricRegistry(), conf);
+ reporter.start();
+ }
+ LOG.info("Started statistics report plugin...");
+ }
+
+ /**
+ * supervisor daemon enter entrance
+ *
+ * @param args
+ */
+ public static void main(String[] args) {
+ Utils.setupDefaultUncaughtExceptionHandler();
+ SupervisorServer instance = new SupervisorServer();
+ instance.distributeLaunch();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
new file mode 100644
index 0000000..ffdb839
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.utils.PathUtils;
+import org.apache.storm.Config;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+ /**
+  * Static helpers used by the supervisor daemon: invoking the worker-launcher
+  * executable, translating blobstore configuration into local resources, and
+  * inspecting the supervisor's local directories.
+  */
+ public class SupervisorUtils {
+
+     private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
+
+     /**
+      * Starts the worker-launcher executable on behalf of {@code user}.
+      *
+      * The executable is taken from {@code Config.SUPERVISOR_WORKER_LAUNCHER}; when that
+      * setting is blank it falls back to {@code <storm.home>/bin/worker-launcher}, where
+      * {@code storm.home} is read from the system properties. The resulting command line is
+      * {@code [launcher, user, args...]}.
+      *
+      * @param conf             supervisor configuration
+      * @param user             user passed to the launcher; must not be blank
+      * @param args             additional launcher arguments
+      * @param environment      environment handed to {@code Utils.launchProcess}
+      * @param logPreFix        log prefix handed to {@code Utils.launchProcess}
+      * @param exitCodeCallback callback handed to {@code Utils.launchProcess}
+      * @param dir              directory handed to {@code Utils.launchProcess}
+      * @return the launched process
+      * @throws IllegalArgumentException if {@code user} is blank
+      * @throws IOException              if the process cannot be launched
+      */
+     public static Process workerLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix,
+             final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
+         if (StringUtils.isBlank(user)) {
+             throw new IllegalArgumentException("User cannot be blank when calling workerLauncher.");
+         }
+         String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+         String stormHome = System.getProperty("storm.home");
+         String wl;
+         if (StringUtils.isNotBlank(wlinitial)) {
+             wl = wlinitial;
+         } else {
+             wl = stormHome + "/bin/worker-launcher";
+         }
+         List<String> commands = new ArrayList<>();
+         commands.add(wl);
+         commands.add(user);
+         commands.addAll(args);
+         return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
+     }
+
+     /**
+      * Runs the worker-launcher via {@link #workerLauncher} and waits for it to finish.
+      *
+      * When {@code logPreFix} is not blank the process output is passed to
+      * {@code Utils.readAndLogStream} before waiting. If the wait is interrupted, the
+      * interruption is logged and the thread's interrupt status is restored so callers can
+      * still observe it; the exit value is then requested regardless, which throws
+      * {@link IllegalThreadStateException} if the process has not terminated yet.
+      *
+      * @param conf        supervisor configuration
+      * @param user        user passed to the launcher; must not be blank
+      * @param args        additional launcher arguments
+      * @param environment environment for the launched process
+      * @param logPreFix   prefix used when logging; may be blank
+      * @return the exit value of the launcher process
+      * @throws IOException if the process cannot be launched
+      */
+     public static int workerLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
+             throws IOException {
+         Process process = workerLauncher(conf, user, args, environment, logPreFix, null, null);
+         if (StringUtils.isNotBlank(logPreFix)) {
+             Utils.readAndLogStream(logPreFix, process.getInputStream());
+         }
+         try {
+             process.waitFor();
+         } catch (InterruptedException e) {
+             LOG.info("{} interrupted.", logPreFix);
+             // Do not swallow the interruption: re-assert the flag for the caller.
+             Thread.currentThread().interrupt();
+         }
+         return process.exitValue();
+     }
+
+     /**
+      * When {@code Config.SUPERVISOR_RUN_WORKER_AS_USER} is enabled, invokes the worker-launcher
+      * {@code code-dir} command for {@code dir} as the topology's submitter user. Does nothing
+      * otherwise. The launcher's exit value is not inspected.
+      *
+      * @param conf      supervisor configuration
+      * @param stormConf topology configuration; supplies {@code Config.TOPOLOGY_SUBMITTER_USER}
+      * @param dir       directory passed to the {@code code-dir} command
+      * @throws IOException if the launcher cannot be started
+      */
+     public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException {
+         if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+             String logPrefix = "setup conf for " + dir;
+             List<String> commands = new ArrayList<>();
+             commands.add("code-dir");
+             commands.add(dir);
+             workerLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+         }
+     }
+
+     /**
+      * Invokes the worker-launcher {@code rmr} command on {@code path} as the user reported by
+      * {@code Utils.getFileOwner(path)}, then verifies that the path is gone.
+      *
+      * @param conf supervisor configuration
+      * @param id   identifier used only in the log prefix
+      * @param path path to remove
+      * @throws IOException      if the launcher cannot be started
+      * @throws RuntimeException if {@code path} still exists afterwards
+      */
+     public static void rmrAsUser(Map conf, String id, String path) throws IOException {
+         String user = Utils.getFileOwner(path);
+         String logPreFix = "rmr " + id;
+         List<String> commands = new ArrayList<>();
+         commands.add("rmr");
+         commands.add(path);
+         SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPreFix);
+         if (Utils.checkFileExists(path)) {
+             throw new RuntimeException(path + " was not deleted.");
+         }
+     }
+
+     /**
+      * Given the blob information returns the value of the {@code uncompress} field, handling it
+      * being either a string or a boolean value. Returns false when the field is not specified
+      * or when {@code blobInfo} itself is null.
+      *
+      * @param blobInfo settings of a single blob; may be null
+      * @return whether the blob should be uncompressed
+      */
+     public static Boolean isShouldUncompressBlob(Map<String, Object> blobInfo) {
+         if (blobInfo == null) {
+             return Boolean.FALSE;
+         }
+         Object uncompress = blobInfo.get("uncompress");
+         if (uncompress instanceof Boolean) {
+             // A blind cast to String would throw ClassCastException for boolean values.
+             return (Boolean) uncompress;
+         }
+         return uncompress != null && Boolean.parseBoolean(uncompress.toString());
+     }
+
+     /**
+      * Converts a blobstore map (blob key to blob settings) into a list of {@link LocalResource}s,
+      * one per entry, using {@link #isShouldUncompressBlob} to decide the uncompress flag.
+      *
+      * @param blobstoreMap blob key to blob settings; may be null
+      * @return the local resources; empty when {@code blobstoreMap} is null
+      */
+     public static List<LocalResource> blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) {
+         List<LocalResource> localResourceList = new ArrayList<>();
+         if (blobstoreMap != null) {
+             for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) {
+                 LocalResource localResource = new LocalResource(map.getKey(), isShouldUncompressBlob(map.getValue()));
+                 localResourceList.add(localResource);
+             }
+         }
+         return localResourceList;
+     }
+
+     /**
+      * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the cache on restart.
+      *
+      * Nothing is added when the topology configuration has no {@code Config.TOPOLOGY_BLOBSTORE_MAP}.
+      *
+      * @param localizer localizer that receives the references
+      * @param stormId   id of the topology whose supervisor-side configuration is read
+      * @param conf      supervisor configuration
+      * @throws IOException if the topology configuration cannot be read
+      */
+     public static void addBlobReferences(Localizer localizer, String stormId, Map conf) throws IOException {
+         Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+         Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+         String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+         String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+         List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+         if (blobstoreMap != null) {
+             localizer.addReferences(localresources, user, topoName);
+         }
+     }
+
+     /**
+      * Lists the entries of the supervisor's storm dist root and returns their URL-decoded names.
+      *
+      * @param conf supervisor configuration
+      * @return the decoded entry names
+      * @throws IOException if the dist root cannot be resolved or a name cannot be decoded
+      */
+     public static Set<String> readDownLoadedStormIds(Map conf) throws IOException {
+         Set<String> stormIds = new HashSet<>();
+         String path = ConfigUtils.supervisorStormDistRoot(conf);
+         Collection<String> rets = Utils.readDirContents(path);
+         for (String ret : rets) {
+             // Explicit charset: the one-argument decode is deprecated and platform dependent.
+             stormIds.add(URLDecoder.decode(ret, "UTF-8"));
+         }
+         return stormIds;
+     }
+
+     /**
+      * Lists the entries of the supervisor's worker root directory.
+      *
+      * @param conf supervisor configuration
+      * @return the names returned by {@code Utils.readDirContents} for the worker root
+      */
+     public static Collection<String> supervisorWorkerIds(Map conf) {
+         String workerRoot = ConfigUtils.workerRoot(conf);
+         return Utils.readDirContents(workerRoot);
+     }
+
+     /**
+      * Checks that the local files of a topology are present: its dist root, its code file and
+      * its conf file. The jar is additionally required unless running in local mode.
+      *
+      * @param conf    supervisor configuration
+      * @param stormId id of the topology to check
+      * @return true when all required files exist
+      * @throws IOException if the topology's paths cannot be resolved
+      */
+     public static boolean checkTopoFilesExist(Map conf, String stormId) throws IOException {
+         String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+         String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
+         String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
+         String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot);
+         if (!Utils.checkFileExists(stormroot)) {
+             return false;
+         }
+         if (!Utils.checkFileExists(stormcodepath)) {
+             return false;
+         }
+         if (!Utils.checkFileExists(stormconfpath)) {
+             return false;
+         }
+         if (!ConfigUtils.isLocalMode(conf) && !Utils.checkFileExists(stormjarpath)) {
+             return false;
+         }
+         return true;
+     }
+
+ }