You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:28 UTC
[05/35] storm git commit: update test codes about supervisor
http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
new file mode 100644
index 0000000..9df7ec1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
+import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class Supervisor {
+ private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
+
+ //TODO: to be removed after porting worker.clj. localSyncProcess is intended to start local supervisor
+ private SyncProcessEvent localSyncProcess;
+
+ public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
+ this.localSyncProcess = localSyncProcess;
+ }
+
+
+ /**
+ * In local state, the supervisor stores what its current assignments are; another thread launches events to restart any dead processes if necessary.
+ *
+ * @param conf
+ * @param sharedContext
+ * @param iSupervisor
+ * @return
+ * @throws Exception
+ */
+ public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
+ SupervisorManger supervisorManger = null;
+ try {
+ LOG.info("Starting Supervisor with conf {}", conf);
+ iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
+ String path = ConfigUtils.supervisorTmpDir(conf);
+ FileUtils.cleanDirectory(new File(path));
+
+ final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor);
+ Localizer localizer = supervisorData.getLocalizer();
+
+ SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData);
+ hb.run();
+ // should synchronize supervisor so it doesn't launch anything after being down (optimization)
+ Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+ supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);
+
+ Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf);
+ for (String stormId : downdedStormId) {
+ SupervisorUtils.addBlobReferences(localizer, stormId, conf);
+ }
+ // do this after adding the references so we don't try to clean things being used
+ localizer.startCleaner();
+
+ EventManagerImp syncSupEventManager = new EventManagerImp(false);
+ EventManagerImp syncProcessManager = new EventManagerImp(false);
+
+ SyncProcessEvent syncProcessEvent = null;
+ if (ConfigUtils.isLocalMode(conf)){
+ localSyncProcess.init(supervisorData);
+ syncProcessEvent = localSyncProcess;
+ }else{
+ syncProcessEvent = new SyncProcessEvent(supervisorData);
+ }
+
+ SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager);
+ UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData);
+ RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData);
+
+ if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
+ StormTimer eventTimer = supervisorData.getEventTimer();
+ // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
+ // to date even if callbacks don't all work exactly right
+ eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager));
+
+ eventTimer.scheduleRecurring(0, Utils.getInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)),
+ new EventManagerPushCallback(syncProcessEvent, syncProcessManager));
+
+ // Blob update thread. Starts with 30 seconds delay, every 30 seconds
+ supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));
+
+ // supervisor health check
+ eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData));
+
+ // Launch a thread that runs profiler commands. Starts with 30 seconds delay, every 30 seconds
+ eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
+ }
+ LOG.info("Starting supervisor with id {} at host {}.", supervisorData.getSupervisorId(), supervisorData.getHostName() );
+ supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager);
+ } catch (Throwable t) {
+ if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
+ throw t;
+ } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
+ throw t;
+ } else {
+ LOG.error("Error on initialization of server supervisor: {}", t);
+ Utils.exitProcess(13, "Error on initialization");
+ }
+ }
+ return supervisorManger;
+ }
+
+ /**
+ * Start the supervisor in distributed mode.
+ */
+ private void distributeLaunch() {
+ LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
+ SupervisorManger supervisorManager;
+ try {
+ Map<Object, Object> conf = Utils.readStormConfig();
+ if (ConfigUtils.isLocalMode(conf)) {
+ throw new IllegalArgumentException("Cannot start server in local mode!");
+ }
+ ISupervisor iSupervisor = new StandaloneSupervisor();
+ supervisorManager = mkSupervisor(conf, null, iSupervisor);
+ if (supervisorManager != null)
+ Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
+ registerWorkerNumGauge("drpc:num-execute-http-requests", conf);
+ startMetricsReporters(conf);
+ } catch (Exception e) {
+ LOG.error("Failed to start supervisor\n", e);
+ System.exit(1);
+ }
+ }
+
+ // To be removed
+ private void registerWorkerNumGauge(String name, final Map conf) {
+ MetricRegistry metricRegistry = new MetricRegistry();
+ metricRegistry.remove(name);
+ metricRegistry.register(name, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ Collection<String> pids = SupervisorUtils.myWorkerIds(conf);
+ return pids.size();
+ }
+ });
+ }
+
+ // To be removed
+ private void startMetricsReporters(Map conf) {
+ List<PreparableReporter> preparableReporters = MetricsUtils.getPreparableReporters(conf);
+ for (PreparableReporter reporter : preparableReporters) {
+ reporter.prepare(new MetricRegistry(), conf);
+ reporter.start();
+ }
+ LOG.info("Started statistics report plugin...");
+ }
+
+ /**
+ * Entry point of the supervisor daemon.
+ *
+ * @param args
+ */
+ public static void main(String[] args) {
+ Utils.setupDefaultUncaughtExceptionHandler();
+ Supervisor instance = new Supervisor();
+ instance.distributeLaunch();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
index 9eec253..039fe30 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
@@ -105,10 +105,9 @@ public class SupervisorData {
List<ACL> acls = null;
if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
- acls = new ArrayList<>();
- acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
- acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
+ acls = SupervisorUtils.supervisorZkAcls();
}
+
try {
this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR));
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
deleted file mode 100644
index fd31631..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.commons.io.FileUtils;
-import org.apache.storm.Config;
-import org.apache.storm.StormTimer;
-import org.apache.storm.command.HealthCheck;
-import org.apache.storm.daemon.metrics.MetricsUtils;
-import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
-import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
-import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
-import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
-import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
-import org.apache.storm.event.EventManagerImp;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.messaging.IContext;
-import org.apache.storm.scheduler.ISupervisor;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.VersionInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.InterruptedIOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class SupervisorServer {
- private static Logger LOG = LoggerFactory.getLogger(SupervisorServer.class);
-
- /**
- * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary
- *
- * @param conf
- * @param sharedContext
- * @param iSupervisor
- * @return
- * @throws Exception
- */
- private SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
- SupervisorManger supervisorManger = null;
- try {
- LOG.info("Starting Supervisor with conf {}", conf);
- iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
- String path = ConfigUtils.supervisorTmpDir(conf);
- FileUtils.cleanDirectory(new File(path));
-
- final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor);
- Localizer localizer = supervisorData.getLocalizer();
-
- SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData);
- hb.run();
- // should synchronize supervisor so it doesn't launch anything after being down (optimization)
- Integer heartbeatFrequency = (Integer) conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS);
- supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);
-
- Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf);
- for (String stormId : downdedStormId) {
- SupervisorUtils.addBlobReferences(localizer, stormId, conf);
- }
- // do this after adding the references so we don't try to clean things being used
- localizer.startCleaner();
-
- EventManagerImp syncSupEventManager = new EventManagerImp(false);
- EventManagerImp syncProcessManager = new EventManagerImp(false);
- SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorData);
- SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager);
- UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData);
- RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData);
-
- if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
- StormTimer eventTimer = supervisorData.getEventTimer();
- // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
- // to date even if callbacks don't all work exactly right
- eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager));
-
- eventTimer.scheduleRecurring(0, (Integer) conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS),
- new EventManagerPushCallback(syncProcessEvent, syncProcessManager));
-
- // Blob update thread. Starts with 30 seconds delay, every 30 seconds
- supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));
-
- // supervisor health check
- eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData));
-
- // Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
- eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
- }
- supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager);
- } catch (Throwable t) {
- if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
- throw t;
- } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
- throw t;
- } else {
- LOG.error("Error on initialization of server supervisor");
- Utils.exitProcess(13, "Error on initialization");
- }
- }
- return supervisorManger;
- }
-
- /**
- * start local supervisor
- */
- public void localLaunch() {
- LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
- SupervisorManger supervisorManager;
- try {
- Map<Object, Object> conf = Utils.readStormConfig();
- if (!ConfigUtils.isLocalMode(conf)) {
- throw new IllegalArgumentException("Cannot start server in distribute mode!");
- }
- ISupervisor iSupervisor = new StandaloneSupervisor();
- supervisorManager = mkSupervisor(conf, null, iSupervisor);
- if (supervisorManager != null)
- Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
- } catch (Exception e) {
- LOG.error("Failed to start supervisor\n", e);
- System.exit(1);
- }
- }
-
- /**
- * start distribute supervisor
- */
- private void distributeLaunch() {
- LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
- SupervisorManger supervisorManager;
- try {
- Map<Object, Object> conf = Utils.readStormConfig();
- if (ConfigUtils.isLocalMode(conf)) {
- throw new IllegalArgumentException("Cannot start server in local mode!");
- }
- ISupervisor iSupervisor = new StandaloneSupervisor();
- supervisorManager = mkSupervisor(conf, null, iSupervisor);
- if (supervisorManager != null)
- Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
- registerWorkerNumGauge("drpc:num-execute-http-requests", conf);
- startMetricsReporters(conf);
- } catch (Exception e) {
- LOG.error("Failed to start supervisor\n", e);
- System.exit(1);
- }
- }
-
- // To be removed
- private void registerWorkerNumGauge(String name, final Map conf) {
- MetricRegistry metricRegistry = new MetricRegistry();
- metricRegistry.remove(name);
- metricRegistry.register(name, new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- Collection<String> pids = Utils.readDirContents(ConfigUtils.workerRoot(conf));
- return pids.size();
- }
- });
- }
-
- // To be removed
- private void startMetricsReporters(Map conf) {
- List<PreparableReporter> preparableReporters = MetricsUtils.getPreparableReporters(conf);
- for (PreparableReporter reporter : preparableReporters) {
- reporter.prepare(new MetricRegistry(), conf);
- reporter.start();
- }
- LOG.info("Started statistics report plugin...");
- }
-
- /**
- * supervisor daemon enter entrance
- *
- * @param args
- */
- public static void main(String[] args) {
- Utils.setupDefaultUncaughtExceptionHandler();
- SupervisorServer instance = new SupervisorServer();
- instance.distributeLaunch();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index ffdb839..9d0b343 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -20,10 +20,14 @@ package org.apache.storm.daemon.supervisor;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.utils.PathUtils;
import org.apache.storm.Config;
+import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.localizer.LocalResource;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,13 +40,24 @@ public class SupervisorUtils {
private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
+ private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+ private static SupervisorUtils _instance = INSTANCE;
+
+ public static void setInstance(SupervisorUtils u) {
+ _instance = u;
+ }
+
+ public static void resetInstance() {
+ _instance = INSTANCE;
+ }
+
public static Process workerLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix,
final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
if (StringUtils.isBlank(user)) {
throw new IllegalArgumentException("User cannot be blank when calling workerLauncher.");
}
String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
- String stormHome = System.getProperty("storm.home");
+ String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
String wl;
if (StringUtils.isNotBlank(wlinitial)) {
wl = wlinitial;
@@ -165,9 +180,94 @@ public class SupervisorUtils {
return false;
if (!Utils.checkFileExists(stormconfpath))
return false;
- if (!ConfigUtils.isLocalMode(conf) && !Utils.checkFileExists(stormjarpath))
- return false;
- return true;
+ if (ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath))
+ return true;
+ return false;
+ }
+
+ public static Collection<String> myWorkerIds(Map conf){
+ return Utils.readDirContents(ConfigUtils.workerRoot(conf));
+ }
+
+ /**
+ * Returns map from worker id to heartbeat
+ *
+ * @param conf
+ * @return
+ * @throws Exception
+ */
+ public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception {
+ return _instance.readWorkerHeartbeatsImpl(conf);
+ }
+
+ public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map conf) throws Exception {
+ Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
+
+ Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
+
+ for (String workerId : workerIds) {
+ LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId);
+ // ATTENTION: whb can be null
+ workerHeartbeats.put(workerId, whb);
+ }
+ return workerHeartbeats;
+ }
+
+
+ /**
+ * get worker heartbeat by workerId
+ *
+ * @param conf
+ * @param workerId
+ * @return
+ * @throws IOException
+ */
+ public static LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) {
+ return _instance.readWorkerHeartbeatImpl(conf, workerId);
+ }
+
+ public LSWorkerHeartbeat readWorkerHeartbeatImpl(Map conf, String workerId) {
+ try {
+ LocalState localState = ConfigUtils.workerState(conf, workerId);
+ return localState.getWorkerHeartBeat();
+ } catch (Exception e) {
+ LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e);
+ return null;
+ }
+ }
+
+ public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map conf) {
+ return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
+ }
+
+ public boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map conf) {
+ boolean result = false;
+ if ((now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
+ result = true;
+ }
+ return result;
+ }
+
+ public static String javaCmd(String cmd) {
+ return _instance.javaCmdImpl(cmd);
+ }
+
+ public String javaCmdImpl(String cmd) {
+ String ret = null;
+ String javaHome = System.getenv().get("JAVA_HOME");
+ if (StringUtils.isNotBlank(javaHome)) {
+ ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
+ } else {
+ ret = cmd;
+ }
+ return ret;
+ }
+
+ public static List<ACL> supervisorZkAcls() {
+ List<ACL> acls = new ArrayList<>();
+ acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
+ acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
+ return acls;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
index af454b9..4ef6d1c 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@ -17,14 +17,10 @@
*/
package org.apache.storm.daemon.supervisor;
-import clojure.lang.IFn;
-import clojure.lang.RT;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
-import org.apache.storm.ProcessSimulator;
-import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.container.cgroup.CgroupManager;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LocalAssignment;
@@ -33,6 +29,7 @@ import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
@@ -52,9 +49,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
- private final LocalState localState;
-
- private IStormClusterState stormClusterState;
+ private LocalState localState;
private SupervisorData supervisorData;
@@ -80,15 +75,21 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
}
}
+ public SyncProcessEvent(){
+
+ }
+
public SyncProcessEvent(SupervisorData supervisorData) {
+ init(supervisorData);
+ }
+ //TODO: init is intended for the local supervisor, so we will remove it after porting worker.clj to java
+ public void init(SupervisorData supervisorData){
this.supervisorData = supervisorData;
-
this.localState = supervisorData.getLocalState();
-
- this.stormClusterState = supervisorData.getStormClusterState();
}
+
/**
* 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file -
* rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new
@@ -101,12 +102,13 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
try {
Map conf = supervisorData.getConf();
Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
+
if (assignedExecutors == null) {
assignedExecutors = new HashMap<>();
}
int now = Time.currentTimeSecs();
- Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(assignedExecutors, now);
+ Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now);
Set<String> keeperWorkerIds = new HashSet<>();
Set<Integer> keepPorts = new HashSet<>();
@@ -171,16 +173,17 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
}
}
- Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
+ protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
Map<Integer, LocalAssignment> reassignExecutors = new HashMap<>();
+ reassignExecutors.putAll(assignExecutors);
for (Integer port : keepPorts) {
- if (assignExecutors.containsKey(port)) {
- reassignExecutors.put(port, assignExecutors.get(port));
- }
+ reassignExecutors.remove(port);
}
return reassignExecutors;
}
+
+
/**
* Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead
*
@@ -188,11 +191,11 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
* @return
* @throws Exception
*/
- public Map<String, StateHeartbeat> getLocalWorkerStats(Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception {
+ public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception {
Map<String, StateHeartbeat> workerIdHbstate = new HashMap<>();
Map conf = supervisorData.getConf();
LocalState localState = supervisorData.getLocalState();
- Map<String, LSWorkerHeartbeat> idToHeartbeat = readWorkerHeartbeats(conf);
+ Map<String, LSWorkerHeartbeat> idToHeartbeat = SupervisorUtils.readWorkerHeartbeats(conf);
Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
Set<String> approvedIds = new HashSet<>();
if (approvedWorkers != null) {
@@ -209,12 +212,12 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
} else if (supervisorData.getDeadWorkers().contains(workerId)) {
LOG.info("Worker Process {}as died", workerId);
state = State.timedOut;
- } else if ((now - whb.get_time_secs()) > (Integer) (conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
+ } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) {
state = State.timedOut;
} else {
state = State.valid;
}
- LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb.toString(), now);
+ LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now);
workerIdHbstate.put(workerId, new StateHeartbeat(state, whb));
}
return workerIdHbstate;
@@ -222,7 +225,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) {
LocalAssignment localAssignment = assignedExecutors.get(whb.get_port());
- if (localAssignment == null || localAssignment.get_topology_id() != whb.get_topology_id()) {
+ if (localAssignment == null || !localAssignment.get_topology_id().equals(whb.get_topology_id())) {
return false;
}
List<ExecutorInfo> executorInfos = new ArrayList<>();
@@ -230,61 +233,34 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
// remove SYSTEM_EXECUTOR_ID
executorInfos.remove(new ExecutorInfo(-1, -1));
List<ExecutorInfo> localExecuorInfos = localAssignment.get_executors();
- if (executorInfos != localExecuorInfos)
- return false;
- return true;
- }
-
- /**
- * Returns map from worr id to heartbeat
- *
- * @param conf
- * @return
- * @throws Exception
- */
- protected Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception {
- Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
- Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
-
- for (String workerId : workerIds) {
- LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId);
- // ATTENTION: whb can be null
- workerHeartbeats.put(workerId, whb);
- }
- return workerHeartbeats;
- }
+ if (localExecuorInfos.size() != executorInfos.size())
+ return false;
- /**
- * get worker heartbeat by workerId
- *
- * @param conf
- * @param workerId
- * @return
- * @throws IOException
- */
- protected LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) {
- try {
- LocalState localState = ConfigUtils.workerState(conf, workerId);
- return localState.getWorkerHeartBeat();
- } catch (Exception e) {
- LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e);
- return null;
+ for (ExecutorInfo executorInfo : localExecuorInfos){
+ if (!localExecuorInfos.contains(executorInfo))
+ return false;
}
+ return true;
}
/**
* launch a worker in local mode. But it may exist question???
*/
- protected void launchLocalWorker(String stormId, Integer port, String workerId, WorkerResources resources) throws IOException {
+ protected void launchLocalWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException {
// port this function after porting worker to java
}
protected String getWorkerClassPath(String stormJar, Map stormConf) {
List<String> topoClasspath = new ArrayList<>();
Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
- if (object != null) {
+
+ if (object instanceof List) {
topoClasspath.addAll((List<String>) object);
+ } else if (object instanceof String){
+ topoClasspath.add((String)object);
+ }else {
+ //ignore
}
String classPath = Utils.workerClasspath();
String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
@@ -300,54 +276,46 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
* @param port
* @param memOnheap
*/
- public List<String> substituteChildopts(Object value, String workerId, String stormId, Integer port, int memOnheap) {
+ public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
List<String> rets = new ArrayList<>();
if (value instanceof String) {
String string = (String) value;
- string.replace("%ID%", String.valueOf(port));
- string.replace("%WORKER-ID%", workerId);
- string.replace("%TOPOLOGY-ID%", stormId);
- string.replace("%WORKER-PORT%", String.valueOf(port));
- string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
+ string = string.replace("%ID%", String.valueOf(port));
+ string = string.replace("%WORKER-ID%", workerId);
+ string = string.replace("%TOPOLOGY-ID%", stormId);
+ string = string.replace("%WORKER-PORT%", String.valueOf(port));
+ string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
String[] strings = string.split("\\s+");
rets.addAll(Arrays.asList(strings));
} else if (value instanceof List) {
- List<String> strings = (List<String>) value;
- for (String str : strings) {
- str.replace("%ID%", String.valueOf(port));
- str.replace("%WORKER-ID%", workerId);
- str.replace("%TOPOLOGY-ID%", stormId);
- str.replace("%WORKER-PORT%", String.valueOf(port));
- str.replace("%HEAP-MEM%", String.valueOf(memOnheap));
+ List<Object> objects = (List<Object>) value;
+ for (Object object : objects) {
+ String str = (String)object;
+ str = str.replace("%ID%", String.valueOf(port));
+ str = str.replace("%WORKER-ID%", workerId);
+ str = str.replace("%TOPOLOGY-ID%", stormId);
+ str = str.replace("%WORKER-PORT%", String.valueOf(port));
+ str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap));
rets.add(str);
}
}
return rets;
}
- private String jvmCmd(String cmd) {
- String ret = null;
- String javaHome = System.getProperty("JAVA_HOME");
- if (StringUtils.isNotBlank(javaHome)) {
- ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
- } else {
- ret = cmd;
- }
- return ret;
- }
+
/**
* launch a worker in distributed mode
- *
+ * supervisorId for testing
* @throws IOException
*/
- protected void launchDistributeWorker(String stormId, Integer port, String workerId, WorkerResources resources) throws IOException {
+ protected void launchDistributeWorker(Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId,
+ WorkerResources resources, CgroupManager cgroupManager, ConcurrentHashSet deadWorkers) throws IOException {
- Map conf = supervisorData.getConf();
Boolean runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
- String stormHome = System.getProperty("storm.home");
- String stormOptions = System.getProperty("storm.options");
- String stormConfFile = System.getProperty("storm.conf.file");
+ String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+ String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
+ String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
String stormLogDir = ConfigUtils.getLogDir();
String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
@@ -384,7 +352,8 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
if (resources.get_mem_on_heap() > 0) {
memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
} else {
- memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB));
+ //set the default heap memory size for supervisor-test
+ memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
}
int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
@@ -425,16 +394,16 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
List<String> workerProfilerChildopts = null;
if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
+ }else {
+ workerProfilerChildopts = new ArrayList<>();
}
- Map<String, String> environment = new HashMap<String, String>();
- Map<String, String> topEnvironment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
- if (topEnvironment != null) {
- environment.putAll(topEnvironment);
- environment.put("LD_LIBRARY_PATH", jlp);
- } else {
- environment.put("LD_LIBRARY_PATH", jlp);
+ Map<String, String> topEnvironment = new HashMap<String, String>();
+ Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+ if (environment != null) {
+ topEnvironment.putAll(environment);
}
+ topEnvironment.put("LD_LIBRARY_PATH", jlp);
String log4jConfigurationFile = null;
if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
@@ -444,10 +413,8 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
}
log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
- StringBuilder commandSB = new StringBuilder();
-
List<String> commandList = new ArrayList<>();
- commandList.add(jvmCmd("java"));
+ commandList.add(SupervisorUtils.javaCmd("java"));
commandList.add("-cp");
commandList.add(workerClassPath);
commandList.addAll(topoWorkerLogwriterChildopts);
@@ -462,7 +429,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
commandList.add("org.apache.storm.LogWriter");
- commandList.add(jvmCmd("java"));
+ commandList.add(SupervisorUtils.javaCmd("java"));
commandList.add("-server");
commandList.addAll(workerChildopts);
commandList.addAll(topWorkerChildopts);
@@ -476,7 +443,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
commandList.add("-Dstorm.options=" + stormOptions);
commandList.add("-Dstorm.log.dir=" + stormLogDir);
commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
- commandList.add(" -Dlog4j.configurationFile=" + log4jConfigurationFile);
+ commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
commandList.add("-Dstorm.id=" + stormId);
commandList.add("-Dworker.id=" + workerId);
@@ -485,7 +452,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
commandList.add(workerClassPath);
commandList.add("org.apache.storm.daemon.worker");
commandList.add(stormId);
- commandList.add(supervisorData.getAssignmentId());
+ commandList.add(assignmentId);
commandList.add(String.valueOf(port));
commandList.add(workerId);
@@ -497,27 +464,29 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
Map<String, Number> map = new HashMap<>();
map.put("cpu", cpuValue);
map.put("memory", memoryValue);
- supervisorData.getResourceIsolationManager().reserveResourcesForWorker(workerId, map);
- commandList = supervisorData.getResourceIsolationManager().getLaunchCommand(workerId, commandList);
+ cgroupManager.reserveResourcesForWorker(workerId, map);
+ commandList = cgroupManager.getLaunchCommand(workerId, commandList);
}
- LOG.info("Launching worker with command: ", Utils.shellCmd(commandList));
+ LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
writeLogMetadata(stormConf, user, workerId, stormId, port, conf);
ConfigUtils.setWorkerUserWSE(conf, workerId, user);
createArtifactsLink(conf, stormId, port, workerId);
String logPrefix = "Worker Process " + workerId;
String workerDir = ConfigUtils.workerRoot(conf, workerId);
- supervisorData.getDeadWorkers().remove(workerId);
+
+ if (deadWorkers != null)
+ deadWorkers.remove(workerId);
createBlobstoreLinks(conf, stormId, workerId);
ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix, workerId);
if (runWorkerAsUser) {
- List<String> stringList = new ArrayList<>();
- stringList.add("worker");
- stringList.add(workerDir);
- stringList.add(Utils.writeScript(workerDir, commandList, topEnvironment));
- SupervisorUtils.workerLauncher(conf, user, stringList, null, logPrefix, processExitCallback, new File(workerDir));
+ List<String> args = new ArrayList<>();
+ args.add("worker");
+ args.add(workerDir);
+ args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
+ SupervisorUtils.workerLauncher(conf, user, args, null, logPrefix, processExitCallback, new File(workerDir));
} else {
Utils.launchProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
}
@@ -536,6 +505,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
Map<String, Integer> newValidWorkerIds = new HashMap<>();
Map conf = supervisorData.getConf();
+ String supervisorId = supervisorData.getSupervisorId();
String clusterMode = ConfigUtils.clusterMode(conf);
for (Map.Entry<Integer, LocalAssignment> entry : reassignExecutors.entrySet()) {
@@ -550,17 +520,20 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId);
String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId);
+ LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port,
+ workerId);
+
FileUtils.forceMkdir(new File(pidsPath));
FileUtils.forceMkdir(new File(hbPath));
if (clusterMode.endsWith("distributed")) {
- launchDistributeWorker(stormId, port, workerId, resources);
+ launchDistributeWorker(conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources,
+ supervisorData.getResourceIsolationManager(), supervisorData.getDeadWorkers());
} else if (clusterMode.endsWith("local")) {
- launchLocalWorker(stormId, port, workerId, resources);
+ launchLocalWorker(supervisorData, stormId, port.longValue(), workerId, resources);
}
newValidWorkerIds.put(workerId, port);
- LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port,
- workerId);
+
} else {
LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", assignment,
supervisorData.getSupervisorId(), port, workerId);
@@ -570,26 +543,39 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
return newValidWorkerIds;
}
- protected void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, int port, Map conf) throws IOException {
+ public void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, Long port, Map conf) throws IOException {
Map data = new HashMap();
data.put(Config.TOPOLOGY_SUBMITTER_USER, user);
data.put("worker-id", workerId);
Set<String> logsGroups = new HashSet<>();
+ //for supervisor-test
if (stormconf.get(Config.LOGS_GROUPS) != null) {
- logsGroups.addAll((List<String>) stormconf.get(Config.LOGS_GROUPS));
+ List<String> groups = (List<String>) stormconf.get(Config.LOGS_GROUPS);
+ for (String group : groups){
+ logsGroups.add(group);
+ }
}
if (stormconf.get(Config.TOPOLOGY_GROUPS) != null) {
- logsGroups.addAll((List<String>) stormconf.get(Config.TOPOLOGY_GROUPS));
+ List<String> topGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS);
+ for (String group : topGroups){
+ logsGroups.add(group);
+ }
}
data.put(Config.LOGS_GROUPS, logsGroups.toArray());
Set<String> logsUsers = new HashSet<>();
if (stormconf.get(Config.LOGS_USERS) != null) {
- logsUsers.addAll((List<String>) stormconf.get(Config.LOGS_USERS));
+ List<String> logUsers = (List<String>) stormconf.get(Config.LOGS_USERS);
+ for (String logUser : logUsers){
+ logsUsers.add(logUser);
+ }
}
if (stormconf.get(Config.TOPOLOGY_USERS) != null) {
- logsUsers.addAll((List<String>) stormconf.get(Config.TOPOLOGY_USERS));
+ List<String> topUsers = (List<String>) stormconf.get(Config.TOPOLOGY_USERS);
+ for (String logUser : topUsers){
+ logsUsers.add(logUser);
+ }
}
data.put(Config.LOGS_USERS, logsUsers.toArray());
writeLogMetadataToYamlFile(stormId, port, data, conf);
@@ -604,19 +590,25 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
* @param conf
* @throws IOException
*/
- protected void writeLogMetadataToYamlFile(String stormId, int port, Map data, Map conf) throws IOException {
- File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port);
+ public void writeLogMetadataToYamlFile(String stormId, Long port, Map data, Map conf) throws IOException {
+ File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port.intValue());
+
if (!Utils.checkFileExists(file.getParent())) {
if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
FileUtils.forceMkdir(file.getParentFile());
SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), file.getParentFile().getCanonicalPath());
} else {
- file.getParentFile().mkdir();
+ file.getParentFile().mkdirs();
}
}
FileWriter writer = new FileWriter(file);
Yaml yaml = new Yaml();
- yaml.dump(data, writer);
+ try {
+ yaml.dump(data, writer);
+ }finally {
+ writer.close();
+ }
+
}
/**
@@ -627,7 +619,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable {
* @param port
* @param workerId
*/
- protected void createArtifactsLink(Map conf, String stormId, int port, String workerId) throws IOException {
+ protected void createArtifactsLink(Map conf, String stormId, Long port, String workerId) throws IOException {
String workerDir = ConfigUtils.workerRoot(conf, workerId);
String topoDir = ConfigUtils.workerArtifactsRoot(conf, stormId);
if (Utils.checkFileExists(workerDir)) {
http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
index d6dc45e..2de9203 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
@@ -88,6 +88,7 @@ public class SyncSupervisorEvent implements Runnable {
Map<Integer, LocalAssignment> allAssignment =
readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry());
+
Map<Integer, LocalAssignment> newAssignment = new HashMap<>();
Set<String> assignedStormIds = new HashSet<>();
@@ -97,6 +98,7 @@ public class SyncSupervisorEvent implements Runnable {
assignedStormIds.add(entry.getValue().get_topology_id());
}
}
+
Set<String> srashStormIds = verifyDownloadedFiles(conf, supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds);
Set<String> downloadedStormIds = new HashSet<>();
downloadedStormIds.addAll(allDownloadedTopologyIds);
@@ -312,6 +314,7 @@ public class SyncSupervisorEvent implements Runnable {
}
FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+
SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
@@ -350,7 +353,7 @@ public class SyncSupervisorEvent implements Runnable {
String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(conf);
-
+ FileUtils.forceMkdir(new File(tmproot));
if (Utils.isOnWindows()) {
if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
throw new RuntimeException("ERROR: Windows doesn't implement setting the correct permissions");
@@ -358,7 +361,6 @@ public class SyncSupervisorEvent implements Runnable {
} else {
Utils.restrictPermissions(tmproot);
}
- FileUtils.forceMkdir(new File(tmproot));
String stormJarKey = ConfigUtils.masterStormJarKey(stormId);
String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
@@ -549,7 +551,7 @@ public class SyncSupervisorEvent implements Runnable {
for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) {
if (entry.getValue().get_node().equals(assignmentId)) {
for (Long port : entry.getValue().get_port()) {
- LocalAssignment localAssignment = portTasks.get(port);
+ LocalAssignment localAssignment = portTasks.get(port.intValue());
if (localAssignment == null) {
List<ExecutorInfo> executors = new ArrayList<ExecutorInfo>();
localAssignment = new LocalAssignment(stormId, executors);
@@ -577,8 +579,7 @@ public class SyncSupervisorEvent implements Runnable {
assignedExecutors = new HashMap<>();
}
int now = Time.currentTimeSecs();
- SyncProcessEvent syncProcesses = new SyncProcessEvent(supervisorData);
- Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(assignedExecutors, now);
+ Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now);
LOG.debug("Allocated workers ", assignedExecutors);
for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()){
String workerId = entry.getKey();
http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
index 2d73327..91044cc 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@ -41,7 +41,6 @@ public class RunProfilerActions implements Runnable {
private Map conf;
private IStormClusterState stormClusterState;
private String hostName;
- private String stormHome;
private String profileCmd;
@@ -79,7 +78,6 @@ public class RunProfilerActions implements Runnable {
this.conf = supervisorData.getConf();
this.stormClusterState = supervisorData.getStormClusterState();
this.hostName = supervisorData.getHostName();
- this.stormHome = System.getProperty("storm.home");
this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND));
this.supervisorData = supervisorData;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index d41ca87..e158dbc 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -22,6 +22,7 @@ import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.supervisor.SupervisorData;
import org.apache.storm.generated.SupervisorInfo;
import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
import java.util.ArrayList;
import java.util.HashMap;
@@ -53,13 +54,16 @@ public class SupervisorHeartbeat implements Runnable {
List<Long> usedPorts = new ArrayList<>();
usedPorts.addAll(supervisorData.getCurrAssignment().keySet());
supervisorInfo.set_used_ports(usedPorts);
+ List metaDatas = (List)supervisorData.getiSupervisor().getMetadata();
List<Long> portList = new ArrayList<>();
- Object metas = supervisorData.getiSupervisor().getMetadata();
- if (metas != null) {
- for (Integer port : (List<Integer>) metas) {
- portList.add(port.longValue());
+ if (metaDatas != null){
+ for (Object data : metaDatas){
+ Integer port = Utils.getInt(data);
+ if (port != null)
+ portList.add(port.longValue());
}
}
+
supervisorInfo.set_meta(portList);
supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime());
http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java
new file mode 100644
index 0000000..d33dc9c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.testing.staticmocking;
+
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+
+public class MockedSupervisorUtils implements AutoCloseable {
+
+ public MockedSupervisorUtils(SupervisorUtils inst) {
+ SupervisorUtils.setInstance(inst);
+ }
+
+ @Override
+ public void close() throws Exception {
+ SupervisorUtils.resetInstance();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 1ba3de7..4e3dbb4 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -215,7 +215,7 @@ public class Utils {
try {
T ret = (T) c.newInstance();
TDeserializer des = getDes();
- des.deserialize((TBase)ret, b, offset, length);
+ des.deserialize((TBase) ret, b, offset, length);
return ret;
} catch (Exception e) {
throw new RuntimeException(e);
@@ -1700,7 +1700,7 @@ public class Utils {
if(map == null) {
return null;
}
- return findOne(pred, (Set<T>)map.entrySet());
+ return findOne(pred, (Set<T>) map.entrySet());
}
public static String localHostname () throws UnknownHostException {
http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/test/clj/org/apache/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/logviewer_test.clj b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
index 4889c8e..d06c11c 100644
--- a/storm-core/test/clj/org/apache/storm/logviewer_test.clj
+++ b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
@@ -15,8 +15,7 @@
;; limitations under the License.
(ns org.apache.storm.logviewer-test
(:use [org.apache.storm config util])
- (:require [org.apache.storm.daemon [logviewer :as logviewer]
- [supervisor :as supervisor]])
+ (:require [org.apache.storm.daemon [logviewer :as logviewer]])
(:require [conjure.core])
(:use [clojure test])
(:use [conjure core])
@@ -24,7 +23,10 @@
[org.apache.storm.ui helpers])
(:import [org.apache.storm.daemon DirectoryCleaner]
[org.apache.storm.utils Utils Time]
- [org.apache.storm.utils.staticmocking UtilsInstaller])
+ [org.apache.storm.utils.staticmocking UtilsInstaller]
+ [org.apache.storm.daemon.supervisor SupervisorUtils]
+ [org.apache.storm.testing.staticmocking MockedSupervisorUtils]
+ [org.apache.storm.generated LSWorkerHeartbeat])
(:import [java.nio.file Files Path DirectoryStream])
(:import [java.nio.file Files])
(:import [java.nio.file.attribute FileAttribute])
@@ -236,25 +238,33 @@
mock-metaFile (mk-mock-File {:name "worker.yaml"
:type :file})
exp-id "id12345"
- expected {exp-id port1-dir}]
- (stubbing [supervisor/read-worker-heartbeats nil
- logviewer/get-metadata-file-for-wroker-logdir mock-metaFile
- logviewer/get-worker-id-from-metadata-file exp-id]
- (is (= expected (logviewer/identify-worker-log-dirs [port1-dir])))))))
+ expected {exp-id port1-dir}
+ supervisor-util (Mockito/mock SupervisorUtils)]
+ (with-open [_ (MockedSupervisorUtils. supervisor-util)]
+ (stubbing [logviewer/get-metadata-file-for-wroker-logdir mock-metaFile
+ logviewer/get-worker-id-from-metadata-file exp-id]
+ (. (Mockito/when (.readWorkerHeartbeatsImpl supervisor-util (Mockito/any))) (thenReturn nil))
+ (is (= expected (logviewer/identify-worker-log-dirs [port1-dir]))))))))
+
+
(deftest test-get-dead-worker-dirs
(testing "removes any files of workers that are still alive"
(let [conf {SUPERVISOR-WORKER-TIMEOUT-SECS 5}
- id->hb {"42" {:time-secs 1}}
+ hb (let[lwb (LSWorkerHeartbeat.)]
+ (.set_time_secs lwb (int 1)) lwb)
+ id->hb {"42" hb}
now-secs 2
unexpected-dir (mk-mock-File {:name "dir1" :type :directory})
expected-dir (mk-mock-File {:name "dir2" :type :directory})
- log-dirs #{unexpected-dir expected-dir}]
+ log-dirs #{unexpected-dir expected-dir}
+ supervisor-util (Mockito/mock SupervisorUtils)]
+ (with-open [_ (MockedSupervisorUtils. supervisor-util)]
(stubbing [logviewer/identify-worker-log-dirs {"42" unexpected-dir,
- "007" expected-dir}
- supervisor/read-worker-heartbeats id->hb]
+ "007" expected-dir}]
+ (. (Mockito/when (.readWorkerHeartbeatsImpl supervisor-util (Mockito/any))) (thenReturn id->hb))
(is (= #{expected-dir}
- (logviewer/get-dead-worker-dirs conf now-secs log-dirs)))))))
+ (logviewer/get-dead-worker-dirs conf now-secs log-dirs))))))))
(deftest test-cleanup-fn
(testing "cleanup function forceDeletes files of dead workers"