You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/03/28 21:22:12 UTC

[02/10] storm git commit: [STORM-2693] Heartbeats and assignments promotion for storm2.0

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
index 44008b2..2559d7a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
@@ -34,17 +34,20 @@ import org.slf4j.LoggerFactory;
 public class RunAsUserContainer extends BasicContainer {
     private static final Logger LOG = LoggerFactory.getLogger(RunAsUserContainer.class);
 
-    public RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId, int port,
-                              LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState,
+    public RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId,
+                              int supervisorPort, int port, LocalAssignment assignment,
+                              ResourceIsolationInterface resourceIsolationManager, LocalState localState,
                               String workerId) throws IOException {
-        this(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, null, null, null);
+        this(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState, workerId,
+                null, null, null);
     }
     
-    RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId, int port,
-                       LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState,
-                       String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, String profileCmd) throws IOException {
-        super(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, topoConf, ops,
-                profileCmd);
+    RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort,
+                       int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
+                       LocalState localState, String workerId, Map<String, Object> topoConf, AdvancedFSOps ops,
+                       String profileCmd) throws IOException {
+        super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState,
+                workerId, topoConf, ops, profileCmd);
         if (Utils.isOnWindows()) {
             throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
index c8bee27..e6439db 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
@@ -28,18 +28,21 @@ import org.apache.storm.utils.LocalState;
 public class RunAsUserContainerLauncher extends ContainerLauncher {
     private final Map<String, Object> _conf;
     private final String _supervisorId;
+    private final int _supervisorPort;
     protected final ResourceIsolationInterface _resourceIsolationManager;
     
-    public RunAsUserContainerLauncher(Map<String, Object> conf, String supervisorId, ResourceIsolationInterface resourceIsolationManager) throws IOException {
+    public RunAsUserContainerLauncher(Map<String, Object> conf, String supervisorId, int supervisorPort,
+            ResourceIsolationInterface resourceIsolationManager) throws IOException {
         _conf = conf;
         _supervisorId = supervisorId;
+        _supervisorPort = supervisorPort;
         _resourceIsolationManager = resourceIsolationManager;
     }
 
     @Override
     public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
-        Container container = new RunAsUserContainer(ContainerType.LAUNCH, _conf, _supervisorId, port, assignment,
-                _resourceIsolationManager, state, null, null, null, null);
+        Container container = new RunAsUserContainer(ContainerType.LAUNCH, _conf, _supervisorId, _supervisorPort, port,
+                assignment, _resourceIsolationManager, state, null, null, null, null);
         container.setup();
         container.launch();
         return container;
@@ -47,13 +50,13 @@ public class RunAsUserContainerLauncher extends ContainerLauncher {
 
     @Override
     public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
-        return new RunAsUserContainer(ContainerType.RECOVER_FULL, _conf, _supervisorId, port, assignment,
-                _resourceIsolationManager, state, null, null, null, null);
+        return new RunAsUserContainer(ContainerType.RECOVER_FULL, _conf, _supervisorId, _supervisorPort, port,
+                assignment, _resourceIsolationManager, state, null, null, null, null);
     }
     
     @Override
     public Killable recoverContainer(String workerId, LocalState localState) throws IOException {
-        return new RunAsUserContainer(ContainerType.RECOVER_PARTIAL, _conf, _supervisorId, -1, null,
+        return new RunAsUserContainer(ContainerType.RECOVER_PARTIAL, _conf, _supervisorId, _supervisorPort, -1, null,
                 _resourceIsolationManager, localState, workerId, null, null, null);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 6c90c0e..4e5b861 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -20,6 +20,8 @@ package org.apache.storm.daemon.supervisor;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.BindException;
+import java.net.ServerSocket;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.HashMap;
@@ -29,7 +31,10 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.StormTimer;
 import org.apache.storm.cluster.ClusterStateContext;
@@ -37,22 +42,41 @@ import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.DaemonCommon;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.supervisor.timer.ReportWorkerHeartbeats;
 import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
 import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.SynchronizeAssignments;
 import org.apache.storm.event.EventManager;
 import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.NotAliveException;
+import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
 import org.apache.storm.localizer.AsyncLocalizer;
+import org.apache.storm.logging.ThriftAccessLogger;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.security.auth.IAuthorizer;
+import org.apache.storm.security.auth.ReqContext;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.ThriftServer;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ServerConfigUtils;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.VersionInfo;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +84,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(Supervisor.class);
     private final Map<String, Object> conf;
     private final IContext sharedContext;
+    private final IAuthorizer authorizationHandler;
     private volatile boolean active;
     private final ISupervisor iSupervisor;
     private final Utils.UptimeComputer upTime;
@@ -68,10 +93,12 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     private final LocalState localState;
     private final String supervisorId;
     private final String assignmentId;
+    private final int supervisorPort;
     private final String hostName;
     // used for reporting used ports when heartbeating
     private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
     private final StormTimer heartbeatTimer;
+    private final StormTimer workerHeartbeatTimer;
     private final StormTimer eventTimer;
     //Right now this is only used for sending metrics to nimbus,
     // but we may want to combine it with the heartbeatTimer at some point
@@ -80,12 +107,24 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     private final AsyncLocalizer asyncLocalizer;
     private EventManager eventManager;
     private ReadClusterState readState;
+    private ThriftServer thriftServer;
+    //used for local cluster heartbeating
+    private Nimbus.Iface localNimbus;
     
-    private Supervisor(ISupervisor iSupervisor) throws IOException {
+    private Supervisor(ISupervisor iSupervisor)
+        throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException {
         this(Utils.readStormConfig(), null, iSupervisor);
     }
-    
-    public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor iSupervisor) throws IOException {
+
+    /**
+     * Constructor for supervisor daemon.
+     * @param conf config
+     * @param sharedContext {@link IContext}
+     * @param iSupervisor {@link ISupervisor}
+     * @throws IOException
+     */
+    public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor iSupervisor)
+        throws IOException, IllegalAccessException, ClassNotFoundException, InstantiationException{
         this.conf = conf;
         this.iSupervisor = iSupervisor;
         this.active = true;
@@ -93,11 +132,18 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         this.stormVersion = VersionInfo.getVersion();
         this.sharedContext = sharedContext;
         this.heartbeatExecutor = Executors.newFixedThreadPool(1);
+        this.authorizationHandler = StormCommon.mkAuthorizationHandler(
+                                        (String) conf.get(DaemonConfig.SUPERVISOR_AUTHORIZER), conf);
+        if (authorizationHandler == null && conf.get(DaemonConfig.NIMBUS_AUTHORIZER) != null) {
+            throw new IllegalStateException("It looks like authorization is turned on for nimbus but not for the "
+                                                + "supervisor....");
+        }
         
         iSupervisor.prepare(conf, ServerConfigUtils.supervisorIsupervisorDir(conf));
 
         try {
-            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.SUPERVISOR, conf));
+            this.stormClusterState = ClusterUtils.mkStormClusterState(conf,
+                new ClusterStateContext(DaemonType.SUPERVISOR, conf));
         } catch (Exception e) {
             LOG.error("supervisor can't create stormClusterState");
             throw Utils.wrapInRuntime(e);
@@ -113,6 +159,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         }
         this.supervisorId = iSupervisor.getSupervisorId();
         this.assignmentId = iSupervisor.getAssignmentId();
+        this.supervisorPort = ObjectReader.getInt(conf.get(Config.SUPERVISOR_THRIFT_PORT));
 
         try {
             this.hostName = Utils.hostname();
@@ -122,6 +169,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
 
         this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
 
+        this.workerHeartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+
         this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
     }
 
@@ -160,6 +209,10 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         return stormClusterState;
     }
 
+    public ReadClusterState getReadClusterState() {
+        return readState;
+    }
+
     LocalState getLocalState() {
         return localState;
     }
@@ -168,6 +221,10 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         return assignmentId;
     }
 
+    public int getThriftServerPort() {
+        return supervisorPort;
+    }
+
     public String getHostName() {
         return hostName;
     }
@@ -183,6 +240,18 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     EventManager getEventManger() {
         return eventManager;
     }
+
+    Supervisor getSupervisor() {
+        return this;
+    }
+
+    public void setLocalNimbus(Nimbus.Iface nimbus) {
+        this.localNimbus = nimbus;
+    }
+
+    public Nimbus.Iface getLocalNimbus() {
+        return this.localNimbus;
+    }
     
     /**
      * Launch the supervisor.
@@ -206,11 +275,16 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         if ((Boolean) conf.get(DaemonConfig.SUPERVISOR_ENABLE)) {
             // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
             // to date even if callbacks don't all work exactly right
-            eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));
+            eventTimer.scheduleRecurring(0, 10,
+                    new EventManagerPushCallback(new SynchronizeAssignments(this, null, readState), eventManager));
 
             // supervisor health check
             eventTimer.scheduleRecurring(30, 30, new SupervisorHealthCheck(this));
         }
+
+        ReportWorkerHeartbeats reportWorkerHeartbeats = new ReportWorkerHeartbeats(conf, this);
+        Integer workerHeartbeatFrequency = ObjectReader.getInt(conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS));
+        workerHeartbeatTimer.scheduleRecurring(0, workerHeartbeatFrequency, reportWorkerHeartbeats);
         LOG.info("Starting supervisor with id {} at host {}.", getId(), getHostName());
     }
 
@@ -225,6 +299,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
                 throw new IllegalArgumentException("Cannot start server in local mode!");
             }
             launch();
+            // must be invoked after launch() because some services have to be initialized first
+            launchSupervisorThriftServer(conf);
             Utils.addShutdownHookWithForceKillIn1Sec(this::close);
             registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
             StormMetricsRegistry.startMetricsReporters(conf);
@@ -234,6 +310,125 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         }
     }
 
+    @VisibleForTesting
+    public void checkAuthorization(String operation) throws AuthorizationException {
+        checkAuthorization(null, null, operation, null);
+    }
+
+    @VisibleForTesting
+    public void checkAuthorization(String topoName, Map<String, Object> topoConf, String operation)
+        throws AuthorizationException {
+        checkAuthorization(topoName, topoConf, operation, null);
+    }
+
+    @VisibleForTesting
+    public void checkAuthorization(String topoName, Map<String, Object> topoConf, String operation, ReqContext context)
+            throws AuthorizationException {
+        IAuthorizer aclHandler = authorizationHandler;
+        if (context == null) {
+            context = ReqContext.context();
+        }
+        Map<String, Object> checkConf = new HashMap<>();
+        if (topoConf != null) {
+            checkConf.putAll(topoConf);
+        } else if (topoName != null) {
+            checkConf.put(Config.TOPOLOGY_NAME, topoName);
+        }
+
+        if (context.isImpersonating()) {
+            LOG.warn("principal: {} is trying to impersonate principal: {}", context.realPrincipal(),
+                    context.principal());
+            throw new AuthorizationException("Supervisor does not support impersonation");
+        }
+
+        if (aclHandler != null) {
+            if (!aclHandler.permit(context, operation, checkConf)) {
+                ThriftAccessLogger.logAccess(context.requestID(), context.remoteAddress(), context.principal(),
+                        operation, topoName, "access-denied");
+                throw new AuthorizationException( operation + (topoName != null ? " on topology " + topoName : "") +
+                        " is not authorized");
+            } else {
+                ThriftAccessLogger.logAccess(context.requestID(), context.remoteAddress(), context.principal(),
+                        operation, topoName, "access-granted");
+            }
+        }
+    }
+
+    private void launchSupervisorThriftServer(Map<String, Object> conf) throws IOException {
+        // validate port
+        int port = getThriftServerPort();
+        try {
+            ServerSocket socket = new ServerSocket(port);
+            socket.close();
+        } catch (BindException e) {
+            LOG.error("{} is not available. Check if another process is already listening on {}", port, port);
+            throw new RuntimeException(e);
+        }
+
+        TProcessor processor = new org.apache.storm.generated.Supervisor.Processor(
+                new org.apache.storm.generated.Supervisor.Iface() {
+                    @Override
+                    public void sendSupervisorAssignments(SupervisorAssignments assignments)
+                            throws AuthorizationException, TException {
+                        checkAuthorization("sendSupervisorAssignments");
+                        LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments);
+                        SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments,
+                                                             getReadClusterState());
+                        getEventManger().add(syn);
+                    }
+
+                    @Override
+                    public Assignment getLocalAssignmentForStorm(String id)
+                            throws NotAliveException, AuthorizationException, TException {
+                        Map<String, Object> topoConf = null;
+                        try {
+                            topoConf = ConfigUtils.readSupervisorStormConf(conf, id);
+                        } catch (IOException e) {
+                            LOG.warn("Topology config is not localized yet...");
+                        }
+                        checkAuthorization(id, topoConf, "getLocalAssignmentForStorm");
+                        Assignment assignment = getStormClusterState().assignmentInfo(id, null);
+                        if (null == assignment) {
+                            throw new NotAliveException("No local assignment assigned for storm: "
+                                                            + id
+                                                            + " for node: "
+                                                            + getHostName());
+                        }
+                        return assignment;
+                    }
+
+                    @Override
+                    public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heartbeat)
+                            throws AuthorizationException, NotAliveException, TException {
+                        // do nothing except validate heartbeat for now.
+                        String id = heartbeat.get_storm_id();
+                        Map<String, Object> topoConf = null;
+                        try {
+                            topoConf = ConfigUtils.readSupervisorStormConf(conf, id);
+                        } catch (IOException e) {
+                            LOG.warn("Topology config is not localized yet...");
+                            throw new NotAliveException(id + " does not appear to be alive, you should probably exit");
+                        }
+                        checkAuthorization(id, topoConf, "sendSupervisorWorkerHeartbeat");
+                    }
+                });
+        this.thriftServer = new ThriftServer(conf, processor, ThriftConnectionType.SUPERVISOR);
+        this.thriftServer.serve();
+    }
+
+    /**
+     * Used for local cluster assignments distribution.
+     * @param assignments {@link SupervisorAssignments}
+     */
+    public void sendSupervisorAssignments(SupervisorAssignments assignments) {
+        //for local test
+        if (Time.isSimulating() && !(Boolean) conf.get(DaemonConfig.SUPERVISOR_ENABLE)) {
+            return;
+        }
+        SynchronizeAssignments syn = new SynchronizeAssignments(this, assignments, readState);
+        eventManager.add(syn);
+    }
+
     private void registerWorkerNumGauge(String name, final Map<String, Object> conf) {
         StormMetricsRegistry.registerGauge(name, new Callable<Integer>() {
             @Override
@@ -250,6 +445,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             LOG.info("Shutting down supervisor {}", getId());
             this.active = false;
             heartbeatTimer.close();
+            workerHeartbeatTimer.close();
             eventTimer.close();
             if (eventManager != null) {
                 eventManager.close();
@@ -259,6 +455,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             }
             asyncLocalizer.close();
             getStormClusterState().disconnect();
+            if(thriftServer != null) {
+                this.thriftServer.stop();
+            }
         } catch (Exception e) {
             LOG.error("Error Shutting down", e);
         }
@@ -307,7 +506,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             readState.shutdownAllWorkers(onWarnTimeout, onErrorTimeout);
         } else {
             try {
-                ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getSharedContext());
+                ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getThriftServerPort(),
+                    getSharedContext());
                 killWorkers(SupervisorUtils.supervisorWorkerIds(conf), launcher);
             } catch (Exception e) {
                 throw Utils.wrapInRuntime(e);
@@ -321,18 +521,18 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             return true;
         }
 
-        if (heartbeatTimer.isTimerWaiting() && eventTimer.isTimerWaiting() && eventManager.waiting()) {
-            return true;
-        }
-        return false;
+        return heartbeatTimer.isTimerWaiting()
+                && workerHeartbeatTimer.isTimerWaiting()
+                && eventTimer.isTimerWaiting()
+                && eventManager.waiting();
     }
 
     /**
-     * supervisor daemon enter entrance
+     * Entry point of the supervisor daemon.
      *
      * @param args
      */
-    public static void main(String[] args) throws IOException {
+    public static void main(String[] args) throws Exception {
         Utils.setupDefaultUncaughtExceptionHandler();
         @SuppressWarnings("resource")
         Supervisor instance = new Supervisor(new StandaloneSupervisor());

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index aba9459..90e5451 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -15,8 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.storm.Config;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.localizer.LocalResource;
@@ -30,13 +38,6 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class SupervisorUtils {
 
     private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
@@ -75,7 +76,7 @@ public class SupervisorUtils {
 
     /**
      * Given the blob information returns the value of the workerRestart field, handling it either being a string or a boolean value, or
-     * if it's not specified then returns false
+     * if it's not specified then returns false.
      *
      * @param blobInfo the info for the blob.
      * @return true if the blob needs a worker restart by way of the callback else false.

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
new file mode 100644
index 0000000..c01819b
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.generated.SupervisorWorkerHeartbeats;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runnable that reports locally collected worker heartbeats to the master. The supervisor should take care of the
+ * integrity of the heartbeats for the master's heartbeat recovery: a non-null node id means that the heartbeats are
+ * complete, and the master can go on to check and wait for other nodes when doing a heartbeat recovery.
+ */
+public class ReportWorkerHeartbeats implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(ReportWorkerHeartbeats.class);
+
+    private Supervisor supervisor;
+    private Map<String, Object> conf;
+
+    public ReportWorkerHeartbeats(Map<String, Object> conf, Supervisor supervisor) {
+        this.conf = conf;
+        this.supervisor = supervisor;
+    }
+
+    @Override
+    public void run() {
+        SupervisorWorkerHeartbeats supervisorWorkerHeartbeats = getAndResetWorkerHeartbeats();
+        reportWorkerHeartbeats(supervisorWorkerHeartbeats);
+    }
+
+    private SupervisorWorkerHeartbeats getAndResetWorkerHeartbeats() {
+        Map<String, LSWorkerHeartbeat> localHeartbeats;
+        try {
+            localHeartbeats = SupervisorUtils.readWorkerHeartbeats(this.conf);
+            return getSupervisorWorkerHeartbeatsFromLocal(localHeartbeats);
+        } catch (Exception e) {
+            LOG.error("Read local worker heartbeats error, skipping heartbeats for this round, msg:{}", e.getMessage());
+            return null;
+        }
+    }
+
+    private SupervisorWorkerHeartbeats getSupervisorWorkerHeartbeatsFromLocal(Map<String, LSWorkerHeartbeat> localHeartbeats) {
+        SupervisorWorkerHeartbeats supervisorWorkerHeartbeats = new SupervisorWorkerHeartbeats();
+
+        List<SupervisorWorkerHeartbeat> heartbeatList = new ArrayList<>();
+
+        for (LSWorkerHeartbeat lsWorkerHeartbeat : localHeartbeats.values()) {
+            // local worker heartbeat can be null because of some error/exception
+            if (null == lsWorkerHeartbeat) {
+                continue;
+            }
+
+            SupervisorWorkerHeartbeat supervisorWorkerHeartbeat = new SupervisorWorkerHeartbeat();
+            supervisorWorkerHeartbeat.set_storm_id(lsWorkerHeartbeat.get_topology_id());
+            supervisorWorkerHeartbeat.set_executors(lsWorkerHeartbeat.get_executors());
+            supervisorWorkerHeartbeat.set_time_secs(lsWorkerHeartbeat.get_time_secs());
+
+            heartbeatList.add(supervisorWorkerHeartbeat);
+        }
+        supervisorWorkerHeartbeats.set_supervisor_id(this.supervisor.getId());
+        supervisorWorkerHeartbeats.set_worker_heartbeats(heartbeatList);
+        return supervisorWorkerHeartbeats;
+    }
+
+    private void reportWorkerHeartbeats(SupervisorWorkerHeartbeats supervisorWorkerHeartbeats) {
+        if (supervisorWorkerHeartbeats == null) {
+            // error/exception thrown, just skip
+            return;
+        }
+        // if it is local mode, just get the local nimbus instance and set the heartbeats
+        if (ConfigUtils.isLocalMode(conf)) {
+            try {
+                this.supervisor.getLocalNimbus().sendSupervisorWorkerHeartbeats(supervisorWorkerHeartbeats);
+            } catch (TException tex) {
+                LOG.error("Send local supervisor heartbeats error", tex);
+            }
+        } else {
+            try (NimbusClient master = NimbusClient.getConfiguredClient(conf)) {
+                master.getClient().sendSupervisorWorkerHeartbeats(supervisorWorkerHeartbeats);
+            } catch (Exception t) {
+                LOG.error("Send worker heartbeats to master exception", t);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 2be241a..14ecf94 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor.timer;
 
 import java.util.ArrayList;
@@ -33,10 +34,10 @@ import org.apache.storm.utils.Time;
 
 public class SupervisorHeartbeat implements Runnable {
 
-     private final IStormClusterState stormClusterState;
-     private final String supervisorId;
-     private final Map<String, Object> conf;
-     private final Supervisor supervisor;
+    private final IStormClusterState stormClusterState;
+    private final String supervisorId;
+    private final Map<String, Object> conf;
+    private final Supervisor supervisor;
 
     public SupervisorHeartbeat(Map<String, Object> conf, Supervisor supervisor) {
         this.stormClusterState = supervisor.getStormClusterState();
@@ -50,17 +51,19 @@ public class SupervisorHeartbeat implements Runnable {
         supervisorInfo.set_time_secs(Time.currentTimeSecs());
         supervisorInfo.set_hostname(supervisor.getHostName());
         supervisorInfo.set_assignment_id(supervisor.getAssignmentId());
+        supervisorInfo.set_server_port(supervisor.getThriftServerPort());
 
         List<Long> usedPorts = new ArrayList<>();
         usedPorts.addAll(supervisor.getCurrAssignment().get().keySet());
         supervisorInfo.set_used_ports(usedPorts);
         List metaDatas = (List)supervisor.getiSupervisor().getMetadata();
         List<Long> portList = new ArrayList<>();
-        if (metaDatas != null){
-            for (Object data : metaDatas){
+        if (metaDatas != null) {
+            for (Object data : metaDatas) {
                 Integer port = ObjectReader.getInt(data);
-                if (port != null)
+                if (port != null) {
                     portList.add(port.longValue());
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
new file mode 100644
index 0000000..ba8d133
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.ReadClusterState;
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A runnable which will synchronize assignments to node local and then worker processes.
+ */
+public class SynchronizeAssignments implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(SynchronizeAssignments.class);
+
+    private Supervisor supervisor;
+    private SupervisorAssignments assignments;
+    private ReadClusterState readClusterState;
+
+    /**
+     * Constructor.
+     * @param supervisor {@link Supervisor}
+     * @param assignments {@link SupervisorAssignments}
+     * @param readClusterState {@link ReadClusterState}
+     */
+    public SynchronizeAssignments(Supervisor supervisor, SupervisorAssignments assignments, ReadClusterState readClusterState) {
+        this.supervisor = supervisor;
+        this.assignments = assignments;
+        this.readClusterState = readClusterState;
+    }
+
+    @Override
+    public void run() {
+        // first sync assignments to local, then sync processes.
+        if (null == assignments) {
+            getAssignmentsFromMaster(this.supervisor.getConf(), this.supervisor.getStormClusterState(), this.supervisor.getAssignmentId());
+        } else {
+            assignedAssignmentsToLocal(this.supervisor.getStormClusterState(), assignments);
+        }
+        this.readClusterState.run();
+    }
+
+    /**
+     * Used by {@link Supervisor} to fetch assignments when start up.
+     *
+     * <p>Retries every 5 seconds until one fetch from master and the following local sync complete without
+     * an exception. Each failed attempt is logged so a persistently unreachable master can be diagnosed.
+     * @param supervisor {@link Supervisor}
+     * @throws RuntimeException if the thread is interrupted while waiting for the next retry
+     */
+    public void getAssignmentsFromMasterUntilSuccess(Supervisor supervisor) {
+        boolean success = false;
+        while (!success) {
+            try (NimbusClient master = NimbusClient.getConfiguredClient(supervisor.getConf())) {
+                SupervisorAssignments assignments = master.getClient().getSupervisorAssignments(supervisor.getAssignmentId());
+                assignedAssignmentsToLocal(supervisor.getStormClusterState(), assignments);
+                success = true;
+            } catch (Exception t) {
+                // Best effort: keep retrying, but record why this attempt failed.
+                LOG.warn("Failed to sync assignments from master, will retry", t);
+            }
+            if (!success) {
+                LOG.info("Waiting for a success sync of assignments from master...");
+                try {
+                    Time.sleep(5000L);
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status before aborting the retry loop.
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Fetch assignments for the given node from master (the local nimbus instance in local mode) and sync
+     * them to local state; failures are logged and skipped.
+     * @param conf config
+     * @param clusterState {@link IStormClusterState}
+     * @param node id of node
+     */
+    public void getAssignmentsFromMaster(Map conf, IStormClusterState clusterState, String node) {
+        if (ConfigUtils.isLocalMode(conf)) {
+            try {
+                SupervisorAssignments assignments = this.supervisor.getLocalNimbus().getSupervisorAssignments(node);
+                assignedAssignmentsToLocal(clusterState, assignments);
+            } catch (TException e) {
+                LOG.error("Get assignments from local master exception", e);
+            }
+        } else {
+            try (NimbusClient master = NimbusClient.getConfiguredClient(conf)) {
+                SupervisorAssignments assignments = master.getClient().getSupervisorAssignments(node);
+                LOG.debug("Sync an assignments from master, will start to sync with assignments: {}", assignments);
+                assignedAssignmentsToLocal(clusterState, assignments);
+            } catch (Exception t) {
+                LOG.error("Get assignments from master exception", t);
+            }
+        }
+    }
+
+    private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) {
+        if (null == assignments) {
+            //unknown error, just skip
+            return;
+        }
+        Map<String, byte[]> serAssignments = new HashMap<>();
+        for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) {
+            serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue()));
+        }
+        clusterState.syncRemoteAssignments(serAssignments);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
new file mode 100644
index 0000000..558e570
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.SupervisorClient;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A service for distributing master assignments to supervisors, this service makes the assignments notification
+ * asynchronous.
+ *
+ * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer.
+ *
+ * <p>Master will shuffle its node request to the queues, if the target queue is still full after waiting
+ * up to 5 seconds, we just discard the request, let the supervisors sync instead.
+ *
+ * <p>Caution: this class is not thread safe.
+ *
+ * <pre>{@code
+ * Working mode
+ *                      +--------+         +-----------------+
+ *                      | queue1 |   ==>   | Working thread1 |
+ * +--------+ shuffle   +--------+         +-----------------+
+ * | Master |   ==>
+ * +--------+           +--------+         +-----------------+
+ *                      | queue2 |   ==>   | Working thread2 |
+ *                      +--------+         +-----------------+
+ * }
+ * </pre>
+ */
+public class AssignmentDistributionService implements Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(AssignmentDistributionService.class);
+    private ExecutorService service;
+    /**
+     * Flag to indicate if the service is active.
+     */
+    private volatile boolean active = false;
+
+    private Random random;
+    /**
+     * Working threads num.
+     */
+    private int threadsNum = 0;
+    /**
+     * Working thread queue size.
+     */
+    private int queueSize = 0;
+
+    /**
+     * Assignments request queue.
+     */
+    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue;
+
+    /**
+     * local supervisors for local cluster assignments distribution.
+     */
+    private Map<String, Supervisor> localSupervisors;
+
+    private Map conf;
+
+    private boolean isLocalMode = false; // boolean cache for local mode decision
+
+    /**
+     * Function for initialization.
+     *
+     * @param conf config
+     */
+    public void prepare(Map conf) {
+        this.conf = conf;
+        this.random = new Random(47);
+
+        this.threadsNum = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
+        this.queueSize = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100);
+
+        this.assignmentsQueue = new HashMap<>();
+        for (int i = 0; i < threadsNum; i++) {
+            this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize));
+        }
+        //start the thread pool
+        this.service = Executors.newFixedThreadPool(threadsNum);
+        this.active = true;
+        //start the threads
+        for (int i = 0; i < threadsNum; i++) {
+            this.service.submit(new DistributeTask(this, i));
+        }
+        // for local cluster
+        localSupervisors = new HashMap<>();
+        if (ConfigUtils.isLocalMode(conf)) {
+            isLocalMode = true;
+        }
+    }
+
+    /**
+     * Stop the service: mark it inactive, interrupt the working threads and wait up to one second for them
+     * to terminate, then drop the queues.
+     */
+    @Override
+    public void close() throws IOException {
+        this.active = false;
+        this.service.shutdownNow();
+        try {
+            this.service.awaitTermination(1L, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOG.error("Failed to close assignments distribute service", e);
+            // Restore the interrupt status so the caller can still observe the interruption.
+            Thread.currentThread().interrupt();
+        }
+        this.assignmentsQueue = null;
+    }
+
+    /**
+     * Add assignments for a node/supervisor to a randomly chosen distribution queue.
+     *
+     * <p>The request is discarded (with a warning) when the server port is unknown, or when the chosen queue
+     * is still full after waiting 5 seconds.
+     * @param node node id of supervisor.
+     * @param host host name for the node.
+     * @param serverPort node thrift server port.
+     * @param assignments the {@link org.apache.storm.generated.SupervisorAssignments}
+     * @throws RuntimeException if the thread is interrupted while waiting for queue space
+     */
+    public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+        try {
+            //For some reasons, we can not get supervisor port info, eg: supervisor shutdown,
+            //Just skip for this scheduling round.
+            if (serverPort == null) {
+                LOG.warn("Discard an assignment distribution for node {} because server port info is missing.", node);
+                return;
+            }
+
+            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS);
+            if (!success) {
+                LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node);
+            }
+
+        } catch (InterruptedException e) {
+            LOG.error("Add node assignments interrupted: {}", e.getMessage());
+            // Restore the interrupt status so callers up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    static class NodeAssignments {
+        private String node;
+        private String host;
+        private Integer serverPort;
+        private SupervisorAssignments assignments;
+
+        private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+            this.node = node;
+            this.host = host;
+            this.serverPort = serverPort;
+            this.assignments = assignments;
+        }
+
+        public static NodeAssignments getInstance(String node, String host, Integer serverPort,
+            SupervisorAssignments assignments) {
+            return new NodeAssignments(node, host, serverPort, assignments);
+        }
+
+        //supervisor assignment id/supervisor id
+        public String getNode() {
+            return this.node;
+        }
+
+        public String getHost() {
+            return host;
+        }
+
+        public Integer getServerPort() {
+            return serverPort;
+        }
+
+        public SupervisorAssignments getAssignments() {
+            return this.assignments;
+        }
+
+    }
+
+    /**
+     * Task to distribute assignments.
+     */
+    static class DistributeTask implements Runnable {
+        private AssignmentDistributionService service;
+        private Integer queueIndex;
+
+        DistributeTask(AssignmentDistributionService service, Integer index) {
+            this.service = service;
+            this.queueIndex = index;
+        }
+
+        /**
+         * Take entries from this task's queue and send each one to its node, until the service is closed.
+         */
+        @Override
+        public void run() {
+            while (service.isActive()) {
+                try {
+                    NodeAssignments nodeAssignments = this.service.nextAssignments(queueIndex);
+                    sendAssignmentsToNode(nodeAssignments);
+                } catch (InterruptedException e) {
+                    if (service.isActive()) {
+                        // Log the exception itself: the cause of an interrupt is normally null.
+                        LOG.error("Get an unexpected interrupt when distributing assignments to node", e);
+                    } else {
+                        // service is off now just interrupt it.
+                        Thread.currentThread().interrupt();
+                    }
+                } catch (RuntimeException e) {
+                    // This task is submitted to an executor, so an escaping exception would silently end
+                    // the thread and its queue would never be drained again. Log it and keep going.
+                    LOG.error("Unexpected error when distributing assignments to node", e);
+                }
+            }
+        }
+
+        private void sendAssignmentsToNode(NodeAssignments assignments) {
+            if (this.service.isLocalMode) {
+                //local node
+                Supervisor supervisor = this.service.localSupervisors.get(assignments.getNode());
+                if (supervisor != null) {
+                    supervisor.sendSupervisorAssignments(assignments.getAssignments());
+                } else {
+                    LOG.error("Can not find node {} for assignments distribution", assignments.getNode());
+                    throw new RuntimeException("null for node " + assignments.getNode() +  " supervisor instance.");
+                }
+            } else {
+                // distributed mode
+                try (SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(),
+                    assignments.getHost(), assignments.getServerPort())){
+                    try {
+                        client.getClient().sendSupervisorAssignments(assignments.getAssignments());
+                    } catch (Exception e) {
+                        //just ignore the exception.
+                        LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
+                    }
+                } catch (Throwable e) {
+                    //just ignore any error/exception.
+                    LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage());
+                }
+
+            }
+        }
+    }
+
+    public void addLocalSupervisor(Supervisor supervisor) {
+        this.localSupervisors.put(supervisor.getId(), supervisor);
+    }
+
+    private Integer nextQueueId() {
+        return this.random.nextInt(threadsNum);
+    }
+
+    private LinkedBlockingQueue<NodeAssignments> nextQueue() {
+        return this.assignmentsQueue.get(nextQueueId());
+    }
+
+    private LinkedBlockingQueue<NodeAssignments> getQueueById(Integer queueIndex) {
+        return this.assignmentsQueue.get(queueIndex);
+    }
+
+    /**
+     * Get the next assignments from the target queue with the specific index, polling until one is available.
+     * @param queueIndex index of the queue
+     * @return an {@link NodeAssignments}
+     * @throws InterruptedException if interrupted while waiting for the next assignments
+     */
+    public NodeAssignments nextAssignments(Integer queueIndex) throws InterruptedException {
+        NodeAssignments target = null;
+        while (true) {
+            target = getQueueById(queueIndex).poll();
+            if (target != null) {
+                return target;
+            }
+            Time.sleep(100L);
+        }
+    }
+
+    public boolean isActive() {
+        return this.active;
+    }
+
+    public Map getConf() {
+        return this.conf;
+    }
+
+    /**
+     * Factory method to create and initialize an instance.
+     * @param conf config.
+     * @return an instance of {@link AssignmentDistributionService}
+     */
+    public static AssignmentDistributionService getInstance(Map conf) {
+        AssignmentDistributionService service = new AssignmentDistributionService();
+        service.prepare(conf);
+        return service;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java b/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java
index c4a0f64..e362fbc 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java
@@ -47,19 +47,19 @@ public interface ILeaderElector extends Closeable {
     void removeFromLeaderLockQueue() throws Exception;
 
     /**
-     *
+     * Decide if the caller currently has the leader lock.
      * @return true if the caller currently has the leader lock.
      */
     boolean isLeader() throws Exception;
 
     /**
-     *
+     * Get the current leader's address.
      * @return the current leader's address , may return null if no one has the lock.
      */
     NimbusInfo getLeader();
 
     /**
-     *
+     * Get list of current nimbus addresses.
      * @return list of current nimbus addresses, includes leader.
      */
     List<NimbusInfo> getAllNimbuses()throws Exception;

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java b/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java
new file mode 100644
index 0000000..bde58dc
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Interface for strategy to recover heartbeats when master gains leadership.
+ */
+public interface IWorkerHeartbeatsRecoveryStrategy {
+
+    /**
+     * Function to prepare the strategy.
+     * @param conf config
+     */
+    void prepare(Map conf);
+
+    /**
+     * Function to decide if the heartbeats is ready.
+     * @param nodeIds all the node ids from current physical plan[assignments], read from {@code ClusterState}
+     * @return true if all node worker heartbeats reported
+     */
+    boolean isReady(Set<String> nodeIds);
+
+    /**
+     * report the node id to this strategy to help to decide {@code isReady}.
+     * @param nodeId the node id from reported SupervisorWorkerHeartbeats
+     */
+    void reportNodeId(String nodeId);
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
new file mode 100644
index 0000000..59cd462
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
+
+import javax.security.auth.Subject;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.nimbus.TopoCache;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.security.auth.ReqContext;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.ClientZookeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A callback function when nimbus gains leadership.
+ */
+public class LeaderListenerCallback {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallback.class);
+
+    private final BlobStore blobStore;
+    private final TopoCache tc;
+    private final IStormClusterState clusterState;
+
+    private final CuratorFramework zk;
+    private final LeaderLatch leaderLatch;
+
+    private final Map conf;
+    private final List<ACL> acls;
+
+    private static final String STORM_JAR_SUFFIX = "-stormjar.jar";
+    private static final String STORM_CODE_SUFFIX = "-stormcode.ser";
+    private static final String STORM_CONF_SUFFIX = "-stormconf.ser";
+
+    /**
+     * Constructor for {@link LeaderListenerCallback}.
+     * @param conf config
+     * @param zk zookeeper CuratorFramework client
+     * @param leaderLatch LeaderLatch
+     * @param blobStore BlobStore
+     * @param tc TopoCache
+     * @param clusterState IStormClusterState
+     * @param acls zookeeper acls
+     */
+    public LeaderListenerCallback(Map conf, CuratorFramework zk, LeaderLatch leaderLatch, BlobStore blobStore,
+                                  TopoCache tc, IStormClusterState clusterState, List<ACL> acls) {
+        this.blobStore = blobStore;
+        this.tc = tc;
+        this.clusterState = clusterState;
+        this.zk = zk;
+        this.leaderLatch = leaderLatch;
+        this.conf = conf;
+        this.acls = acls;
+    }
+
+    /**
+     * Invoke when gains leadership.
+     */
+    public void leaderCallBack() {
+        //set up nimbus-info to zk
+        setUpNimbusInfo(acls);
+        //sync zk assignments/id-info to local
+        LOG.info("Sync remote assignments and id-info to local");
+        clusterState.syncRemoteAssignments(null);
+        clusterState.syncRemoteIds(null);
+        clusterState.setAssignmentsBackendSynchronized();
+
+        Set<String> activeTopologyIds = new TreeSet<>(ClientZookeeper.getChildren(zk,
+            ClusterUtils.STORMS_SUBTREE, false));
+
+        Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
+        Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
+        Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
+        Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);
+
+        // this finds all active topology blob keys that are missing from the local topology blob keys
+        Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
+        LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]",
+                generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),
+                generateJoinedString(diffTopology));
+
+        if (diffTopology.isEmpty()) {
+            Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);
+
+            // this finds all dependency blob keys of active topologies that are missing from the local blob keys
+            Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
+            LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]",
+                    generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
+                    generateJoinedString(diffDependencies));
+
+            if (diffDependencies.isEmpty()) {
+                LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
+                tc.clear();
+            } else {
+                LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, "
+                        + "giving up leadership.");
+                closeLatch();
+            }
+        } else {
+            LOG.info("code for all active topologies not available locally, giving up leadership.");
+            closeLatch();
+        }
+    }
+
+    /**
+     * Invoke when lost leadership.
+     */
+    public void notLeaderCallback() {
+        tc.clear();
+    }
+
+    private void setUpNimbusInfo(List<ACL> acls) {
+        String leaderInfoPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.LEADERINFO_SUBTREE;
+        NimbusInfo nimbusInfo = NimbusInfo.fromConf(conf);
+        if (ClientZookeeper.existsNode(zk, leaderInfoPath, false)) {
+            ClientZookeeper.setData(zk, leaderInfoPath, Utils.javaSerialize(nimbusInfo));
+        } else {
+            ClientZookeeper.createNode(zk, leaderInfoPath, Utils.javaSerialize(nimbusInfo), CreateMode.PERSISTENT, acls);
+        }
+    }
+
+    private String generateJoinedString(Set<String> activeTopologyIds) {
+        return Joiner.on(",").join(activeTopologyIds);
+    }
+
+    private Set<String> populateTopologyBlobKeys(Set<String> activeTopologyIds) {
+        Set<String> activeTopologyBlobKeys = new TreeSet<>();
+        for (String activeTopologyId : activeTopologyIds) {
+            activeTopologyBlobKeys.add(activeTopologyId + STORM_JAR_SUFFIX);
+            activeTopologyBlobKeys.add(activeTopologyId + STORM_CODE_SUFFIX);
+            activeTopologyBlobKeys.add(activeTopologyId + STORM_CONF_SUFFIX);
+        }
+        return activeTopologyBlobKeys;
+    }
+
+    private Set<String> filterTopologyBlobKeys(Set<String> blobKeys) {
+        Set<String> topologyBlobKeys = new HashSet<>();
+        for (String blobKey : blobKeys) {
+            if (blobKey.endsWith(STORM_JAR_SUFFIX)
+                    || blobKey.endsWith(STORM_CODE_SUFFIX)
+                    || blobKey.endsWith(STORM_CONF_SUFFIX)) {
+                topologyBlobKeys.add(blobKey);
+            }
+        }
+        return topologyBlobKeys;
+    }
+
+    private Set<String> filterTopologyCodeKeys(Set<String> blobKeys) {
+        Set<String> topologyCodeKeys = new HashSet<>();
+        for (String blobKey : blobKeys) {
+            if (blobKey.endsWith(STORM_CODE_SUFFIX)) {
+                topologyCodeKeys.add(blobKey);
+            }
+        }
+        return topologyCodeKeys;
+    }
+
+    /**
+     * Collect the dependency jar and dependency artifact keys declared by the given topology code blobs.
+     * @param activeTopologyCodeKeys blob keys of the serialized topologies to inspect
+     * @return sorted set of all dependency keys referenced by those topologies
+     * @throws RuntimeException if a topology code blob can not be read
+     */
+    private Set<String> getTopologyDependencyKeys(Set<String> activeTopologyCodeKeys) {
+        Set<String> activeTopologyDependencies = new TreeSet<>();
+        Subject subject = ReqContext.context().subject();
+
+        for (String activeTopologyCodeKey : activeTopologyCodeKeys) {
+            // Close the blob stream once its content has been read.
+            try (InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey, subject)) {
+                byte[] blobContent = IOUtils.readFully(blob, Long.valueOf(blob.getFileLength()).intValue());
+                StormTopology stormCode = Utils.deserialize(blobContent, StormTopology.class);
+                if (stormCode.is_set_dependency_jars()) {
+                    activeTopologyDependencies.addAll(stormCode.get_dependency_jars());
+                }
+                if (stormCode.is_set_dependency_artifacts()) {
+                    activeTopologyDependencies.addAll(stormCode.get_dependency_artifacts());
+                }
+            } catch (AuthorizationException | KeyNotFoundException | IOException e) {
+                LOG.error("Exception occurs while reading blob for key: "
+                        + activeTopologyCodeKey
+                        + ", exception: "
+                        + e, e);
+                throw new RuntimeException("Exception occurs while reading blob for key: "
+                        + activeTopologyCodeKey
+                        + ", exception: " + e, e);
+            }
+        }
+        return activeTopologyDependencies;
+    }
+
+    private void closeLatch() {
+        try {
+            leaderLatch.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java b/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java
new file mode 100644
index 0000000..f725f5d
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.generated.SupervisorWorkerHeartbeats;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * Test for nimbus heartbeats max throughput, This is a client to collect the statistics.
+ *
+ * <p>{@link #testMaxThroughput()} submits {@code THREADS_NUM} tasks to a fixed thread pool. Each task opens
+ * its own {@link NimbusClient}, sends {@code THREAD_SUBMIT_NUM} mocked heartbeat batches and records the
+ * latency of every call in seconds. When all tasks have finished, the min/max/mean/median/top90 latencies
+ * are printed to standard output.
+ */
+public class NimbusHeartbeatsPressureTest {
+    /**
+     * the args below can be configured.
+     */
+    private static String NIMBUS_HOST = "localhost";
+    private static int NIMBUS_PORT = 6627;
+
+    private static int THREADS_NUM = 50;
+    private static int THREAD_SUBMIT_NUM = 1;
+    /**
+     * Upper bound (exclusive) of the random suffix used to build mocked storm ids.
+     */
+    private static int MOCKED_STORM_NUM = 5000;
+    /**
+     * Number of worker heartbeats packed into one mocked supervisor report.
+     */
+    private static final int MOCKED_WORKERS_PER_REPORT = 25;
+
+    /**
+     * Counted down exactly once by every task, whether it succeeded or failed, so that the driver
+     * never waits forever for a task that died.
+     */
+    private static final CountDownLatch FINISHED_TASKS = new CountDownLatch(THREADS_NUM);
+
+    private static Random rand = new Random(47);
+
+    /**
+     * Latency samples (seconds) of every task that completed all its requests. Worker threads append to
+     * this list concurrently, so it is wrapped in a synchronized list.
+     */
+    private static List<double[]> totalCostTimesBook = Collections.synchronizedList(new ArrayList<>());
+
+    /**
+     * Initialize a fake config.
+     * @return the default storm config with nimbus seeds and thrift port pointing at the test target
+     */
+    private static Config initializedConfig() {
+        Config conf = new Config();
+        conf.putAll(Utils.readDefaultConfig());
+        List<String> nimbusSeeds = new ArrayList<>();
+        nimbusSeeds.add(NIMBUS_HOST);
+
+        conf.put(Config.NIMBUS_SEEDS, nimbusSeeds);
+        conf.put(Config.NIMBUS_THRIFT_PORT, NIMBUS_PORT);
+        return conf;
+    }
+
+    /**
+     * Test max throughput with the specific config args.
+     *
+     * <p>Blocks until every submitted task has finished (successfully or not), prints the timing
+     * statistics and shuts the thread pool down. If the calling thread is interrupted while waiting,
+     * the statistics collected so far are printed and the interrupt status is restored.
+     */
+    public static void testMaxThroughput() {
+        ExecutorService service = Executors.newFixedThreadPool(THREADS_NUM);
+
+        long submitStart = System.currentTimeMillis();
+        for (int i = 0; i < THREADS_NUM; i++) {
+            service.submit(new HeartbeatSendTask(i, THREAD_SUBMIT_NUM));
+        }
+        long submitEnd = System.currentTimeMillis();
+        println(THREADS_NUM + " tasks, " + THREAD_SUBMIT_NUM * THREADS_NUM + " submit cost "
+                + (submitEnd - submitStart) / 1000D + "seconds");
+        long totalStart = System.currentTimeMillis();
+        try {
+            FINISHED_TASKS.await();
+        } catch (InterruptedException e) {
+            // Stop waiting but keep the interrupt visible to the caller.
+            Thread.currentThread().interrupt();
+        }
+        long totalEnd = System.currentTimeMillis();
+        println(THREADS_NUM + " tasks, " + THREAD_SUBMIT_NUM * THREADS_NUM
+                + " requests cost " + (totalEnd - totalStart) / 1000D + "seconds");
+        // Work on a snapshot: tasks may still be appending if the wait above was interrupted.
+        printStatistics(new ArrayList<>(totalCostTimesBook));
+        service.shutdownNow();
+        try {
+            service.awaitTermination(3, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Print a message to standard output; a {@link Collection} is printed one element per line.
+     * @param msg the object, or collection of objects, to print
+     */
+    private static void println(Object msg) {
+        if (msg instanceof Collection) {
+            Iterator<?> itr = ((Collection<?>) msg).iterator();
+            while (itr.hasNext()) {
+                System.out.println(itr.next());
+            }
+        } else {
+            System.out.println(msg);
+        }
+    }
+
+    /**
+     * A task that sends {@code tryTimes} mocked heartbeat reports to nimbus and records how long each
+     * request took.
+     */
+    static class HeartbeatSendTask implements Runnable {
+        private final double[] runtimesBook;
+        private final int taskId;
+        private final int tryTimes;
+        /**
+         * Connection to nimbus, or {@code null} when the connection could not be established.
+         */
+        private final NimbusClient client;
+
+        /**
+         * Create a task and connect it to nimbus.
+         * @param taskId index of the task, used to identify it in diagnostics
+         * @param tryTimes number of heartbeat reports the task sends
+         */
+        public HeartbeatSendTask(int taskId, int tryTimes) {
+            this.taskId = taskId;
+            this.tryTimes = tryTimes;
+            this.runtimesBook = new double[tryTimes];
+            NimbusClient connected = null;
+            try {
+                connected = new NimbusClient(initializedConfig(), NIMBUS_HOST, NIMBUS_PORT, null, null);
+            } catch (TTransportException e) {
+                // Leave the client null; run() reports the task as finished without sending anything.
+                e.printStackTrace();
+            }
+            this.client = connected;
+        }
+
+        private static SupervisorWorkerHeartbeat nextMockedWorkerbeat() {
+            List<ExecutorInfo> executorInfos = new ArrayList<>();
+            executorInfos.add(new ExecutorInfo(1, 1));
+            executorInfos.add(new ExecutorInfo(2, 2));
+            executorInfos.add(new ExecutorInfo(3, 3));
+            executorInfos.add(new ExecutorInfo(4, 4));
+            SupervisorWorkerHeartbeat heartbeat = new SupervisorWorkerHeartbeat();
+            heartbeat.set_executors(executorInfos);
+            // generate a random storm id
+            heartbeat.set_storm_id("storm_name_example_" + rand.nextInt(MOCKED_STORM_NUM));
+            heartbeat.set_time_secs(1221212121);
+            return heartbeat;
+        }
+
+        private static SupervisorWorkerHeartbeats mockedHeartbeats() {
+            SupervisorWorkerHeartbeats heartbeats = new SupervisorWorkerHeartbeats();
+            heartbeats.set_supervisor_id("123124134123413412341351234143");
+            List<SupervisorWorkerHeartbeat> workers = new ArrayList<>();
+            for (int i = 0; i < MOCKED_WORKERS_PER_REPORT; i++) {
+                workers.add(nextMockedWorkerbeat());
+            }
+            heartbeats.set_worker_heartbeats(workers);
+            return heartbeats;
+        }
+
+        /**
+         * Send the heartbeat reports and publish the measured latencies. Samples are only published when
+         * every request of this task succeeded; the completion latch is counted down in all cases.
+         */
+        @Override
+        public void run() {
+            try {
+                if (client == null) {
+                    System.err.println("Heartbeat task " + taskId + " has no nimbus connection, skipping");
+                    return;
+                }
+                for (int i = 0; i < tryTimes; i++) {
+                    long thisBegin = System.currentTimeMillis();
+                    client.getClient().sendSupervisorWorkerHeartbeats(mockedHeartbeats());
+                    long thisEnd = System.currentTimeMillis();
+                    this.runtimesBook[i] = (thisEnd - thisBegin) / 1000D;
+                }
+                totalCostTimesBook.add(this.runtimesBook);
+            } catch (TException e) {
+                e.printStackTrace();
+            } finally {
+                // Always signal completion, otherwise a single failed task blocks the driver forever.
+                FINISHED_TASKS.countDown();
+            }
+        }
+    }
+
+    /**
+     * Debugging aid that prints all samples as {@code [a,b,c]}; it is not called by default.
+     * @param array the samples to print
+     */
+    private static void printTimeCostArray(Double[] array) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("[");
+        for (int i = 0; i < array.length; i++) {
+            if (i > 0) {
+                builder.append(",");
+            }
+            builder.append(array[i]);
+        }
+        builder.append("]");
+        System.out.println(builder.toString());
+    }
+
+    /**
+     * Print min, max, mean, median and top90 of all collected latency samples.
+     * @param data one array of latency samples (seconds) per finished task
+     */
+    private static void printStatistics(List<double[]> data) {
+        List<Double> totalPoints = new ArrayList<>();
+        double total = 0D;
+        for (double[] item : data) {
+            for (double point : item) {
+                totalPoints.add(point);
+                total += point;
+            }
+        }
+        println("===== statistics ================");
+        if (totalPoints.isEmpty()) {
+            // Happens when every task failed; indexing the empty array below would throw.
+            println("===== no successful requests, nothing to report =====");
+            return;
+        }
+        Double[] totalPointsArray = totalPoints.toArray(new Double[0]);
+        Arrays.sort(totalPointsArray);
+        println("===== min time cost: " + totalPointsArray[0] + " =====");
+        // After the ascending sort the maximum is the last element.
+        println("===== max time cost: " + totalPointsArray[totalPointsArray.length - 1] + " =====");
+
+        double meanVal = total / totalPointsArray.length;
+        println("===== mean time cost: " + meanVal + " =====");
+        int middleIndex = (int) (totalPointsArray.length * 0.5);
+        println("===== median time cost: " + totalPointsArray[middleIndex] + " =====");
+        int top90Index = (int) (totalPointsArray.length * 0.9);
+        println("===== top90 time cost: " + totalPointsArray[top90Index] + " =====");
+    }
+
+    public static void main(String[] args) {
+        testMaxThroughput();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java b/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
new file mode 100644
index 0000000..78f880a
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * Wait for a node to report worker heartbeats until a configured timeout. For cases below we have strategies:
+ *
+ * <p>1: When nimbus gains leadership, it will decide if the heartbeats are ready based on the reported node ids,
+ * supervisors/nodes will take care of the worker heartbeats recovery, a reported node id means all the workers
+ * heartbeats on the node are reported.
+ *
+ * <p>2: If several supervisors also crash and will never recover [or all crash for some unknown reason],
+ * workers will report their heartbeats directly to master, so this has no effect.
+ */
+public class TimeOutWorkerHeartbeatsRecoveryStrategy implements IWorkerHeartbeatsRecoveryStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(TimeOutWorkerHeartbeatsRecoveryStrategy.class);
+
+    private static final int DEFAULT_NODE_MAX_TIMEOUT_SECS = 600;
+
+    /**
+     * Timeout of this instance in seconds. Kept per instance so that preparing one strategy
+     * does not change the timeout of another.
+     */
+    private int nodeMaxTimeoutSecs = DEFAULT_NODE_MAX_TIMEOUT_SECS;
+
+    private long startTimeSecs;
+
+    private Set<String> reportedIds;
+
+    /**
+     * Read the timeout from the config, record the current time as the start of the wait and
+     * reset the set of reported node ids.
+     * @param conf config to read {@code Config.SUPERVISOR_WORKER_HEARTBEATS_MAX_TIMEOUT_SECS} from;
+     *     600 seconds is used when the key is not set
+     */
+    @Override
+    public void prepare(Map conf) {
+        this.nodeMaxTimeoutSecs = ObjectReader.getInt(
+                conf.get(Config.SUPERVISOR_WORKER_HEARTBEATS_MAX_TIMEOUT_SECS), DEFAULT_NODE_MAX_TIMEOUT_SECS);
+        this.startTimeSecs = Time.currentTimeMillis() / 1000L;
+        this.reportedIds = new HashSet<>();
+    }
+
+    /**
+     * Check whether the heartbeats can be considered recovered.
+     * @param nodeIds ids of the nodes that are expected to report
+     * @return true when every given node id has been reported, or when the timeout has elapsed since
+     *     {@link #prepare(Map)}; in the timeout case the node ids still missing are logged as a warning
+     */
+    @Override
+    public boolean isReady(Set<String> nodeIds) {
+        if (exceedsMaxTimeOut()) {
+            Set<String> missing = nodeIds.stream().filter(id -> !this.reportedIds.contains(id)).collect(toSet());
+            LOG.warn("Failed to recover heartbeats for nodes: {} with timeout {}s", missing, nodeMaxTimeoutSecs);
+            return true;
+        }
+
+        return this.reportedIds.containsAll(nodeIds);
+    }
+
+    /**
+     * Record that the given node has reported.
+     * @param nodeId id of the reporting node
+     */
+    @Override
+    public void reportNodeId(String nodeId) {
+        this.reportedIds.add(nodeId);
+    }
+
+    private boolean exceedsMaxTimeOut() {
+        return (Time.currentTimeMillis() / 1000L - this.startTimeSecs) > nodeMaxTimeoutSecs;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java b/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java
new file mode 100644
index 0000000..4befaf8
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.utils.ReflectionUtils;
+
+/**
+ * Factory class for recovery strategy.
+ */
+public class WorkerHeartbeatsRecoveryStrategyFactory {
+
+    /**
+     * Get instance of {@link IWorkerHeartbeatsRecoveryStrategy} with conf.
+     *
+     * <p>When {@code DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS} is set, the named class
+     * is instantiated by reflection; otherwise a {@link TimeOutWorkerHeartbeatsRecoveryStrategy} is used.
+     * The returned strategy has already been prepared with {@code conf}.
+     * @param conf strategy config
+     * @return an instance of {@link IWorkerHeartbeatsRecoveryStrategy}
+     * @throws IllegalStateException if the configured class does not implement
+     *     {@link IWorkerHeartbeatsRecoveryStrategy}
+     */
+    public static IWorkerHeartbeatsRecoveryStrategy getStrategy(Map<String, Object> conf) {
+        IWorkerHeartbeatsRecoveryStrategy strategy;
+        Object strategyClass = conf.get(DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS);
+        if (strategyClass != null) {
+            Object targetObj = ReflectionUtils.newInstance((String) strategyClass);
+            // Guava's Preconditions only substitutes %s; an SLF4J-style {} would be printed literally.
+            Preconditions.checkState(targetObj instanceof IWorkerHeartbeatsRecoveryStrategy,
+                    "%s must implement IWorkerHeartbeatsRecoveryStrategy",
+                    DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS);
+            strategy = ((IWorkerHeartbeatsRecoveryStrategy) targetObj);
+        } else {
+            strategy = new TimeOutWorkerHeartbeatsRecoveryStrategy();
+        }
+
+        strategy.prepare(conf);
+        return strategy;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
index 242b54c..00b8ea0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
@@ -32,6 +32,10 @@ public class SupervisorDetails {
 
     private final String id;
     /**
+     * thrift server of this supervisor.
+     */
+    private final Integer serverPort;
+    /**
      * hostname of this supervisor.
      */
     private final String host;
@@ -52,16 +56,18 @@ public class SupervisorDetails {
     /**
      * Create the details of a new supervisor.
      * @param id the ID as reported by the supervisor.
+     * @param serverPort the thrift server for the supervisor.
      * @param host the host the supervisor is on.
      * @param meta meta data reported by the supervisor (should be a collection of the ports on the supervisor).
      * @param schedulerMeta Not used and can probably be removed.
      * @param allPorts all of the ports for the supervisor (a better version of meta)
      * @param totalResources all of the resources for this supervisor.
      */
-    public SupervisorDetails(String id, String host, Object meta, Object schedulerMeta,
+    public SupervisorDetails(String id, Integer serverPort, String host, Object meta, Object schedulerMeta,
                              Collection<? extends Number> allPorts, Map<String, Double> totalResources) {
 
         this.id = id;
+        this.serverPort = serverPort;
         this.host = host;
         this.meta = meta;
         this.schedulerMeta = schedulerMeta;
@@ -75,24 +81,29 @@ public class SupervisorDetails {
     }
 
     public SupervisorDetails(String id, Object meta) {
-        this(id, null, meta, null, null, null);
+        this(id, null, null, meta, null, null, null);
     }
 
     public SupervisorDetails(String id, Object meta, Map<String, Double> totalResources) {
-        this(id, null, meta, null, null, totalResources);
+        this(id, null, null, meta, null, null, totalResources);
     }
 
     public SupervisorDetails(String id, Object meta, Collection<? extends Number> allPorts) {
-        this(id, null, meta, null, allPorts, null);
+        this(id, null, null, meta, null, allPorts, null);
     }
 
     public SupervisorDetails(String id, String host, Object schedulerMeta, Collection<? extends Number> allPorts) {
-        this(id, host, null, schedulerMeta, allPorts, null);
+        this(id, null, host, null, schedulerMeta, allPorts, null);
     }
 
     public SupervisorDetails(String id, String host, Object schedulerMeta,
                              Collection<? extends Number> allPorts, Map<String, Double> totalResources) {
-        this(id, host, null, schedulerMeta, allPorts, totalResources);
+        this(id, null, host, null, schedulerMeta, allPorts, totalResources);
+    }
+
+    /**
+     * Create the details of a supervisor with a known thrift server port and no {@code meta} data.
+     * Delegates to the full constructor, passing {@code null} for {@code meta}.
+     * @param id the ID as reported by the supervisor.
+     * @param serverPort the thrift server for the supervisor.
+     * @param host the host the supervisor is on.
+     * @param schedulerMeta Not used and can probably be removed.
+     * @param allPorts all of the ports for the supervisor.
+     * @param totalResources all of the resources for this supervisor.
+     */
+    public SupervisorDetails(String id, int serverPort, String host, Object schedulerMeta,
+                             Collection<? extends Number> allPorts, Map<String, Double> totalResources) {
+        this(id, serverPort, host, null, schedulerMeta, allPorts, totalResources);
     }
     
     @Override
@@ -114,6 +125,10 @@ public class SupervisorDetails {
         return id;
     }
 
+    /**
+     * Get the thrift server port of this supervisor.
+     *
+     * <p>NOTE(review): the backing field is a nullable {@code Integer} and several constructors pass
+     * {@code null} for it, so the unboxing here throws {@link NullPointerException} for such instances.
+     * Confirm that callers only use this on supervisors created with a port.
+     * @return the thrift server port
+     */
+    public int getServerPort() {
+        return serverPort;
+    }
+
     public String getHost() {
         return host;
     }