You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/03/28 21:22:12 UTC
[02/10] storm git commit: [STORM-2693] Heartbeats and assignments
promotion for storm2.0
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
index 44008b2..2559d7a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
@@ -34,17 +34,20 @@ import org.slf4j.LoggerFactory;
public class RunAsUserContainer extends BasicContainer {
private static final Logger LOG = LoggerFactory.getLogger(RunAsUserContainer.class);
- public RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId, int port,
- LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState,
+ public RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId,
+ int supervisorPort, int port, LocalAssignment assignment,
+ ResourceIsolationInterface resourceIsolationManager, LocalState localState,
String workerId) throws IOException {
- this(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, null, null, null);
+ this(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState, workerId,
+ null, null, null);
}
- RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId, int port,
- LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState,
- String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, String profileCmd) throws IOException {
- super(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, topoConf, ops,
- profileCmd);
+ RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort,
+ int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
+ LocalState localState, String workerId, Map<String, Object> topoConf, AdvancedFSOps ops,
+ String profileCmd) throws IOException {
+ super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState,
+ workerId, topoConf, ops, profileCmd);
if (Utils.isOnWindows()) {
throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
index c8bee27..e6439db 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
@@ -28,18 +28,21 @@ import org.apache.storm.utils.LocalState;
public class RunAsUserContainerLauncher extends ContainerLauncher {
private final Map<String, Object> _conf;
private final String _supervisorId;
+ private final int _supervisorPort;
protected final ResourceIsolationInterface _resourceIsolationManager;
- public RunAsUserContainerLauncher(Map<String, Object> conf, String supervisorId, ResourceIsolationInterface resourceIsolationManager) throws IOException {
+ public RunAsUserContainerLauncher(Map<String, Object> conf, String supervisorId, int supervisorPort,
+ ResourceIsolationInterface resourceIsolationManager) throws IOException {
_conf = conf;
_supervisorId = supervisorId;
+ _supervisorPort = supervisorPort;
_resourceIsolationManager = resourceIsolationManager;
}
@Override
public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
- Container container = new RunAsUserContainer(ContainerType.LAUNCH, _conf, _supervisorId, port, assignment,
- _resourceIsolationManager, state, null, null, null, null);
+ Container container = new RunAsUserContainer(ContainerType.LAUNCH, _conf, _supervisorId, _supervisorPort, port,
+ assignment, _resourceIsolationManager, state, null, null, null, null);
container.setup();
container.launch();
return container;
@@ -47,13 +50,13 @@ public class RunAsUserContainerLauncher extends ContainerLauncher {
@Override
public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
- return new RunAsUserContainer(ContainerType.RECOVER_FULL, _conf, _supervisorId, port, assignment,
- _resourceIsolationManager, state, null, null, null, null);
+ return new RunAsUserContainer(ContainerType.RECOVER_FULL, _conf, _supervisorId, _supervisorPort, port,
+ assignment, _resourceIsolationManager, state, null, null, null, null);
}
@Override
public Killable recoverContainer(String workerId, LocalState localState) throws IOException {
- return new RunAsUserContainer(ContainerType.RECOVER_PARTIAL, _conf, _supervisorId, -1, null,
+ return new RunAsUserContainer(ContainerType.RECOVER_PARTIAL, _conf, _supervisorId, _supervisorPort, -1, null,
_resourceIsolationManager, localState, workerId, null, null, null);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 6c90c0e..4e5b861 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -20,6 +20,8 @@ package org.apache.storm.daemon.supervisor;
import java.io.File;
import java.io.IOException;
+import java.net.BindException;
+import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
@@ -29,7 +31,10 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.ClusterStateContext;
@@ -37,22 +42,41 @@ import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.DaemonCommon;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.supervisor.timer.ReportWorkerHeartbeats;
import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.SynchronizeAssignments;
import org.apache.storm.event.EventManager;
import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.NotAliveException;
+import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.localizer.AsyncLocalizer;
+import org.apache.storm.logging.ThriftAccessLogger;
import org.apache.storm.messaging.IContext;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.security.auth.IAuthorizer;
+import org.apache.storm.security.auth.ReqContext;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.ThriftServer;
import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ServerConfigUtils;
+import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +84,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(Supervisor.class);
private final Map<String, Object> conf;
private final IContext sharedContext;
+ private final IAuthorizer authorizationHandler;
private volatile boolean active;
private final ISupervisor iSupervisor;
private final Utils.UptimeComputer upTime;
@@ -68,10 +93,12 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
private final LocalState localState;
private final String supervisorId;
private final String assignmentId;
+ private final int supervisorPort;
private final String hostName;
// used for reporting used ports when heartbeating
private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
private final StormTimer heartbeatTimer;
+ private final StormTimer workerHeartbeatTimer;
private final StormTimer eventTimer;
//Right now this is only used for sending metrics to nimbus,
// but we may want to combine it with the heartbeatTimer at some point
@@ -80,12 +107,24 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
private final AsyncLocalizer asyncLocalizer;
private EventManager eventManager;
private ReadClusterState readState;
+ private ThriftServer thriftServer;
+ //used for local cluster heartbeating
+ private Nimbus.Iface localNimbus;
- private Supervisor(ISupervisor iSupervisor) throws IOException {
+ private Supervisor(ISupervisor iSupervisor)
+ throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException {
this(Utils.readStormConfig(), null, iSupervisor);
}
-
- public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor iSupervisor) throws IOException {
+
+ /**
+ * Constructor for supervisor daemon.
+ * @param conf config
+ * @param sharedContext {@link IContext}
+ * @param iSupervisor {@link ISupervisor}
+ * @throws IOException
+ */
+ public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor iSupervisor)
+ throws IOException, IllegalAccessException, ClassNotFoundException, InstantiationException{
this.conf = conf;
this.iSupervisor = iSupervisor;
this.active = true;
@@ -93,11 +132,18 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
this.stormVersion = VersionInfo.getVersion();
this.sharedContext = sharedContext;
this.heartbeatExecutor = Executors.newFixedThreadPool(1);
+ this.authorizationHandler = StormCommon.mkAuthorizationHandler(
+ (String) conf.get(DaemonConfig.SUPERVISOR_AUTHORIZER), conf);
+ if (authorizationHandler == null && conf.get(DaemonConfig.NIMBUS_AUTHORIZER) != null) {
+ throw new IllegalStateException("It looks like authorization is turned on for nimbus but not for the "
+ + "supervisor....");
+ }
iSupervisor.prepare(conf, ServerConfigUtils.supervisorIsupervisorDir(conf));
try {
- this.stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.SUPERVISOR, conf));
+ this.stormClusterState = ClusterUtils.mkStormClusterState(conf,
+ new ClusterStateContext(DaemonType.SUPERVISOR, conf));
} catch (Exception e) {
LOG.error("supervisor can't create stormClusterState");
throw Utils.wrapInRuntime(e);
@@ -113,6 +159,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
}
this.supervisorId = iSupervisor.getSupervisorId();
this.assignmentId = iSupervisor.getAssignmentId();
+ this.supervisorPort = ObjectReader.getInt(conf.get(Config.SUPERVISOR_THRIFT_PORT));
try {
this.hostName = Utils.hostname();
@@ -122,6 +169,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+ this.workerHeartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+
this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
}
@@ -160,6 +209,10 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
return stormClusterState;
}
+ public ReadClusterState getReadClusterState() {
+ return readState;
+ }
+
LocalState getLocalState() {
return localState;
}
@@ -168,6 +221,10 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
return assignmentId;
}
+ public int getThriftServerPort() {
+ return supervisorPort;
+ }
+
public String getHostName() {
return hostName;
}
@@ -183,6 +240,18 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
EventManager getEventManger() {
return eventManager;
}
+
+ Supervisor getSupervisor() {
+ return this;
+ }
+
+ public void setLocalNimbus(Nimbus.Iface nimbus) {
+ this.localNimbus = nimbus;
+ }
+
+ public Nimbus.Iface getLocalNimbus() {
+ return this.localNimbus;
+ }
/**
* Launch the supervisor.
@@ -206,11 +275,16 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
if ((Boolean) conf.get(DaemonConfig.SUPERVISOR_ENABLE)) {
// This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
// to date even if callbacks don't all work exactly right
- eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));
+ eventTimer.scheduleRecurring(0, 10,
+ new EventManagerPushCallback(new SynchronizeAssignments(this, null, readState), eventManager));
// supervisor health check
eventTimer.scheduleRecurring(30, 30, new SupervisorHealthCheck(this));
}
+
+ ReportWorkerHeartbeats reportWorkerHeartbeats = new ReportWorkerHeartbeats(conf, this);
+ Integer workerHeartbeatFrequency = ObjectReader.getInt(conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS));
+ workerHeartbeatTimer.scheduleRecurring(0, workerHeartbeatFrequency, reportWorkerHeartbeats);
LOG.info("Starting supervisor with id {} at host {}.", getId(), getHostName());
}
@@ -225,6 +299,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
throw new IllegalArgumentException("Cannot start server in local mode!");
}
launch();
+ //must invoke after launch cause some services must be initialized
+ launchSupervisorThriftServer(conf);
Utils.addShutdownHookWithForceKillIn1Sec(this::close);
registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
StormMetricsRegistry.startMetricsReporters(conf);
@@ -234,6 +310,125 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
}
}
+ @VisibleForTesting
+ public void checkAuthorization(String operation) throws AuthorizationException {
+ checkAuthorization(null, null, operation, null);
+ }
+
+ @VisibleForTesting
+ public void checkAuthorization(String topoName, Map<String, Object> topoConf, String operation)
+ throws AuthorizationException {
+ checkAuthorization(topoName, topoConf, operation, null);
+ }
+
+ @VisibleForTesting
+ public void checkAuthorization(String topoName, Map<String, Object> topoConf, String operation, ReqContext context)
+ throws AuthorizationException {
+ IAuthorizer aclHandler = authorizationHandler;
+ if (context == null) {
+ context = ReqContext.context();
+ }
+ Map<String, Object> checkConf = new HashMap<>();
+ if (topoConf != null) {
+ checkConf.putAll(topoConf);
+ } else if (topoName != null) {
+ checkConf.put(Config.TOPOLOGY_NAME, topoName);
+ }
+
+ if (context.isImpersonating()) {
+ LOG.warn("principal: {} is trying to impersonate principal: {}", context.realPrincipal(),
+ context.principal());
+ throw new AuthorizationException("Supervisor does not support impersonation");
+ }
+
+ if (aclHandler != null) {
+ if (!aclHandler.permit(context, operation, checkConf)) {
+ ThriftAccessLogger.logAccess(context.requestID(), context.remoteAddress(), context.principal(),
+ operation, topoName, "access-denied");
+ throw new AuthorizationException( operation + (topoName != null ? " on topology " + topoName : "") +
+ " is not authorized");
+ } else {
+ ThriftAccessLogger.logAccess(context.requestID(), context.remoteAddress(), context.principal(),
+ operation, topoName, "access-granted");
+ }
+ }
+ }
+
+ private void launchSupervisorThriftServer(Map<String, Object> conf) throws IOException {
+ // validate port
+ int port = getThriftServerPort();
+ try {
+ ServerSocket socket = new ServerSocket(port);
+ socket.close();
+ } catch (BindException e) {
+ LOG.error("{} is not available. Check if another process is already listening on {}", port, port);
+ throw new RuntimeException(e);
+ }
+
+ TProcessor processor = new org.apache.storm.generated.Supervisor.Processor(
+ new org.apache.storm.generated.Supervisor.Iface() {
+ @Override
+ public void sendSupervisorAssignments(SupervisorAssignments assignments)
+ throws AuthorizationException, TException {
+ checkAuthorization("sendSupervisorAssignments");
+ LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments);
+ SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments,
+ getReadClusterState());
+ getEventManger().add(syn);
+ }
+
+ @Override
+ public Assignment getLocalAssignmentForStorm(String id)
+ throws NotAliveException, AuthorizationException, TException {
+ Map<String, Object> topoConf = null;
+ try {
+ topoConf = ConfigUtils.readSupervisorStormConf(conf, id);
+ } catch (IOException e) {
+ LOG.warn("Topology config is not localized yet...");
+ }
+ checkAuthorization(id, topoConf, "getLocalAssignmentForStorm");
+ Assignment assignment = getStormClusterState().assignmentInfo(id, null);
+ if (null == assignment) {
+ throw new NotAliveException("No local assignment assigned for storm: "
+ + id
+ + " for node: "
+ + getHostName());
+ }
+ return assignment;
+ }
+
+ @Override
+ public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heartbeat)
+ throws AuthorizationException, NotAliveException, TException {
+ // do nothing except validate heartbeat for now.
+ String id = heartbeat.get_storm_id();
+ Map<String, Object> topoConf = null;
+ try {
+ topoConf = ConfigUtils.readSupervisorStormConf(conf, id);
+ } catch (IOException e) {
+ LOG.warn("Topology config is not localized yet...");
+ throw new NotAliveException(id + " does not appear to be alive, you should probably exit");
+ }
+ checkAuthorization(id, topoConf, "sendSupervisorWorkerHeartbeat");
+ }
+ });
+ this.thriftServer = new ThriftServer(conf, processor, ThriftConnectionType.SUPERVISOR);
+ this.thriftServer.serve();
+ }
+
+ /**
+ * Used for local cluster assignments distribution.
+ * @param assignments {@link SupervisorAssignments}
+ */
+ public void sendSupervisorAssignments(SupervisorAssignments assignments) {
+ //for local test
+ if (Time.isSimulating() && !(Boolean) conf.get(DaemonConfig.SUPERVISOR_ENABLE)) {
+ return;
+ }
+ SynchronizeAssignments syn = new SynchronizeAssignments(this, assignments, readState);
+ eventManager.add(syn);
+ }
+
private void registerWorkerNumGauge(String name, final Map<String, Object> conf) {
StormMetricsRegistry.registerGauge(name, new Callable<Integer>() {
@Override
@@ -250,6 +445,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
LOG.info("Shutting down supervisor {}", getId());
this.active = false;
heartbeatTimer.close();
+ workerHeartbeatTimer.close();
eventTimer.close();
if (eventManager != null) {
eventManager.close();
@@ -259,6 +455,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
}
asyncLocalizer.close();
getStormClusterState().disconnect();
+ if(thriftServer != null) {
+ this.thriftServer.stop();
+ }
} catch (Exception e) {
LOG.error("Error Shutting down", e);
}
@@ -307,7 +506,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
readState.shutdownAllWorkers(onWarnTimeout, onErrorTimeout);
} else {
try {
- ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getSharedContext());
+ ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getThriftServerPort(),
+ getSharedContext());
killWorkers(SupervisorUtils.supervisorWorkerIds(conf), launcher);
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
@@ -321,18 +521,18 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
return true;
}
- if (heartbeatTimer.isTimerWaiting() && eventTimer.isTimerWaiting() && eventManager.waiting()) {
- return true;
- }
- return false;
+ return heartbeatTimer.isTimerWaiting()
+ && workerHeartbeatTimer.isTimerWaiting()
+ && eventTimer.isTimerWaiting()
+ && eventManager.waiting();
}
/**
- * supervisor daemon enter entrance
+ * supervisor daemon enter entrance.
*
* @param args
*/
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws Exception {
Utils.setupDefaultUncaughtExceptionHandler();
@SuppressWarnings("resource")
Supervisor instance = new Supervisor(new StandaloneSupervisor());
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index aba9459..90e5451 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -15,8 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.daemon.supervisor;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.storm.Config;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.localizer.LocalResource;
@@ -30,13 +38,6 @@ import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class SupervisorUtils {
private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
@@ -75,7 +76,7 @@ public class SupervisorUtils {
/**
* Given the blob information returns the value of the workerRestart field, handling it either being a string or a boolean value, or
- * if it's not specified then returns false
+ * if it's not specified then returns false.
*
* @param blobInfo the info for the blob.
* @return true if the blob needs a worker restart by way of the callback else false.
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
new file mode 100644
index 0000000..c01819b
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.generated.SupervisorWorkerHeartbeats;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runnable reporting local worker reported heartbeats to master, supervisor should take care the of the heartbeats
+ * integrity for the master heartbeats recovery, a non-null node id means that the heartbeats are full,
+ * and master can go on to check and wait others nodes when doing a heartbeats recovery.
+ */
+public class ReportWorkerHeartbeats implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(ReportWorkerHeartbeats.class);
+
+ private Supervisor supervisor;
+ private Map<String, Object> conf;
+
+ public ReportWorkerHeartbeats(Map<String, Object> conf, Supervisor supervisor) {
+ this.conf = conf;
+ this.supervisor = supervisor;
+ }
+
+ @Override
+ public void run() {
+ SupervisorWorkerHeartbeats supervisorWorkerHeartbeats = getAndResetWorkerHeartbeats();
+ reportWorkerHeartbeats(supervisorWorkerHeartbeats);
+ }
+
+ private SupervisorWorkerHeartbeats getAndResetWorkerHeartbeats() {
+ Map<String, LSWorkerHeartbeat> localHeartbeats;
+ try {
+ localHeartbeats = SupervisorUtils.readWorkerHeartbeats(this.conf);
+ return getSupervisorWorkerHeartbeatsFromLocal(localHeartbeats);
+ } catch (Exception e) {
+ LOG.error("Read local worker heartbeats error, skipping heartbeats for this round, msg:{}", e.getMessage());
+ return null;
+ }
+ }
+
+ private SupervisorWorkerHeartbeats getSupervisorWorkerHeartbeatsFromLocal(Map<String, LSWorkerHeartbeat> localHeartbeats) {
+ SupervisorWorkerHeartbeats supervisorWorkerHeartbeats = new SupervisorWorkerHeartbeats();
+
+ List<SupervisorWorkerHeartbeat> heartbeatList = new ArrayList<>();
+
+ for (LSWorkerHeartbeat lsWorkerHeartbeat : localHeartbeats.values()) {
+ // local worker heartbeat can be null cause some error/exception
+ if (null == lsWorkerHeartbeat) {
+ continue;
+ }
+
+ SupervisorWorkerHeartbeat supervisorWorkerHeartbeat = new SupervisorWorkerHeartbeat();
+ supervisorWorkerHeartbeat.set_storm_id(lsWorkerHeartbeat.get_topology_id());
+ supervisorWorkerHeartbeat.set_executors(lsWorkerHeartbeat.get_executors());
+ supervisorWorkerHeartbeat.set_time_secs(lsWorkerHeartbeat.get_time_secs());
+
+ heartbeatList.add(supervisorWorkerHeartbeat);
+ }
+ supervisorWorkerHeartbeats.set_supervisor_id(this.supervisor.getId());
+ supervisorWorkerHeartbeats.set_worker_heartbeats(heartbeatList);
+ return supervisorWorkerHeartbeats;
+ }
+
+ private void reportWorkerHeartbeats(SupervisorWorkerHeartbeats supervisorWorkerHeartbeats) {
+ if (supervisorWorkerHeartbeats == null) {
+ // error/exception thrown, just skip
+ return;
+ }
+ // if it is local mode, just get the local nimbus instance and set the heartbeats
+ if (ConfigUtils.isLocalMode(conf)) {
+ try {
+ this.supervisor.getLocalNimbus().sendSupervisorWorkerHeartbeats(supervisorWorkerHeartbeats);
+ } catch (TException tex) {
+ LOG.error("Send local supervisor heartbeats error", tex);
+ }
+ } else {
+ try (NimbusClient master = NimbusClient.getConfiguredClient(conf)) {
+ master.getClient().sendSupervisorWorkerHeartbeats(supervisorWorkerHeartbeats);
+ } catch (Exception t) {
+ LOG.error("Send worker heartbeats to master exception", t);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 2be241a..14ecf94 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.daemon.supervisor.timer;
import java.util.ArrayList;
@@ -33,10 +34,10 @@ import org.apache.storm.utils.Time;
public class SupervisorHeartbeat implements Runnable {
- private final IStormClusterState stormClusterState;
- private final String supervisorId;
- private final Map<String, Object> conf;
- private final Supervisor supervisor;
+ private final IStormClusterState stormClusterState;
+ private final String supervisorId;
+ private final Map<String, Object> conf;
+ private final Supervisor supervisor;
public SupervisorHeartbeat(Map<String, Object> conf, Supervisor supervisor) {
this.stormClusterState = supervisor.getStormClusterState();
@@ -50,17 +51,19 @@ public class SupervisorHeartbeat implements Runnable {
supervisorInfo.set_time_secs(Time.currentTimeSecs());
supervisorInfo.set_hostname(supervisor.getHostName());
supervisorInfo.set_assignment_id(supervisor.getAssignmentId());
+ supervisorInfo.set_server_port(supervisor.getThriftServerPort());
List<Long> usedPorts = new ArrayList<>();
usedPorts.addAll(supervisor.getCurrAssignment().get().keySet());
supervisorInfo.set_used_ports(usedPorts);
List metaDatas = (List)supervisor.getiSupervisor().getMetadata();
List<Long> portList = new ArrayList<>();
- if (metaDatas != null){
- for (Object data : metaDatas){
+ if (metaDatas != null) {
+ for (Object data : metaDatas) {
Integer port = ObjectReader.getInt(data);
- if (port != null)
+ if (port != null) {
portList.add(port.longValue());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
new file mode 100644
index 0000000..ba8d133
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.ReadClusterState;
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A runnable which will synchronize assignments to node local and then worker processes.
+ */
+public class SynchronizeAssignments implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(SynchronizeAssignments.class);
+
+    private final Supervisor supervisor;
+    /**
+     * Assignments pushed by the master; {@code null} means they have to be pulled from the master instead.
+     */
+    private final SupervisorAssignments assignments;
+    private final ReadClusterState readClusterState;
+
+    /**
+     * Constructor.
+     * @param supervisor {@link Supervisor}
+     * @param assignments {@link SupervisorAssignments} pushed by the master, or {@code null} to pull them from the
+     *                    master when this task runs
+     * @param readClusterState {@link ReadClusterState}
+     */
+    public SynchronizeAssignments(Supervisor supervisor, SupervisorAssignments assignments, ReadClusterState readClusterState) {
+        this.supervisor = supervisor;
+        this.assignments = assignments;
+        this.readClusterState = readClusterState;
+    }
+
+    /**
+     * Stores the assignments in the local cluster state (pulling them from the master when none were supplied),
+     * then runs {@link ReadClusterState} to sync the worker processes.
+     */
+    @Override
+    public void run() {
+        // first sync assignments to local, then sync processes.
+        if (null == assignments) {
+            getAssignmentsFromMaster(this.supervisor.getConf(), this.supervisor.getStormClusterState(), this.supervisor.getAssignmentId());
+        } else {
+            assignedAssignmentsToLocal(this.supervisor.getStormClusterState(), assignments);
+        }
+        this.readClusterState.run();
+    }
+
+    /**
+     * Used by {@link Supervisor} to fetch assignments when start up. Retries every 5 seconds until one fetch from
+     * the configured master succeeds.
+     * @param supervisor {@link Supervisor}
+     * @throws RuntimeException if the thread is interrupted while waiting between retries; the thread's interrupt
+     *                          status is restored before throwing
+     */
+    public void getAssignmentsFromMasterUntilSuccess(Supervisor supervisor) {
+        boolean success = false;
+        while (!success) {
+            try (NimbusClient master = NimbusClient.getConfiguredClient(supervisor.getConf())) {
+                SupervisorAssignments assignments = master.getClient().getSupervisorAssignments(supervisor.getAssignmentId());
+                assignedAssignmentsToLocal(supervisor.getStormClusterState(), assignments);
+                success = true;
+            } catch (Exception t) {
+                // Expected while the master is not reachable yet: keep retrying, but leave a trace for debugging.
+                LOG.debug("Failed to fetch assignments from master, will retry", t);
+            }
+            if (!success) {
+                LOG.info("Waiting for a success sync of assignments from master...");
+                try {
+                    Time.sleep(5000L);
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status so callers up the stack can still observe it.
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Pulls the assignments of one node from the master and stores them in the local cluster state. In local mode
+     * the in-process nimbus of the supervisor is queried, otherwise a configured {@link NimbusClient}. Failures are
+     * logged and otherwise ignored, so the next run can try again.
+     * @param conf config
+     * @param clusterState {@link IStormClusterState}
+     * @param node id of node
+     */
+    public void getAssignmentsFromMaster(Map conf, IStormClusterState clusterState, String node) {
+        if (ConfigUtils.isLocalMode(conf)) {
+            try {
+                SupervisorAssignments assignments = this.supervisor.getLocalNimbus().getSupervisorAssignments(node);
+                assignedAssignmentsToLocal(clusterState, assignments);
+            } catch (TException e) {
+                LOG.error("Get assignments from local master exception", e);
+            }
+        } else {
+            try (NimbusClient master = NimbusClient.getConfiguredClient(conf)) {
+                SupervisorAssignments assignments = master.getClient().getSupervisorAssignments(node);
+                LOG.debug("Sync an assignments from master, will start to sync with assignments: {}", assignments);
+                assignedAssignmentsToLocal(clusterState, assignments);
+            } catch (Exception t) {
+                LOG.error("Get assignments from master exception", t);
+            }
+        }
+    }
+
+    /**
+     * Serializes every topology {@link Assignment} and hands the resulting map to
+     * {@link IStormClusterState#syncRemoteAssignments}. A {@code null} argument is ignored.
+     */
+    private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) {
+        if (null == assignments) {
+            //unknown error, just skip
+            return;
+        }
+        Map<String, byte[]> serAssignments = new HashMap<>();
+        for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) {
+            serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue()));
+        }
+        clusterState.syncRemoteAssignments(serAssignments);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
new file mode 100644
index 0000000..558e570
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.SupervisorClient;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A service for distributing master assignments to supervisors, this service makes the assignments notification
+ * asynchronous.
+ *
+ * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer.
+ *
+ * <p>Master will shuffle its node request to the queues, if the target queue is full, we just discard the request,
+ * let the supervisors sync instead.
+ *
+ * <p>Caution: this class is not thread safe.
+ *
+ * <pre>{@code
+ * Working mode
+ * +--------+ +-----------------+
+ * | queue1 | ==> | Working thread1 |
+ * +--------+ shuffle +--------+ +-----------------+
+ * | Master | ==>
+ * +--------+ +--------+ +-----------------+
+ * | queue2 | ==> | Working thread2 |
+ * +--------+ +-----------------+
+ * }
+ * </pre>
+ */
+public class AssignmentDistributionService implements Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(AssignmentDistributionService.class);
+    private ExecutorService service;
+    /**
+     * Flag to indicate if the service is active.
+     */
+    private volatile boolean active = false;
+
+    private Random random;
+    /**
+     * Working threads num.
+     */
+    private int threadsNum = 0;
+    /**
+     * Working thread queue size.
+     */
+    private int queueSize = 0;
+
+    /**
+     * Assignments request queues, one bounded queue per working thread, keyed by the thread index.
+     */
+    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue;
+
+    /**
+     * Local supervisors for local cluster assignments distribution, keyed by supervisor id.
+     * Written by {@link #addLocalSupervisor} and read by the working threads, hence a concurrent map.
+     */
+    private Map<String, Supervisor> localSupervisors;
+
+    private Map conf;
+
+    private boolean isLocalMode = false; // boolean cache for local mode decision
+
+    /**
+     * Function for initialization: creates one bounded queue per working thread and starts the working threads.
+     *
+     * @param conf config
+     */
+    public void prepare(Map conf) {
+        this.conf = conf;
+        this.random = new Random(47);
+
+        this.threadsNum = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
+        this.queueSize = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100);
+
+        this.assignmentsQueue = new HashMap<>();
+        for (int i = 0; i < threadsNum; i++) {
+            this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize));
+        }
+        // for local cluster; initialized before the working threads are started so they are visible to them.
+        this.localSupervisors = new ConcurrentHashMap<>();
+        this.isLocalMode = ConfigUtils.isLocalMode(conf);
+        //start the thread pool
+        this.service = Executors.newFixedThreadPool(threadsNum);
+        this.active = true;
+        //start the threads
+        for (int i = 0; i < threadsNum; i++) {
+            this.service.submit(new DistributeTask(this, i));
+        }
+    }
+
+    /**
+     * Stops the working threads, waiting up to one second for them to terminate, and drops the queues.
+     */
+    @Override
+    public void close() throws IOException {
+        this.active = false;
+        if (this.service != null) {
+            this.service.shutdownNow();
+            try {
+                this.service.awaitTermination(1L, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                // Preserve the caller's interrupt status instead of swallowing it.
+                Thread.currentThread().interrupt();
+                LOG.error("Failed to close assignments distribute service", e);
+            }
+        }
+        this.assignmentsQueue = null;
+    }
+
+    /**
+     * Add an assignments for a node/supervisor for distribution. The request goes to a randomly chosen queue; if
+     * that queue stays full for 5 seconds the request is discarded and the supervisor is expected to sync instead.
+     * @param node node id of supervisor.
+     * @param host host name for the node.
+     * @param serverPort node thrift server port.
+     * @param assignments the {@link org.apache.storm.generated.SupervisorAssignments}
+     * @throws RuntimeException if interrupted while waiting for queue space; the interrupt status is restored
+     */
+    public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+        try {
+            //For some reasons, we can not get supervisor port info, eg: supervisor shutdown,
+            //Just skip for this scheduling round.
+            if (serverPort == null) {
+                LOG.warn("Discard an assignment distribution for node {} because server port info is missing.", node);
+                return;
+            }
+
+            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS);
+            if (!success) {
+                LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node);
+            }
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.error("Add node assignments interrupted: {}", e.getMessage());
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * A single distribution request: the assignments for one node plus where to reach it.
+     */
+    static class NodeAssignments {
+        private final String node;
+        private final String host;
+        private final Integer serverPort;
+        private final SupervisorAssignments assignments;
+
+        private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+            this.node = node;
+            this.host = host;
+            this.serverPort = serverPort;
+            this.assignments = assignments;
+        }
+
+        public static NodeAssignments getInstance(String node, String host, Integer serverPort,
+                                                  SupervisorAssignments assignments) {
+            return new NodeAssignments(node, host, serverPort, assignments);
+        }
+
+        //supervisor assignment id/supervisor id
+        public String getNode() {
+            return this.node;
+        }
+
+        public String getHost() {
+            return host;
+        }
+
+        public Integer getServerPort() {
+            return serverPort;
+        }
+
+        public SupervisorAssignments getAssignments() {
+            return this.assignments;
+        }
+
+    }
+
+    /**
+     * Task to distribute assignments: drains one queue for as long as the service is active.
+     */
+    static class DistributeTask implements Runnable {
+        private final AssignmentDistributionService service;
+        private final Integer queueIndex;
+
+        DistributeTask(AssignmentDistributionService service, Integer index) {
+            this.service = service;
+            this.queueIndex = index;
+        }
+
+        @Override
+        public void run() {
+            while (service.isActive()) {
+                try {
+                    NodeAssignments nodeAssignments = this.service.nextAssignments(queueIndex);
+                    sendAssignmentsToNode(nodeAssignments);
+                } catch (InterruptedException e) {
+                    if (service.isActive()) {
+                        LOG.error("Get an unexpected interrupt when distributing assignments to node", e);
+                    } else {
+                        // service is off now just interrupt it.
+                        Thread.currentThread().interrupt();
+                    }
+                } catch (RuntimeException e) {
+                    // An escaping exception would end this worker for good (the executor only records it in a
+                    // Future nobody reads) and leave its queue undrained; log it and keep serving instead.
+                    if (service.isActive()) {
+                        LOG.error("Unexpected exception when distributing assignments to node", e);
+                    }
+                }
+            }
+        }
+
+        private void sendAssignmentsToNode(NodeAssignments assignments) {
+            if (this.service.isLocalMode) {
+                //local node
+                Supervisor supervisor = this.service.localSupervisors.get(assignments.getNode());
+                if (supervisor != null) {
+                    supervisor.sendSupervisorAssignments(assignments.getAssignments());
+                } else {
+                    LOG.error("Can not find node {} for assignments distribution", assignments.getNode());
+                    throw new RuntimeException("null for node " + assignments.getNode() + " supervisor instance.");
+                }
+            } else {
+                // distributed mode
+                try (SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(),
+                    assignments.getHost(), assignments.getServerPort())) {
+                    try {
+                        client.getClient().sendSupervisorAssignments(assignments.getAssignments());
+                    } catch (Exception e) {
+                        //just ignore the exception.
+                        LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
+                    }
+                } catch (Throwable e) {
+                    //just ignore any error/exception.
+                    LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage());
+                }
+
+            }
+        }
+    }
+
+    /**
+     * Registers a supervisor of a local cluster so assignments can be handed to it directly.
+     * @param supervisor the local {@link Supervisor}, keyed by its id
+     */
+    public void addLocalSupervisor(Supervisor supervisor) {
+        this.localSupervisors.put(supervisor.getId(), supervisor);
+    }
+
+    private Integer nextQueueId() {
+        return this.random.nextInt(threadsNum);
+    }
+
+    private LinkedBlockingQueue<NodeAssignments> nextQueue() {
+        return this.assignmentsQueue.get(nextQueueId());
+    }
+
+    private LinkedBlockingQueue<NodeAssignments> getQueueById(Integer queueIndex) {
+        return this.assignmentsQueue.get(queueIndex);
+    }
+
+    /**
+     * Get an assignments from the target queue with the specific index, polling every 100ms (via {@link Time})
+     * until one is available.
+     * @param queueIndex index of the queue
+     * @return an {@link NodeAssignments}
+     * @throws InterruptedException if interrupted while waiting between polls
+     */
+    public NodeAssignments nextAssignments(Integer queueIndex) throws InterruptedException {
+        NodeAssignments target = null;
+        while (true) {
+            target = getQueueById(queueIndex).poll();
+            if (target != null) {
+                return target;
+            }
+            Time.sleep(100L);
+        }
+    }
+
+    public boolean isActive() {
+        return this.active;
+    }
+
+    public Map getConf() {
+        return this.conf;
+    }
+
+    /**
+     * Factory method for initialize a instance.
+     * @param conf config.
+     * @return an instance of {@link AssignmentDistributionService}
+     */
+    public static AssignmentDistributionService getInstance(Map conf) {
+        AssignmentDistributionService service = new AssignmentDistributionService();
+        service.prepare(conf);
+        return service;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java b/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java
index c4a0f64..e362fbc 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java
@@ -47,19 +47,19 @@ public interface ILeaderElector extends Closeable {
void removeFromLeaderLockQueue() throws Exception;
/**
- *
+ * Decide if the caller currently has the leader lock.
* @return true if the caller currently has the leader lock.
*/
boolean isLeader() throws Exception;
/**
- *
+ * Get the current leader's address.
* @return the current leader's address , may return null if no one has the lock.
*/
NimbusInfo getLeader();
/**
- *
+ * Get list of current nimbus addresses.
* @return list of current nimbus addresses, includes leader.
*/
List<NimbusInfo> getAllNimbuses()throws Exception;
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java b/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java
new file mode 100644
index 0000000..bde58dc
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Strategy that decides, after the master gains leadership, when enough worker heartbeats
+ * have been reported for it to be considered recovered.
+ */
+public interface IWorkerHeartbeatsRecoveryStrategy {
+
+    /**
+     * Prepares the strategy with the given configuration.
+     * @param conf the configuration map
+     */
+    void prepare(Map conf);
+
+    /**
+     * Records a node that has reported worker heartbeats, so that {@link #isReady(Set)} can take it into account.
+     * @param nodeId id of the node that sent the SupervisorWorkerHeartbeats
+     */
+    void reportNodeId(String nodeId);
+
+    /**
+     * Tells whether worker heartbeats are considered ready.
+     * @param nodeIds ids of all nodes in the current physical plan (assignments), as read from {@code ClusterState}
+     * @return {@code true} when the worker heartbeats of all these nodes have been reported
+     */
+    boolean isReady(Set<String> nodeIds);
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
new file mode 100644
index 0000000..59cd462
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
+
+import javax.security.auth.Subject;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.nimbus.TopoCache;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.security.auth.ReqContext;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.ClientZookeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A callback function when nimbus gains leadership.
+ */
+public class LeaderListenerCallback {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallback.class);
+
+    private final BlobStore blobStore;
+    private final TopoCache tc;
+    private final IStormClusterState clusterState;
+
+    private final CuratorFramework zk;
+    private final LeaderLatch leaderLatch;
+
+    private final Map conf;
+    private final List<ACL> acls;
+
+    private static final String STORM_JAR_SUFFIX = "-stormjar.jar";
+    private static final String STORM_CODE_SUFFIX = "-stormcode.ser";
+    private static final String STORM_CONF_SUFFIX = "-stormconf.ser";
+
+    /**
+     * Constructor for {@link LeaderListenerCallback}.
+     * @param conf config
+     * @param zk zookeeper CuratorFramework client
+     * @param leaderLatch LeaderLatch
+     * @param blobStore BlobStore
+     * @param tc TopoCache
+     * @param clusterState IStormClusterState
+     * @param acls zookeeper acls
+     */
+    public LeaderListenerCallback(Map conf, CuratorFramework zk, LeaderLatch leaderLatch, BlobStore blobStore,
+                                  TopoCache tc, IStormClusterState clusterState, List<ACL> acls) {
+        this.blobStore = blobStore;
+        this.tc = tc;
+        this.clusterState = clusterState;
+        this.zk = zk;
+        this.leaderLatch = leaderLatch;
+        this.conf = conf;
+        this.acls = acls;
+    }
+
+    /**
+     * Invoke when gains leadership.
+     *
+     * <p>Publishes this nimbus' info to zookeeper and syncs remote assignments/id-info to local. Leadership is then
+     * kept only if the jar/code/conf blobs of every active topology, and the dependency blobs they reference, are
+     * all present in the local blob store; otherwise the leader latch is closed to give up leadership.
+     */
+    public void leaderCallBack() {
+        //set up nimbus-info to zk
+        setUpNimbusInfo(acls);
+        //sync zk assignments/id-info to local
+        LOG.info("Sync remote assignments and id-info to local");
+        clusterState.syncRemoteAssignments(null);
+        clusterState.syncRemoteIds(null);
+        clusterState.setAssignmentsBackendSynchronized();
+
+        Set<String> activeTopologyIds = new TreeSet<>(ClientZookeeper.getChildren(zk,
+            ClusterUtils.STORMS_SUBTREE, false));
+
+        Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
+        Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
+        Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
+        Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);
+
+        // active topology blob keys that are missing from the local topology blob keys
+        Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
+        LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]",
+            generateJoinedString(activeTopologyBlobKeys), generateJoinedString(allLocalTopologyBlobKeys),
+            generateJoinedString(diffTopology));
+
+        if (diffTopology.isEmpty()) {
+            Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);
+
+            // dependency blob keys of active topologies that are missing from all local blob keys
+            Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
+            LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]",
+                generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
+                generateJoinedString(diffDependencies));
+
+            if (diffDependencies.isEmpty()) {
+                LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
+                tc.clear();
+            } else {
+                LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, "
+                    + "giving up leadership.");
+                closeLatch();
+            }
+        } else {
+            LOG.info("code for all active topologies not available locally, giving up leadership.");
+            closeLatch();
+        }
+    }
+
+    /**
+     * Invoke when lost leadership.
+     */
+    public void notLeaderCallback() {
+        tc.clear();
+    }
+
+    /**
+     * Writes this nimbus' {@link NimbusInfo} to the leader-info znode, creating it (persistent) if absent.
+     */
+    private void setUpNimbusInfo(List<ACL> acls) {
+        String leaderInfoPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.LEADERINFO_SUBTREE;
+        NimbusInfo nimbusInfo = NimbusInfo.fromConf(conf);
+        if (ClientZookeeper.existsNode(zk, leaderInfoPath, false)) {
+            ClientZookeeper.setData(zk, leaderInfoPath, Utils.javaSerialize(nimbusInfo));
+        } else {
+            ClientZookeeper.createNode(zk, leaderInfoPath, Utils.javaSerialize(nimbusInfo), CreateMode.PERSISTENT, acls);
+        }
+    }
+
+    private String generateJoinedString(Set<String> activeTopologyIds) {
+        return Joiner.on(",").join(activeTopologyIds);
+    }
+
+    /**
+     * Expands each topology id into its jar, code and conf blob keys.
+     */
+    private Set<String> populateTopologyBlobKeys(Set<String> activeTopologyIds) {
+        Set<String> activeTopologyBlobKeys = new TreeSet<>();
+        for (String activeTopologyId : activeTopologyIds) {
+            activeTopologyBlobKeys.add(activeTopologyId + STORM_JAR_SUFFIX);
+            activeTopologyBlobKeys.add(activeTopologyId + STORM_CODE_SUFFIX);
+            activeTopologyBlobKeys.add(activeTopologyId + STORM_CONF_SUFFIX);
+        }
+        return activeTopologyBlobKeys;
+    }
+
+    /**
+     * Keeps only keys that end with a topology jar, code or conf suffix.
+     */
+    private Set<String> filterTopologyBlobKeys(Set<String> blobKeys) {
+        Set<String> topologyBlobKeys = new HashSet<>();
+        for (String blobKey : blobKeys) {
+            if (blobKey.endsWith(STORM_JAR_SUFFIX)
+                || blobKey.endsWith(STORM_CODE_SUFFIX)
+                || blobKey.endsWith(STORM_CONF_SUFFIX)) {
+                topologyBlobKeys.add(blobKey);
+            }
+        }
+        return topologyBlobKeys;
+    }
+
+    /**
+     * Keeps only keys that end with the topology code suffix.
+     */
+    private Set<String> filterTopologyCodeKeys(Set<String> blobKeys) {
+        Set<String> topologyCodeKeys = new HashSet<>();
+        for (String blobKey : blobKeys) {
+            if (blobKey.endsWith(STORM_CODE_SUFFIX)) {
+                topologyCodeKeys.add(blobKey);
+            }
+        }
+        return topologyCodeKeys;
+    }
+
+    /**
+     * Reads each topology code blob and collects the dependency jars and artifacts it declares.
+     * @throws RuntimeException if any code blob cannot be read
+     */
+    private Set<String> getTopologyDependencyKeys(Set<String> activeTopologyCodeKeys) {
+        Set<String> activeTopologyDependencies = new TreeSet<>();
+        Subject subject = ReqContext.context().subject();
+
+        for (String activeTopologyCodeKey : activeTopologyCodeKeys) {
+            // try-with-resources so the blob stream is always closed after it has been read
+            try (InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey, subject)) {
+                byte[] blobContent = IOUtils.readFully(blob, (int) blob.getFileLength());
+                StormTopology stormCode = Utils.deserialize(blobContent, StormTopology.class);
+                if (stormCode.is_set_dependency_jars()) {
+                    activeTopologyDependencies.addAll(stormCode.get_dependency_jars());
+                }
+                if (stormCode.is_set_dependency_artifacts()) {
+                    activeTopologyDependencies.addAll(stormCode.get_dependency_artifacts());
+                }
+            } catch (AuthorizationException | KeyNotFoundException | IOException e) {
+                LOG.error("Exception occurs while reading blob for key: "
+                    + activeTopologyCodeKey
+                    + ", exception: "
+                    + e, e);
+                throw new RuntimeException("Exception occurs while reading blob for key: "
+                    + activeTopologyCodeKey
+                    + ", exception: " + e, e);
+            }
+        }
+        return activeTopologyDependencies;
+    }
+
+    /**
+     * Closes the leader latch, which gives up leadership.
+     */
+    private void closeLatch() {
+        try {
+            leaderLatch.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java b/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java
new file mode 100644
index 0000000..f725f5d
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.storm.Config;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.generated.SupervisorWorkerHeartbeats;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
+
+/**
+ * Test for nimbus heartbeats max throughput, This is a client to collect the statistics.
+ */
+public class NimbusHeartbeatsPressureTest {
+ /**
+ * the args below can be configured.
+ */
+ private static String NIMBUS_HOST = "localhost";
+ private static int NIMBUS_PORT = 6627;
+
+ private static int THREADS_NUM = 50;
+ private static int THREAD_SUBMIT_NUM = 1;
+ private static int MOCKED_STORM_NUM = 5000;
+ private static volatile boolean[] readyFlags = new boolean[THREADS_NUM];
+
+ static {
+ for (int i = 0; i < THREADS_NUM; i++) {
+ readyFlags[i] = false;
+ }
+ }
+
+ private static Random rand = new Random(47);
+ private static List<double[]> totalCostTimesBook = new ArrayList<>();
+
+ /**
+ * Initialize a fake config.
+ * @return conf
+ */
+ private static Config initializedConfig() {
+ Config conf = new Config();
+ conf.putAll(Utils.readDefaultConfig());
+ ArrayList<String> nimbusSeeds = new ArrayList<>();
+ nimbusSeeds.add(NIMBUS_HOST);
+
+ conf.put(Config.NIMBUS_SEEDS, nimbusSeeds);
+ conf.put(Config.NIMBUS_THRIFT_PORT, NIMBUS_PORT);
+ return conf;
+ }
+
+ /**
+ * Test max throughput with the specific config args.
+ */
+ public static void testMaxThroughput() {
+ ExecutorService service = Executors.newFixedThreadPool(THREADS_NUM);
+
+ long submitStart = System.currentTimeMillis();
+ for (int i = 0; i < THREADS_NUM; i++) {
+ service.submit(new HeartbeatSendTask(i, THREAD_SUBMIT_NUM));
+ }
+ long submitEnd = System.currentTimeMillis();
+ println(THREADS_NUM + " tasks, " + THREAD_SUBMIT_NUM * THREADS_NUM + " submit cost "
+ + (submitEnd - submitStart) / 1000D + "seconds");
+ long totalStart = System.currentTimeMillis();
+ while (!allTasksReady()) {
+ try {
+
+ Thread.sleep(10L);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ long totalEnd = System.currentTimeMillis();
+ println(THREADS_NUM + " tasks, " + THREAD_SUBMIT_NUM * THREADS_NUM
+ + " requests cost " + (totalEnd - totalStart) / 1000D + "seconds");
+ printStatistics(totalCostTimesBook);
+ try {
+ service.shutdownNow();
+ service.awaitTermination(3, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ private static boolean allTasksReady() {
+ for (boolean ready : readyFlags) {
+ if (!ready) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static void println(Object msg) {
+ if (msg instanceof Collection) {
+ Iterator itr = ((Collection) msg).iterator();
+ while (itr.hasNext()) {
+ System.out.println(itr.next());
+ }
+ } else {
+ System.out.println(msg);
+ }
+ }
+
+ static class HeartbeatSendTask implements Runnable {
+ private double[] runtimesBook;
+ private int taskId;
+ private int tryTimes;
+ private NimbusClient client;
+
+ public HeartbeatSendTask(int taskId, int tryTimes) {
+ this.taskId = taskId;
+ this.tryTimes = tryTimes;
+ this.runtimesBook = new double[tryTimes];
+ try {
+ client = new NimbusClient(initializedConfig(), NIMBUS_HOST, NIMBUS_PORT, null, null);
+ } catch (TTransportException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static SupervisorWorkerHeartbeat nextMockedWorkerbeat() {
+ List<ExecutorInfo> executorInfos = new ArrayList<>();
+ executorInfos.add(new ExecutorInfo(1, 1));
+ executorInfos.add(new ExecutorInfo(2, 2));
+ executorInfos.add(new ExecutorInfo(3, 3));
+ executorInfos.add(new ExecutorInfo(4, 4));
+ SupervisorWorkerHeartbeat heartbeat = new SupervisorWorkerHeartbeat();
+ heartbeat.set_executors(executorInfos);
+ // generate a random storm id
+ heartbeat.set_storm_id("storm_name_example_" + rand.nextInt(MOCKED_STORM_NUM));
+ heartbeat.set_time_secs(1221212121);
+ return heartbeat;
+ }
+
+ private static SupervisorWorkerHeartbeats mockedHeartbeats() {
+ SupervisorWorkerHeartbeats heartbeats = new SupervisorWorkerHeartbeats();
+ heartbeats.set_supervisor_id("123124134123413412341351234143");
+ List<SupervisorWorkerHeartbeat> workers = new ArrayList<>();
+ for (int i = 0; i < 25; i++) {
+ workers.add(nextMockedWorkerbeat());
+ }
+ heartbeats.set_worker_heartbeats(workers);
+ return heartbeats;
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < tryTimes; i++) {
+ long thisBegin = System.currentTimeMillis();
+ client.getClient().sendSupervisorWorkerHeartbeats(mockedHeartbeats());
+ long thisEnd = System.currentTimeMillis();
+ this.runtimesBook[i] = (thisEnd - thisBegin) / 1000D;
+ }
+ totalCostTimesBook.add(this.runtimesBook);
+ readyFlags[taskId] = true;
+ Thread.currentThread().interrupt();
+ } catch (TException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private static void printTimeCostArray(Double[] array) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[");
+ for (int i = 0; i < array.length; i++) {
+ if (i != array.length - 1) {
+ builder.append(array[i] + ",");
+ } else {
+ builder.append(array[i] + "");
+ }
+ }
+ builder.append("]");
+ System.out.println(builder.toString());
+ }
+
+ private static void printStatistics(List<double[]> data) {
+
+ List<Double> totalPoints = new ArrayList<>();
+ double total = 0D;
+ for (double[] item : data) {
+ for (Double point : item) {
+ if (point != null) {
+ totalPoints.add(point);
+ total += point;
+ }
+ }
+ }
+ Double[] totalPointsArray = new Double[totalPoints.size()];
+
+ totalPoints.toArray(totalPointsArray);
+ Arrays.sort(totalPointsArray);
+ // printTimeCostArray(totalPointsArray);
+ println("===== statistics ================");
+ println("===== min time cost: " + totalPointsArray[0] + " =====");
+ println("===== max time cost: " + totalPointsArray[totalPointsArray.length - 2] + " =====");
+
+ double meanVal = total / totalPointsArray.length;
+ println("===== mean time cost: " + meanVal + " =====");
+ int middleIndex = (int) (totalPointsArray.length * 0.5);
+ println("===== median time cost: " + totalPointsArray[middleIndex] + " =====");
+ int top90Index = (int) (totalPointsArray.length * 0.9);
+ println("===== top90 time cost: " + totalPointsArray[top90Index] + " =====");
+ }
+
+ public static void main(String[] args) {
+ testMaxThroughput();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java b/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
new file mode 100644
index 0000000..78f880a
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * Wait for a node to report worker heartbeats until a configured timeout. For cases below we have strategies:
+ *
+ * <p>1: When nimbus gains leader ship, it will decide if the heartbeats are ready based on the reported node ids,
+ * supervisors/nodes will take care of the worker heartbeats recovery, a reported node id means all the workers
+ * heartbeats on the node are reported.
+ *
+ * <p>2: If several supervisor also crush and will never recover[or all crush for some unknown reason],
+ * workers will report their heartbeats directly to master, so it has not any effect.
+ */
+public class TimeOutWorkerHeartbeatsRecoveryStrategy implements IWorkerHeartbeatsRecoveryStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(TimeOutWorkerHeartbeatsRecoveryStrategy.class);
+
+ private static int NODE_MAX_TIMEOUT_SECS = 600;
+
+ private long startTimeSecs;
+
+ private Set<String> reportedIds;
+
+ @Override
+ public void prepare(Map conf) {
+ NODE_MAX_TIMEOUT_SECS = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_HEARTBEATS_MAX_TIMEOUT_SECS), 600);
+ this.startTimeSecs = Time.currentTimeMillis() / 1000L;
+ this.reportedIds = new HashSet<>();
+ }
+
+ @Override
+ public boolean isReady(Set<String> nodeIds) {
+ if (exceedsMaxTimeOut()) {
+ Set<String> tmp = nodeIds.stream().filter(id -> !this.reportedIds.contains(id)).collect(toSet());
+ LOG.warn("Failed to recover heartbeats for nodes: {} with timeout {}s", tmp, NODE_MAX_TIMEOUT_SECS);
+ return true;
+ }
+
+ return nodeIds.stream().allMatch(id -> this.reportedIds.contains(id));
+ }
+
+ @Override
+ public void reportNodeId(String nodeId) {
+ this.reportedIds.add(nodeId);
+ }
+
+ private boolean exceedsMaxTimeOut() {
+ return (Time.currentTimeMillis() / 1000L - this.startTimeSecs) > NODE_MAX_TIMEOUT_SECS;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java b/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java
new file mode 100644
index 0000000..4befaf8
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.utils.ReflectionUtils;
+
+/**
+ * Factory class for recovery strategy.
+ */
+public class WorkerHeartbeatsRecoveryStrategyFactory {
+
+ /**
+ * Get instance of {@link IWorkerHeartbeatsRecoveryStrategy} with conf.
+ * @param conf strategy config
+ * @return an instance of {@link IWorkerHeartbeatsRecoveryStrategy}
+ */
+ public static IWorkerHeartbeatsRecoveryStrategy getStrategy(Map<String, Object> conf) {
+ IWorkerHeartbeatsRecoveryStrategy strategy;
+ if (conf.get(DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS) != null) {
+ Object targetObj = ReflectionUtils.newInstance((String)
+ conf.get(DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS));
+ Preconditions.checkState(targetObj instanceof IWorkerHeartbeatsRecoveryStrategy,
+ "{} must implements IWorkerHeartbeatsRecoveryStrategy",
+ DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS);
+ strategy = ((IWorkerHeartbeatsRecoveryStrategy) targetObj);
+ } else {
+ strategy = new TimeOutWorkerHeartbeatsRecoveryStrategy();
+ }
+
+ strategy.prepare(conf);
+ return strategy;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
index 242b54c..00b8ea0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
@@ -32,6 +32,10 @@ public class SupervisorDetails {
private final String id;
/**
+ * port of this supervisor's thrift server.
+ */
+ private final Integer serverPort;
+ /**
* hostname of this supervisor.
*/
private final String host;
@@ -52,16 +56,18 @@ public class SupervisorDetails {
/**
* Create the details of a new supervisor.
* @param id the ID as reported by the supervisor.
+ * @param serverPort the port of the supervisor's thrift server.
* @param host the host the supervisor is on.
* @param meta meta data reported by the supervisor (should be a collection of the ports on the supervisor).
* @param schedulerMeta Not used and can probably be removed.
* @param allPorts all of the ports for the supervisor (a better version of meta)
* @param totalResources all of the resources for this supervisor.
*/
- public SupervisorDetails(String id, String host, Object meta, Object schedulerMeta,
+ public SupervisorDetails(String id, Integer serverPort, String host, Object meta, Object schedulerMeta,
Collection<? extends Number> allPorts, Map<String, Double> totalResources) {
this.id = id;
+ this.serverPort = serverPort;
this.host = host;
this.meta = meta;
this.schedulerMeta = schedulerMeta;
@@ -75,24 +81,29 @@ public class SupervisorDetails {
}
public SupervisorDetails(String id, Object meta) {
- this(id, null, meta, null, null, null);
+ this(id, null, null, meta, null, null, null);
}
public SupervisorDetails(String id, Object meta, Map<String, Double> totalResources) {
- this(id, null, meta, null, null, totalResources);
+ this(id, null, null, meta, null, null, totalResources);
}
public SupervisorDetails(String id, Object meta, Collection<? extends Number> allPorts) {
- this(id, null, meta, null, allPorts, null);
+ this(id, null, null, meta, null, allPorts, null);
}
public SupervisorDetails(String id, String host, Object schedulerMeta, Collection<? extends Number> allPorts) {
- this(id, host, null, schedulerMeta, allPorts, null);
+ this(id, null, host, null, schedulerMeta, allPorts, null);
}
public SupervisorDetails(String id, String host, Object schedulerMeta,
Collection<? extends Number> allPorts, Map<String, Double> totalResources) {
- this(id, host, null, schedulerMeta, allPorts, totalResources);
+ this(id, null, host, null, schedulerMeta, allPorts, totalResources);
+ }
+
/**
 * Create the details of a supervisor that exposes a thrift server, without legacy meta data.
 *
 * @param id the ID as reported by the supervisor.
 * @param serverPort the port of the supervisor's thrift server.
 * @param host the host the supervisor is on.
 * @param schedulerMeta Not used and can probably be removed.
 * @param allPorts all of the ports for the supervisor.
 * @param totalResources all of the resources for this supervisor.
 */
public SupervisorDetails(String id, int serverPort, String host, Object schedulerMeta,
                         Collection<? extends Number> allPorts, Map<String, Double> totalResources) {
    // Delegate to the full constructor with an explicitly boxed port and no meta (allPorts supersedes it).
    this(id,
        Integer.valueOf(serverPort),
        host,
        null,
        schedulerMeta,
        allPorts,
        totalResources);
}
@Override
@@ -114,6 +125,10 @@ public class SupervisorDetails {
return id;
}
+ public int getServerPort() {
+ return serverPort;
+ }
+
public String getHost() {
return host;
}