Posted to commits@storm.apache.org by bo...@apache.org on 2018/03/28 21:22:19 UTC

[09/10] storm git commit: [STORM-2693] Heartbeats and assignments promotion for storm2.0

[STORM-2693] Heartbeats and assignments promotion for storm2.0


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0dac58b0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0dac58b0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0dac58b0

Branch: refs/heads/master
Commit: 0dac58b0aa82133df242b3b2ebeb65bfea7d63cc
Parents: adf4efb
Author: chenyuzhao <ch...@meituan.com>
Authored: Wed Mar 28 23:22:03 2018 +0800
Committer: chenyuzhao <ch...@meituan.com>
Committed: Wed Mar 28 23:22:03 2018 +0800

----------------------------------------------------------------------
 conf/defaults.yaml                              |   16 +
 .../src/jvm/org/apache/storm/Config.java        |   72 +-
 .../src/jvm/org/apache/storm/StormTimer.java    |   24 +-
 .../src/jvm/org/apache/storm/Thrift.java        |   42 +-
 .../assignments/ILocalAssignmentsBackend.java   |  116 +
 .../assignments/InMemoryAssignmentBackend.java  |  158 +
 .../LocalAssignmentsBackendFactory.java         |   48 +
 .../org/apache/storm/cluster/ClusterUtils.java  |   19 +-
 .../org/apache/storm/cluster/DaemonType.java    |    2 +
 .../storm/cluster/IStormClusterState.java       |   78 +-
 .../storm/cluster/StormClusterStateImpl.java    |  124 +-
 .../apache/storm/cluster/ZKStateStorage.java    |   15 +-
 .../storm/cluster/ZKStateStorageFactory.java    |    7 +-
 .../org/apache/storm/daemon/worker/Worker.java  |   57 +-
 .../apache/storm/daemon/worker/WorkerState.java |   49 +-
 .../jvm/org/apache/storm/generated/HBNodes.java |   32 +-
 .../org/apache/storm/generated/HBRecords.java   |   36 +-
 .../jvm/org/apache/storm/generated/Nimbus.java  | 5135 +++++++++++++-----
 .../org/apache/storm/generated/Supervisor.java  | 2968 ++++++++++
 .../storm/generated/SupervisorAssignments.java  |  478 ++
 .../apache/storm/generated/SupervisorInfo.java  |  177 +-
 .../generated/SupervisorWorkerHeartbeat.java    |  660 +++
 .../generated/SupervisorWorkerHeartbeats.java   |  561 ++
 .../storm/generated/WorkerMetricList.java       |   36 +-
 .../storm/generated/WorkerTokenServiceType.java |    5 +-
 .../grouping/LoadAwareShuffleGrouping.java      |    8 +-
 .../apache/storm/security/auth/IAuthorizer.java |    6 +-
 .../security/auth/ThriftConnectionType.java     |    5 +-
 .../auth/authorizer/SimpleACLAuthorizer.java    |    7 +-
 .../SupervisorSimpleACLAuthorizer.java          |  157 +
 .../auth/kerberos/ServerCallbackHandler.java    |    3 +-
 .../jvm/org/apache/storm/stats/StatsUtil.java   |  141 +-
 .../apache/storm/topology/TopologyBuilder.java  |   21 +-
 .../apache/storm/utils/SupervisorClient.java    |   88 +
 storm-client/src/py/storm/Nimbus-remote         |   21 +
 storm-client/src/py/storm/Nimbus.py             |  633 ++-
 storm-client/src/py/storm/Supervisor-remote     |  126 +
 storm-client/src/py/storm/Supervisor.py         |  694 +++
 storm-client/src/py/storm/__init__.py           |    2 +-
 storm-client/src/py/storm/ttypes.py             |  369 +-
 storm-client/src/storm.thrift                   |   46 +-
 .../LocalAssignmentsBackendTest.java            |  102 +
 .../cluster/StormClusterStateImplTest.java      |    5 +-
 .../apache/storm/command/shell_submission.clj   |    2 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |   57 +-
 .../storm/security/auth/nimbus_auth_test.clj    |    4 +
 .../org/apache/storm/stats/TestStatsUtil.java   |   10 +-
 .../java/org/apache/storm/DaemonConfig.java     |   26 +
 .../java/org/apache/storm/LocalCluster.java     |   19 +
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  527 +-
 .../storm/daemon/supervisor/BasicContainer.java |   40 +-
 .../supervisor/BasicContainerLauncher.java      |   13 +-
 .../storm/daemon/supervisor/Container.java      |    5 +-
 .../daemon/supervisor/ContainerLauncher.java    |   10 +-
 .../storm/daemon/supervisor/LocalContainer.java |    7 +-
 .../supervisor/LocalContainerLauncher.java      |    7 +-
 .../daemon/supervisor/ReadClusterState.java     |   56 +-
 .../daemon/supervisor/RunAsUserContainer.java   |   19 +-
 .../supervisor/RunAsUserContainerLauncher.java  |   15 +-
 .../storm/daemon/supervisor/Supervisor.java     |  224 +-
 .../daemon/supervisor/SupervisorUtils.java      |   17 +-
 .../timer/ReportWorkerHeartbeats.java           |  112 +
 .../supervisor/timer/SupervisorHeartbeat.java   |   17 +-
 .../timer/SynchronizeAssignments.java           |  133 +
 .../nimbus/AssignmentDistributionService.java   |  312 ++
 .../org/apache/storm/nimbus/ILeaderElector.java |    6 +-
 .../IWorkerHeartbeatsRecoveryStrategy.java      |   48 +
 .../storm/nimbus/LeaderListenerCallback.java    |  231 +
 .../nimbus/NimbusHeartbeatsPressureTest.java    |  241 +
 ...TimeOutWorkerHeartbeatsRecoveryStrategy.java |   80 +
 ...WorkerHeartbeatsRecoveryStrategyFactory.java |   53 +
 .../storm/scheduler/SupervisorDetails.java      |   27 +-
 .../storm/zookeeper/LeaderElectorImp.java       |   12 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   |  145 +-
 .../daemon/supervisor/BasicContainerTest.java   |  244 +-
 .../storm/daemon/supervisor/ContainerTest.java  |   16 +-
 76 files changed, 14135 insertions(+), 1949 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index d64623e..2ed4599 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -77,6 +77,10 @@ topology.max.replication.wait.time.sec: 60
 nimbus.credential.renewers.freq.secs: 600
 nimbus.queue.size: 100000
 scheduler.display.resource: false
+nimbus.local.assignments.backend.class: "org.apache.storm.assignments.InMemoryAssignmentBackend"
+nimbus.assignments.service.threads: 10
+nimbus.assignments.service.thread.queue.size: 100
+nimbus.worker.heartbeats.recovery.strategy.class: "org.apache.storm.nimbus.TimeOutWorkerHeartbeatsRecoveryStrategy"
 
 ### ui.* configs are for the master
 ui.host: 0.0.0.0
@@ -154,6 +158,8 @@ supervisor.worker.shutdown.sleep.secs: 3
 supervisor.monitor.frequency.secs: 3
 #how frequently the supervisor heartbeats to the cluster state (for nimbus)
 supervisor.heartbeat.frequency.secs: 5
+#max timeout for a node's worker heartbeats when the master gains leadership
+supervisor.worker.heartbeats.max.timeout.secs: 600
 supervisor.enable: true
 supervisor.supervisors: []
 supervisor.supervisors.commands: []
@@ -163,6 +169,13 @@ supervisor.memory.capacity.mb: 4096.0
 # for single threaded bolts
 supervisor.cpu.capacity: 400.0
 
+#Supervisor thrift config
+supervisor.thrift.port: 6628
+supervisor.queue.size: 128
+supervisor.thrift.threads: 16
+supervisor.thrift.max_buffer_size: 1048576
+supervisor.thrift.socket.timeout.ms: 5000
+
 ### worker.* configs are for task workers
 worker.heap.memory.mb: 768
 worker.childopts: "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump"
@@ -182,6 +195,9 @@ worker.log.level.reset.poll.secs: 30
 # control how many worker receiver threads we need per worker
 topology.worker.receiver.thread.count: 1
 
+# Executor metrics reporting interval.
+executor.metrics.frequency.secs: 60
+
 task.heartbeat.frequency.secs: 3
 task.refresh.poll.secs: 10
 task.credentials.poll.secs: 30
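
The supervisor.thrift.* keys above configure the new supervisor-side Thrift server that nimbus uses to push assignments. A minimal sketch of reading them from the merged config (a sketch only; the class name is hypothetical, and the ObjectReader helper from storm-client is assumed):

    import java.util.Map;

    import org.apache.storm.Config;
    import org.apache.storm.utils.ConfigUtils;
    import org.apache.storm.utils.ObjectReader;

    public class SupervisorThriftConfigSketch {
        public static void main(String[] args) {
            // Merged config: defaults.yaml overridden by storm.yaml.
            Map<String, Object> conf = ConfigUtils.readStormConfig();
            // Fallback values mirror the defaults.yaml entries added above.
            int port = ObjectReader.getInt(conf.get(Config.SUPERVISOR_THRIFT_PORT), 6628);
            int timeoutMs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_THRIFT_SOCKET_TIMEOUT_MS), 5000);
            System.out.println("supervisor thrift port=" + port + ", socket timeout=" + timeoutMs + "ms");
        }
    }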

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index ed6068c..b275985 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1102,6 +1102,13 @@ public class Config extends HashMap<String, Object> {
     public static final String NIMBUS_QUEUE_SIZE = "nimbus.queue.size";
 
     /**
+     * Nimbus assignments backend for storing local assignments. We will use it to store the physical plan and runtime storm ids.
+     */
+    @isString
+    @isImplementationOfClass(implementsClass = org.apache.storm.assignments.ILocalAssignmentsBackend.class)
+    public static final String NIMBUS_LOCAL_ASSIGNMENTS_BACKEND_CLASS = "nimbus.local.assignments.backend.class";
+
+    /**
      * The number of threads that should be used by the nimbus thrift server.
      */
     @isInteger
@@ -1411,6 +1418,44 @@ public class Config extends HashMap<String, Object> {
     @isPositiveNumber
     public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity";
 
+    /**
+     * Port used for supervisor thrift server.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String SUPERVISOR_THRIFT_PORT = "supervisor.thrift.port";
+
+    /**
+     * The Supervisor invocations transport plug-in for Thrift client/server communication.
+     */
+    @isString
+    public static final String SUPERVISOR_THRIFT_TRANSPORT_PLUGIN = "supervisor.thrift.transport";
+
+    /**
+     * Supervisor thrift server queue size.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String SUPERVISOR_QUEUE_SIZE = "supervisor.queue.size";
+
+    /**
+     * The number of threads that should be used by the supervisor thrift server.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String SUPERVISOR_THRIFT_THREADS = "supervisor.thrift.threads";
+
+    @isNumber
+    @isPositiveNumber
+    public static final String SUPERVISOR_THRIFT_MAX_BUFFER_SIZE = "supervisor.thrift.max_buffer_size";
+
+    /**
+     * How long a supervisor Thrift client socket can hang before timing out
+     * and restarting the socket.
+     */
+    @isInteger
+    public static final String SUPERVISOR_THRIFT_SOCKET_TIMEOUT_MS = "supervisor.thrift.socket.timeout.ms";
+
     /**
      * A map of resources the Supervisor has e.g {"cpu.pcore.percent" : 200.0. "onheap.memory.mb": 256.0, "gpu.count" : 2.0 }
      */
@@ -1524,6 +1569,12 @@ public class Config extends HashMap<String, Object> {
     public static final String SUPERVISOR_RUN_WORKER_AS_USER = "supervisor.run.worker.as.user";
 
     /**
+     * Max timeout for supervisor-reported heartbeats when the master gains leadership.
+     */
+    @isInteger
+    public static final String SUPERVISOR_WORKER_HEARTBEATS_MAX_TIMEOUT_SECS = "supervisor.worker.heartbeats.max.timeout.secs";
+
+    /**
      * On some systems (windows for example) symlinks require special privileges that not everyone wants to
      * grant a headless user.  You can completely disable the use of symlinks by setting this config to true, but
      * by doing so you may also lose some features from storm.  For example the blobstore feature
@@ -1572,12 +1623,20 @@ public class Config extends HashMap<String, Object> {
     /**
      * A list of users that run the supervisors and should be authorized to interact with
      * nimbus as a supervisor would.  To use this set
-     * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
+     * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer.
      */
     @isStringList
     public static final String NIMBUS_SUPERVISOR_USERS = "nimbus.supervisor.users";
 
     /**
+     * A list of users that nimbus runs as and should be authorized to interact with
+     * the supervisor as nimbus would. To use this set supervisor.authorizer to
+     * org.apache.storm.security.auth.authorizer.SupervisorSimpleACLAuthorizer.
+     */
+    @isStringList
+    public static final String NIMBUS_DAEMON_USERS = "nimbus.daemon.users";
+
+    /**
      * A list of users that are cluster admins and can run any command.  To use this set
      * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
@@ -1781,10 +1840,19 @@ public class Config extends HashMap<String, Object> {
     public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";
 
     /**
-     * How often a task should heartbeat its status to the master.
+     * How often executor metrics should be reported to the master; used for the RPC heartbeat mode.
      */
     @isInteger
     @isPositiveNumber
+    public static final String EXECUTOR_METRICS_FREQUENCY_SECS = "executor.metrics.frequency.secs";
+
+    /**
+     * How often a task should heartbeat its status to the master;
+     * deprecated by the 2.0 RPC heartbeat reporting, see {@code EXECUTOR_METRICS_FREQUENCY_SECS}.
+     */
+    @Deprecated
+    @isInteger
+    @isPositiveNumber
     public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormTimer.java b/storm-client/src/jvm/org/apache/storm/StormTimer.java
index 0f54ce1..72b20d3 100644
--- a/storm-client/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-client/src/jvm/org/apache/storm/StormTimer.java
@@ -18,19 +18,19 @@
 
 package org.apache.storm;
 
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Utils;
-
 import java.nio.channels.ClosedByInterruptException;
 import java.util.Comparator;
 import java.util.Random;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+
 /**
  * The timer defined in this file is very similar to java.util.Timer, except
  * it integrates with Storm's time simulation capabilities. This lets us test
- * code that does asynchronous work on the timer thread
+ * code that does asynchronous work on the timer thread.
  */
 
 public class StormTimer implements AutoCloseable {
@@ -131,7 +131,7 @@ public class StormTimer implements AutoCloseable {
     private StormTimerTask task = new StormTimerTask();
 
     /**
-     * Makes a Timer in the form of a StormTimerTask Object
+     * Makes a Timer in the form of a StormTimerTask Object.
      * @param name name of the timer
      * @param onKill function to call when timer is killed unexpectedly
      * @return StormTimerTask object that was initialized
@@ -154,7 +154,7 @@ public class StormTimer implements AutoCloseable {
     }
 
     /**
-     * Schedule a function to be executed in the timer
+     * Schedule a function to be executed in the timer.
      * @param delaySecs the number of seconds to delay before running the function
      * @param func the function to run
      * @param checkActive whether to check is the timer is active
@@ -195,7 +195,7 @@ public class StormTimer implements AutoCloseable {
     }
 
     /**
-     * Schedule a function to run recurrently
+     * Schedule a function to run recurrently.
      * @param delaySecs the number of seconds to delay before running the function
      * @param recurSecs the time between each invocation
      * @param func the function to run
@@ -212,7 +212,7 @@ public class StormTimer implements AutoCloseable {
     }
 
     /**
-     * Schedule a function to run recurrently
+     * Schedule a function to run recurrently.
      * @param delayMs the number of millis to delay before running the function
      * @param recurMs the time between each invocation
      * @param func the function to run
@@ -230,7 +230,7 @@ public class StormTimer implements AutoCloseable {
 
 
     /**
-     * schedule a function to run recurrently with jitter
+     * Schedule a function to run recurrently with jitter.
      * @param delaySecs the number of seconds to delay before running the function
      * @param recurSecs the time between each invocation
      * @param jitterMs jitter added to the run
@@ -248,7 +248,7 @@ public class StormTimer implements AutoCloseable {
     }
 
     /**
-     * check if timer is active
+     * check if timer is active.
      */
     private void checkActive() {
         if (!this.task.isActive()) {
@@ -257,7 +257,7 @@ public class StormTimer implements AutoCloseable {
     }
 
     /**
-     * cancel timer
+     * cancel timer.
      */
 
     @Override
@@ -270,7 +270,7 @@ public class StormTimer implements AutoCloseable {
     }
 
     /**
-     * is timer waiting. Used in timer simulation
+     * is timer waiting. Used in timer simulation.
      */
     public boolean isTimerWaiting() {
         return Time.isThreadWaiting(task);

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/Thrift.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Thrift.java b/storm-client/src/jvm/org/apache/storm/Thrift.java
index e00b0ee..85f8a85 100644
--- a/storm-client/src/jvm/org/apache/storm/Thrift.java
+++ b/storm-client/src/jvm/org/apache/storm/Thrift.java
@@ -15,46 +15,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm;
 
-import org.apache.storm.generated.Bolt;
-import org.apache.storm.generated.JavaObjectArg;
-import org.apache.storm.generated.SpoutSpec;
-import org.apache.storm.generated.StateSpoutSpec;
-import org.apache.storm.generated.StreamInfo;
+package org.apache.storm;
 
+import java.io.Serializable;
 import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.HashMap;
-import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.JavaObjectArg;
+import org.apache.storm.generated.NullStruct;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.StormTopology._Fields;
-import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.NullStruct;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.generated.ComponentObject;
-
+import org.apache.storm.generated.StreamInfo;
 import org.apache.storm.task.IBolt;
 import org.apache.storm.topology.BoltDeclarer;
-import org.apache.storm.topology.IRichBolt;
 import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
 import org.apache.storm.topology.IRichSpout;
 import org.apache.storm.topology.SpoutDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.storm.topology.TopologyBuilder;
-
 public class Thrift {
     private static Logger LOG = LoggerFactory.getLogger(Thrift.class);
 
@@ -328,19 +326,19 @@ public class Thrift {
     public static StormTopology buildTopology(Map<String, SpoutDetails> spoutMap, Map<String, BoltDetails> boltMap) {
         TopologyBuilder builder = new TopologyBuilder();
         for (Entry<String, SpoutDetails> entry : spoutMap.entrySet()) {
-            String spoutID = entry.getKey();
+            String spoutId = entry.getKey();
             SpoutDetails spec = entry.getValue();
-            SpoutDeclarer spoutDeclarer = builder.setSpout(spoutID, spec.getSpout(), spec.getParallelism());
+            SpoutDeclarer spoutDeclarer = builder.setSpout(spoutId, spec.getSpout(), spec.getParallelism());
             spoutDeclarer.addConfigurations(spec.getConf());
         }
         for (Entry<String, BoltDetails> entry : boltMap.entrySet()) {
-            String spoutID = entry.getKey();
+            String spoutId = entry.getKey();
             BoltDetails spec = entry.getValue();
             BoltDeclarer boltDeclarer = null;
             if (spec.bolt instanceof IRichBolt) {
-                boltDeclarer = builder.setBolt(spoutID, (IRichBolt)spec.getBolt(), spec.getParallelism());
+                boltDeclarer = builder.setBolt(spoutId, (IRichBolt)spec.getBolt(), spec.getParallelism());
             } else {
-                boltDeclarer = builder.setBolt(spoutID, (IBasicBolt)spec.getBolt(), spec.getParallelism());
+                boltDeclarer = builder.setBolt(spoutId, (IBasicBolt)spec.getBolt(), spec.getParallelism());
             }
             boltDeclarer.addConfigurations(spec.getConf());
             addInputs(boltDeclarer, spec.getInputs());

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/assignments/ILocalAssignmentsBackend.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/assignments/ILocalAssignmentsBackend.java b/storm-client/src/jvm/org/apache/storm/assignments/ILocalAssignmentsBackend.java
new file mode 100644
index 0000000..10b0b0f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/assignments/ILocalAssignmentsBackend.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.assignments;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.generated.Assignment;
+
+/**
+ * Interface for storing local assignments.
+ */
+public interface ILocalAssignmentsBackend extends AutoCloseable {
+    /**
+     * Decide if the assignments are synchronized from the remote state-store.
+     */
+    boolean isSynchronized();
+
+    /**
+     * Mark this backend as synchronized when sync work is done.
+     */
+    void setSynchronized();
+
+    /**
+     * Initialization function invoked when the backend is created.
+     * @param conf config
+     */
+    void prepare(Map conf);
+
+    /**
+     * Save a storm assignment to local state, or update an existing assignment.
+     * @param stormId storm runtime id
+     * @param assignment assignment as thrift
+     */
+    void keepOrUpdateAssignment(String stormId, Assignment assignment);
+
+    /**
+     * Get assignment as {@link Assignment} for a storm.
+     * @param stormId storm runtime id
+     * @return assignment
+     */
+    Assignment getAssignment(String stormId);
+
+    void removeAssignment(String stormId);
+
+    /**
+     * List all the storm runtime ids of local assignments.
+     * @return a list of storm ids
+     */
+    List<String> assignments();
+
+    /**
+     * Get all the local assignments from local state.
+     * @return mapping of storm-id -> assignment
+     */
+    Map<String, Assignment> assignmentsInfo();
+
+    /**
+     * Sync remote assignments to local. If remote is null, we will sync them from ZooKeeper.
+     * @param remote specific remote assignments; if null, they are synced from ZooKeeper (only used for nimbus)
+     */
+    void syncRemoteAssignments(Map<String, byte[]> remote);
+
+    /**
+     * Save a storm-name -> storm-id mapping to local state.
+     * @param stormName storm name
+     * @param stormId storm runtime id
+     */
+    void keepStormId(String stormName, String stormId);
+
+    /**
+     * Get storm runtime id from local.
+     * @param stormName name of a storm
+     * @return runtime storm id
+     */
+    String getStormId(String stormName);
+
+    /**
+     * Sync remote storm ids to local; only used by nimbus.
+     * @param remote remote ids from state store
+     */
+    void syncRemoteIds(Map<String, String> remote);
+
+    /**
+     * Delete the locally cached stormId mapped to a specific storm name.
+     * @param stormName storm name
+     */
+    void deleteStormId(String stormName);
+
+    /**
+     * Clear all the state for a storm.
+     * @param stormId storm id
+     */
+    void clearStateForStorm(String stormId);
+
+    /**
+     * Function to release resources.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/assignments/InMemoryAssignmentBackend.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/assignments/InMemoryAssignmentBackend.java b/storm-client/src/jvm/org/apache/storm/assignments/InMemoryAssignmentBackend.java
new file mode 100644
index 0000000..44cf6d2
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/assignments/InMemoryAssignmentBackend.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.assignments;
+
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.generated.Assignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An assignment backend which keeps all assignments and id info in memory. Used only if no backend is
+ * specified internally.
+ * <p>About thread safety: idToAssignment, idToName and nameToId are all in-memory caches local to nimbus:
+ * <ul>
+ *     <li>idToAssignment: nimbus modifies it and supervisors sync it at a fixed interval,
+ *         so the assignments reach eventual consistency.</li>
+ *     <li>idToName: storm submitting/killing is guarded by the same lock, so a {@link ConcurrentHashMap} is ok.</li>
+ *     <li>nameToId: same as <i>idToName</i>.</li>
+ * </ul>
+ */
+public class InMemoryAssignmentBackend implements ILocalAssignmentsBackend {
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryAssignmentBackend.class);
+
+    private Map<String, Assignment> idToAssignment;
+    private Map<String, String> idToName;
+    private Map<String, String> nameToId;
+    private volatile boolean isSynchronized = false;
+
+    @Override
+    public boolean isSynchronized() {
+        return this.isSynchronized;
+    }
+
+    @Override
+    public void setSynchronized() {
+        this.isSynchronized = true;
+    }
+
+    @Override
+    public void prepare(Map conf) {
+        // do nothing for conf now
+        this.idToAssignment = new ConcurrentHashMap<>();
+        this.idToName = new ConcurrentHashMap<>();
+        this.nameToId = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public void keepOrUpdateAssignment(String stormId, Assignment assignment) {
+        this.idToAssignment.put(stormId, assignment);
+    }
+
+    @Override
+    public Assignment getAssignment(String stormId) {
+        return this.idToAssignment.get(stormId);
+    }
+
+    @Override
+    public void removeAssignment(String stormId) {
+        this.idToAssignment.remove(stormId);
+    }
+
+    @Override
+    public List<String> assignments() {
+        if (idToAssignment == null) {
+            return new ArrayList<>();
+        }
+        List<String> ret = new ArrayList<>();
+        ret.addAll(this.idToAssignment.keySet());
+        return ret;
+    }
+
+    @Override
+    public Map<String, Assignment> assignmentsInfo() {
+        Map<String, Assignment> ret = new HashMap<>();
+        ret.putAll(this.idToAssignment);
+
+        return ret;
+    }
+
+    @Override
+    public void syncRemoteAssignments(Map<String, byte[]> remote) {
+        Map<String, Assignment> tmp = new ConcurrentHashMap<>();
+        for (Map.Entry<String, byte[]> entry : remote.entrySet()) {
+            tmp.put(entry.getKey(), ClusterUtils.maybeDeserialize(entry.getValue(), Assignment.class));
+        }
+        this.idToAssignment = tmp;
+    }
+
+    @Override
+    public void keepStormId(String stormName, String stormId) {
+        this.nameToId.put(stormName, stormId);
+        this.idToName.put(stormId, stormName);
+    }
+
+    @Override
+    public String getStormId(String stormName) {
+        return this.nameToId.get(stormName);
+    }
+
+    @Override
+    public void syncRemoteIds(Map<String, String> remote) {
+        Map<String, String> tmpNameToId = new ConcurrentHashMap<>();
+        Map<String, String> tmpIdToName = new ConcurrentHashMap<>();
+        for (Map.Entry<String, String> entry : remote.entrySet()) {
+            tmpIdToName.put(entry.getKey(), entry.getValue());
+            tmpNameToId.put(entry.getValue(), entry.getKey());
+        }
+        this.idToName = tmpIdToName;
+        this.nameToId = tmpNameToId;
+    }
+
+    @Override
+    public void deleteStormId(String stormName) {
+        String id = this.nameToId.remove(stormName);
+        if (null != id) {
+            this.idToName.remove(id);
+        }
+    }
+
+    @Override
+    public void clearStateForStorm(String stormId) {
+        this.idToAssignment.remove(stormId);
+
+        String name = this.idToName.remove(stormId);
+        if (null != name) {
+            this.nameToId.remove(name);
+        }
+    }
+
+    @Override
+    public void close() {
+        this.idToAssignment = null;
+        this.nameToId = null;
+        this.idToName = null;
+    }
+}
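
A short round-trip sketch of the backend contract (not part of this commit; the topology name/id are hypothetical, and the thrift-generated Assignment constructor taking the required master code dir is assumed):

    import java.util.HashMap;

    import org.apache.storm.assignments.ILocalAssignmentsBackend;
    import org.apache.storm.assignments.InMemoryAssignmentBackend;
    import org.apache.storm.generated.Assignment;

    public class BackendRoundTripSketch {
        public static void main(String[] args) {
            ILocalAssignmentsBackend backend = new InMemoryAssignmentBackend();
            backend.prepare(new HashMap<>());                     // conf is unused for now
            backend.keepStormId("wordcount", "wordcount-1-1");    // name -> runtime id
            backend.keepOrUpdateAssignment("wordcount-1-1", new Assignment("/master/code/dir"));
            assert backend.getAssignment("wordcount-1-1") != null;
            backend.clearStateForStorm("wordcount-1-1");          // drops assignment and both id mappings
            assert backend.getAssignment("wordcount-1-1") == null;
            backend.close();
        }
    }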

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/assignments/LocalAssignmentsBackendFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/assignments/LocalAssignmentsBackendFactory.java b/storm-client/src/jvm/org/apache/storm/assignments/LocalAssignmentsBackendFactory.java
new file mode 100644
index 0000000..f3110f1
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/assignments/LocalAssignmentsBackendFactory.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.assignments;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.Config;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ReflectionUtils;
+
+import java.util.Map;
+
+/**
+ * Factory class for creating local assignments.
+ */
+public class LocalAssignmentsBackendFactory {
+
+    public static ILocalAssignmentsBackend getBackend(Map<String, Object> conf) {
+        if (conf.get(Config.NIMBUS_LOCAL_ASSIGNMENTS_BACKEND_CLASS) != null) {
+            Object targetObj = ReflectionUtils.newInstance((String) conf.get(Config.NIMBUS_LOCAL_ASSIGNMENTS_BACKEND_CLASS));
+            Preconditions.checkState(targetObj instanceof ILocalAssignmentsBackend, "%s must implement ILocalAssignmentsBackend", Config.NIMBUS_LOCAL_ASSIGNMENTS_BACKEND_CLASS);
+            ((ILocalAssignmentsBackend)targetObj).prepare(conf);
+            return (ILocalAssignmentsBackend) targetObj;
+        }
+
+        return getDefault();
+    }
+
+    public static ILocalAssignmentsBackend getDefault() {
+        ILocalAssignmentsBackend backend = new InMemoryAssignmentBackend();
+        backend.prepare(ConfigUtils.readStormConfig());
+        return backend;
+    }
+}
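
The factory resolves nimbus.local.assignments.backend.class via reflection and falls back to the in-memory backend when the key is unset. A sketch of plugging in a custom implementation (the backend class name below is hypothetical):

    import java.util.Map;

    import org.apache.storm.Config;
    import org.apache.storm.assignments.ILocalAssignmentsBackend;
    import org.apache.storm.assignments.LocalAssignmentsBackendFactory;
    import org.apache.storm.utils.ConfigUtils;

    public class CustomBackendSketch {
        public static void main(String[] args) {
            Map<String, Object> conf = ConfigUtils.readStormConfig();
            // Hypothetical ILocalAssignmentsBackend implementation on the classpath.
            conf.put(Config.NIMBUS_LOCAL_ASSIGNMENTS_BACKEND_CLASS, "com.example.RocksDbAssignmentsBackend");
            ILocalAssignmentsBackend backend = LocalAssignmentsBackendFactory.getBackend(conf);
            System.out.println("backend synchronized: " + backend.isSynchronized());
        }
    }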

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index 4bd5ded..b3dfc7d 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -18,6 +18,8 @@
 package org.apache.storm.cluster;
 
 import org.apache.storm.Config;
+import org.apache.storm.assignments.ILocalAssignmentsBackend;
+import org.apache.storm.assignments.LocalAssignmentsBackendFactory;
 import org.apache.storm.generated.ClusterWorkerHeartbeat;
 import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.ExecutorStats;
@@ -49,6 +51,7 @@ public class ClusterUtils {
     public static final String SUPERVISORS_ROOT = "supervisors";
     public static final String WORKERBEATS_ROOT = "workerbeats";
     public static final String BACKPRESSURE_ROOT = "backpressure";
+    public static final String LEADERINFO_ROOT = "leader-info";
     public static final String ERRORS_ROOT = "errors";
     public static final String BLOBSTORE_ROOT = "blobstore";
     public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT = "blobstoremaxkeysequencenumber";
@@ -63,6 +66,7 @@ public class ClusterUtils {
     public static final String SUPERVISORS_SUBTREE = ZK_SEPERATOR + SUPERVISORS_ROOT;
     public static final String WORKERBEATS_SUBTREE = ZK_SEPERATOR + WORKERBEATS_ROOT;
     public static final String BACKPRESSURE_SUBTREE = ZK_SEPERATOR + BACKPRESSURE_ROOT;
+    public static final String LEADERINFO_SUBTREE = ZK_SEPERATOR + LEADERINFO_ROOT;
     public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;
     public static final String BLOBSTORE_SUBTREE = ZK_SEPERATOR + BLOBSTORE_ROOT;
     public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE = ZK_SEPERATOR + BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT;
@@ -252,12 +256,13 @@ public class ClusterUtils {
         return executorWhb;
     }
 
-    public IStormClusterState mkStormClusterStateImpl(Object stateStorage, ClusterStateContext context) throws Exception {
+    public IStormClusterState mkStormClusterStateImpl(Object stateStorage, ILocalAssignmentsBackend backend, ClusterStateContext context) throws Exception {
         if (stateStorage instanceof IStateStorage) {
-            return new StormClusterStateImpl((IStateStorage) stateStorage, context, false);
+            return new StormClusterStateImpl((IStateStorage) stateStorage, backend, context, false);
         } else {
-            IStateStorage Storage = _instance.mkStateStorageImpl((Map<String, Object>) stateStorage, (Map<String, Object>) stateStorage, context);
-            return new StormClusterStateImpl(Storage, context, true);
+            IStateStorage storage = _instance.mkStateStorageImpl((Map<String, Object>) stateStorage,
+                (Map<String, Object>) stateStorage, context);
+            return new StormClusterStateImpl(storage, backend, context, true);
         }
     }
 
@@ -279,8 +284,12 @@ public class ClusterUtils {
         return _instance.mkStateStorageImpl(config, auth_conf, context);
     }
 
+    public static IStormClusterState mkStormClusterState(Object stateStorage, ILocalAssignmentsBackend backend, ClusterStateContext context) throws Exception {
+        return _instance.mkStormClusterStateImpl(stateStorage, backend, context);
+    }
+
     public static IStormClusterState mkStormClusterState(Object StateStorage, ClusterStateContext context) throws Exception {
-        return _instance.mkStormClusterStateImpl(StateStorage, context);
+        return _instance.mkStormClusterStateImpl(StateStorage, LocalAssignmentsBackendFactory.getDefault(), context);
     }
 
     public static String stringifyError(Throwable error) {
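
With the extra overload above, a daemon can wire an explicit backend into the cluster state, while the old two-argument form keeps working with the in-memory default. A sketch, assuming the imports from this hunk and a ClusterStateContext already in scope:

    static IStormClusterState makeClusterState(Map<String, Object> conf,
                                               ClusterStateContext context) throws Exception {
        // Resolved from nimbus.local.assignments.backend.class when set.
        ILocalAssignmentsBackend backend = LocalAssignmentsBackendFactory.getBackend(conf);
        return ClusterUtils.mkStormClusterState(conf, backend, context);
    }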

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java b/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
index 8593b7b..4f3865e 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
@@ -57,6 +57,8 @@ public enum DaemonType {
             }
             switch (type) {
                 case NIMBUS:
+                    //Fall through on purpose
+                case SUPERVISOR:
                     return ZooDefs.Ids.CREATOR_ALL_ACL;
                 case DRPC:
                     List<ACL> ret = new ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL);

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index ab893be..8a922ac 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -42,8 +42,46 @@ import org.apache.storm.nimbus.NimbusInfo;
 public interface IStormClusterState {
     List<String> assignments(Runnable callback);
 
+    /**
+     * Get the assignment based on storm id from local backend.
+     * @param stormId topology id
+     * @param callback callback function
+     * @return {@link Assignment}
+     */
     Assignment assignmentInfo(String stormId, Runnable callback);
 
+    /**
+     * Get the assignment based on storm id from remote state store, eg: ZK.
+     * @param stormId topology id
+     * @param callback callback function
+     * @return {@link Assignment}
+     */
+    Assignment remoteAssignmentInfo(String stormId, Runnable callback);
+
+    /**
+     * Get all the topology assignments, as a stormId -> Assignment mapping, from the local backend.
+     * @return stormId -> Assignment mapping
+     */
+    Map<String, Assignment> assignmentsInfo();
+
+    /**
+     * Sync the remote state store assignments to the local backend; used when the master gains leadership, see
+     * {@link LeaderListenerCallback}.
+     * @param remote assigned assignments for a specific {@link IStormClusterState} instance, usually a supervisor/node.
+     */
+    void syncRemoteAssignments(Map<String, byte[]> remote);
+
+    /**
+     * Flag to indicate if the assignments have synced successfully, see {@link #syncRemoteAssignments(Map)}.
+     * @return true if they have synced successfully
+     */
+    boolean isAssignmentsBackendSynchronized();
+
+    /**
+     * Mark the assignments as synced successfully; see {@link #isAssignmentsBackendSynchronized()}.
+     */
+    void setAssignmentsBackendSynchronized();
+
     VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
 
     Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
@@ -64,6 +102,19 @@ public interface IStormClusterState {
      */
     StormBase stormBase(String stormId, Runnable callback);
 
+    /**
+     * Get the storm id for the given storm name; null if the name doesn't exist on the cluster.
+     * @param stormName storm name
+     * @return storm id
+     */
+    String stormId(String stormName);
+
+    /**
+     * Sync all the active storm ids of the cluster; currently used when the master gains leadership.
+     * @param ids stormName -> stormId mapping
+     */
+    void syncRemoteIds(Map<String, String> ids);
+
     ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
 
     List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
@@ -94,6 +145,15 @@ public interface IStormClusterState {
     @Deprecated
     List<String> backpressureTopologies();
 
+    /**
+     * Get leader info from state store, which was written when a master gains leadership.
+     * <p>Caution: it cannot be used for fencing and is only for informational purposes, because we use ZK as our
+     * backend now, which could have stale info about nodes.
+     * @param callback callback func
+     * @return {@link NimbusInfo}
+     */
+    NimbusInfo getLeader(Runnable callback);
+
     void setTopologyLogConfig(String stormId, LogConfig logConfig);
 
     LogConfig topologyLogConfig(String stormId, Runnable cb);
@@ -233,23 +293,7 @@ public interface IStormClusterState {
      * @return the id of the topology or null if it is not alive.
      */
     default Optional<String> getTopoId(final String topologyName) {
-        String ret = null;
-        for (String topoId: activeStorms()) {
-            StormBase base = stormBase(topoId, null);
-            if (base != null && topologyName.equals(base.get_name())) {
-                ret = topoId;
-                break;
-            }
-        }
-        return Optional.ofNullable(ret);
-    }
-    
-    default Map<String, Assignment> topologyAssignments() {
-        Map<String, Assignment> ret = new HashMap<>();
-        for (String topoId: assignments(null)) {
-            ret.put(topoId, assignmentInfo(topoId, null));
-        }
-        return ret;
+        return Optional.ofNullable(stormId(topologyName));
     }
     
     default Map<String, StormBase> topologyBases() {
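
The sync methods above are driven when a nimbus gains leadership (the driver added by this commit is LeaderListenerCallback). A sketch of that flow, where passing null means "read everything from the remote store":

    static void onGainedLeadership(IStormClusterState state) {
        if (!state.isAssignmentsBackendSynchronized()) {
            state.syncRemoteAssignments(null);   // null => pull all assignments from ZK
            state.syncRemoteIds(null);           // null => rebuild name -> id from active StormBases
            state.setAssignmentsBackendSynchronized();
        }
    }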

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 719ebbf..fc02b8e 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -20,6 +20,8 @@ package org.apache.storm.cluster;
 
 import java.nio.ByteBuffer;
 import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -28,12 +30,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.storm.assignments.ILocalAssignmentsBackend;
 import org.apache.storm.callback.ZKStateChangedCallback;
 import org.apache.storm.generated.Assignment;
 import org.apache.storm.generated.ClusterWorkerHeartbeat;
@@ -64,6 +66,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     private static Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
 
     private IStateStorage stateStorage;
+    private ILocalAssignmentsBackend assignmentsBackend;
 
     private ConcurrentHashMap<String, Runnable> assignmentInfoCallback;
     private ConcurrentHashMap<String, Runnable> assignmentInfoWithVersionCallback;
@@ -71,7 +74,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     private AtomicReference<Runnable> supervisorsCallback;
     // we want to register a topo directory getChildren callback for all workers of this dir
     private ConcurrentHashMap<String, Runnable> backPressureCallback;
-
+    private AtomicReference<Runnable> leaderInfoCallback;
     private AtomicReference<Runnable> assignmentsCallback;
     private ConcurrentHashMap<String, Runnable> stormBaseCallback;
     private AtomicReference<Runnable> blobstoreCallback;
@@ -83,18 +86,20 @@ public class StormClusterStateImpl implements IStormClusterState {
     private final boolean solo;
     private final ClusterStateContext context;
 
-    public StormClusterStateImpl(IStateStorage StateStorage, ClusterStateContext context, boolean solo) throws Exception {
+    public StormClusterStateImpl(IStateStorage StateStorage, ILocalAssignmentsBackend assignmentsBackend,
+        ClusterStateContext context, boolean solo) throws Exception {
 
         this.stateStorage = StateStorage;
         this.solo = solo;
         this.defaultAcls = context.getDefaultZkAcls();
         this.context = context;
-
+        this.assignmentsBackend = assignmentsBackend;
         assignmentInfoCallback = new ConcurrentHashMap<>();
         assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
         assignmentVersionCallback = new ConcurrentHashMap<>();
         supervisorsCallback = new AtomicReference<>();
         backPressureCallback = new ConcurrentHashMap<>();
+        leaderInfoCallback = new AtomicReference<>();
         assignmentsCallback = new AtomicReference<>();
         stormBaseCallback = new ConcurrentHashMap<>();
         credentialsCallback = new ConcurrentHashMap<>();
@@ -130,6 +135,8 @@ public class StormClusterStateImpl implements IStormClusterState {
                         issueMapCallback(logConfigCallback, toks.get(1));
                     } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
                         issueMapCallback(backPressureCallback, toks.get(1));
+                    } else if (root.equals(ClusterUtils.LEADERINFO_ROOT)) {
+                        issueCallback(leaderInfoCallback);
                     } else {
                         LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path);
                         Runtime.getRuntime().exit(30);
@@ -159,14 +166,16 @@ public class StormClusterStateImpl implements IStormClusterState {
 
     protected void issueCallback(AtomicReference<Runnable> cb) {
         Runnable callback = cb.getAndSet(null);
-        if (callback != null)
+        if (callback != null) {
             callback.run();
+        }
     }
 
     protected void issueMapCallback(ConcurrentHashMap<String, Runnable> callbackConcurrentHashMap, String key) {
         Runnable callback = callbackConcurrentHashMap.remove(key);
-        if (callback != null)
+        if (callback != null) {
             callback.run();
+        }
     }
 
     @Override
@@ -174,7 +183,7 @@ public class StormClusterStateImpl implements IStormClusterState {
         if (callback != null) {
             assignmentsCallback.set(callback);
         }
-        return stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, callback != null);
+        return this.assignmentsBackend.assignments();
     }
 
     @Override
@@ -182,11 +191,49 @@ public class StormClusterStateImpl implements IStormClusterState {
         if (callback != null) {
             assignmentInfoCallback.put(stormId, callback);
         }
+        return this.assignmentsBackend.getAssignment(stormId);
+    }
+
+    @Override
+    public Assignment remoteAssignmentInfo(String stormId, Runnable callback) {
+        if (callback != null) {
+            assignmentInfoCallback.put(stormId, callback);
+        }
         byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
         return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
     }
 
     @Override
+    public Map<String, Assignment> assignmentsInfo() {
+        return this.assignmentsBackend.assignmentsInfo();
+    }
+
+    @Override
+    public void syncRemoteAssignments(Map<String, byte[]> remote) {
+        if (null != remote) {
+            this.assignmentsBackend.syncRemoteAssignments(remote);
+        } else {
+            Map<String, byte[]> tmp = new HashMap<>();
+            List<String> stormIds = this.stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, false);
+            for (String stormId : stormIds) {
+                byte[] assignment = this.stateStorage.get_data(ClusterUtils.assignmentPath(stormId), false);
+                tmp.put(stormId, assignment);
+            }
+            this.assignmentsBackend.syncRemoteAssignments(tmp);
+        }
+    }
+
+    @Override
+    public boolean isAssignmentsBackendSynchronized() {
+        return this.assignmentsBackend.isSynchronized();
+    }
+
+    @Override
+    public void setAssignmentsBackendSynchronized() {
+        this.assignmentsBackend.setSynchronized();
+    }
+
+    @Override
     public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback) {
         if (callback != null) {
             assignmentInfoWithVersionCallback.put(stormId, callback);
@@ -268,6 +315,25 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     @Override
+    public String stormId(String stormName) {
+        return this.assignmentsBackend.getStormId(stormName);
+    }
+
+    @Override
+    public void syncRemoteIds(Map<String, String> remote) {
+        if (null != remote) {
+            this.assignmentsBackend.syncRemoteIds(remote);
+        } else {
+            Map<String, String> tmp = new HashMap<>();
+            List<String> activeStorms = activeStorms();
+            for (String stormId: activeStorms) {
+                tmp.put(stormId, stormBase(stormId, null).get_name());
+            }
+            this.assignmentsBackend.syncRemoteIds(tmp);
+        }
+    }
+
+    @Override
     public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
         byte[] bytes = stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), false);
         return ClusterUtils.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
@@ -280,8 +346,9 @@ public class StormClusterStateImpl implements IStormClusterState {
         List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId);
         for (ProfileRequest profileRequest : profileRequests) {
             NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
-            if (nodeInfo1.equals(nodeInfo))
+            if (nodeInfo1.equals(nodeInfo)) {
                 requests.add(profileRequest);
+            }
         }
         return requests;
     }
@@ -296,8 +363,9 @@ public class StormClusterStateImpl implements IStormClusterState {
                 String childPath = path + ClusterUtils.ZK_SEPERATOR + str;
                 byte[] raw = stateStorage.get_data(childPath, false);
                 ProfileRequest request = ClusterUtils.maybeDeserialize(raw, ProfileRequest.class);
-                if (request != null)
+                if (request != null) {
                     profileRequests.add(request);
+                }
             }
         }
         return profileRequests;
@@ -322,13 +390,13 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     /**
-     * need to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker with a skewed clock overrides all the
-     * timestamps. By only checking heartbeats with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, we avoid
-     * situations like that
+     * Need to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker
+     * with a skewed clock overrides all the timestamps. By only checking heartbeats with an assigned node+port,
+     * and only reading executors from that heartbeat that are actually assigned, we avoid situations like that.
      * 
-     * @param stormId
-     * @param executorNodePort
-     * @return
+     * @param stormId topology id
+     * @param executorNodePort executor id -> node + port
+     * @return mapping of executorInfo -> executor beat
      */
     @Override
     public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
@@ -345,8 +413,9 @@ public class StormClusterStateImpl implements IStormClusterState {
             for (List<Long> list : entry.getValue()) {
                 executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
             }
-            if (whb != null)
+            if (whb != null) {
                 executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
+            }
         }
         return executorWhbs;
     }
@@ -399,6 +468,14 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     @Override
+    public NimbusInfo getLeader(Runnable callback) {
+        if (null != callback) {
+            this.leaderInfoCallback.set(callback);
+        }
+        return Utils.javaDeserialize(this.stateStorage.get_data(ClusterUtils.LEADERINFO_SUBTREE, callback != null), NimbusInfo.class);
+    }
+
+    @Override
     public List<String> backpressureTopologies() {
         return stateStorage.get_children(ClusterUtils.BACKPRESSURE_SUBTREE, false);
     }
@@ -508,6 +585,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     public void activateStorm(String stormId, StormBase stormBase) {
         String path = ClusterUtils.stormPath(stormId);
         stateStorage.set_data(path, Utils.serialize(stormBase), defaultAcls);
+        this.assignmentsBackend.keepStormId(stormBase.get_name(), stormId);
     }
 
     /**
@@ -533,8 +611,9 @@ public class StormClusterStateImpl implements IStormClusterState {
                     newComponentExecutors.put(entry.getKey(), entry.getValue());
                 }
             }
-            if (newComponentExecutors.size() > 0)
+            if (newComponentExecutors.size() > 0) {
                 newElems.set_component_executors(newComponentExecutors);
+            }
         }
 
         Map<String, DebugOptions> ComponentDebug = new HashMap<>();
@@ -605,7 +684,9 @@ public class StormClusterStateImpl implements IStormClusterState {
 
     @Override
     public void setAssignment(String stormId, Assignment info) {
-        stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), defaultAcls);
+        byte[] serAssignment = Utils.serialize(info);
+        stateStorage.set_data(ClusterUtils.assignmentPath(stormId), serAssignment, defaultAcls);
+        this.assignmentsBackend.keepOrUpdateAssignment(stormId, info);
     }
 
     @Override
@@ -636,6 +717,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     @Override
     public void removeStorm(String stormId) {
         stateStorage.delete_node(ClusterUtils.assignmentPath(stormId));
+        this.assignmentsBackend.clearStateForStorm(stormId);
         stateStorage.delete_node(ClusterUtils.credentialsPath(stormId));
         stateStorage.delete_node(ClusterUtils.logConfigPath(stormId));
         stateStorage.delete_node(ClusterUtils.profilerConfigPath(stormId));
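
Taken together, the hunks above (activateStorm, setAssignment, removeStorm) mirror every ZooKeeper write into the new local assignments backend. A minimal sketch of the bookkeeping those calls imply; this is a hypothetical stand-in, not the InMemoryAssignmentBackend added elsewhere in this patch:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class AssignmentsBackendSketch {
        private final ConcurrentMap<String, Object> idToAssignment = new ConcurrentHashMap<>();
        private final ConcurrentMap<String, String> nameToId = new ConcurrentHashMap<>();

        public void keepStormId(String stormName, String stormId) {
            nameToId.put(stormName, stormId);        // mirrors activateStorm
        }

        public void keepOrUpdateAssignment(String stormId, Object assignment) {
            idToAssignment.put(stormId, assignment); // mirrors setAssignment
        }

        public void clearStateForStorm(String stormId) {
            idToAssignment.remove(stormId);          // mirrors removeStorm
        }
    }
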
@@ -697,8 +779,9 @@ public class StormClusterStateImpl implements IStormClusterState {
             for (String child : childrens) {
                 String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
                 ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), ErrorInfo.class);
-                if (errorInfo != null)
+                if (errorInfo != null) {
                     errorInfos.add(errorInfo);
+                }
             }
         }
         Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
@@ -745,6 +828,7 @@ public class StormClusterStateImpl implements IStormClusterState {
         stateStorage.unregister(stateId);
         if (solo) {
             stateStorage.close();
+            this.assignmentsBackend.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
index d5c29f9..8ae6c5f 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -15,8 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.cluster;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.*;
 import org.apache.curator.framework.state.ConnectionState;
@@ -33,13 +41,6 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 public class ZKStateStorage implements IStateStorage {
 
     private static Logger LOG = LoggerFactory.getLogger(ZKStateStorage.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
index 0ae745f..5076045 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
@@ -15,14 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.cluster;
 
-import org.apache.storm.utils.Utils;
-import org.apache.zookeeper.data.ACL;
+package org.apache.storm.cluster;
 
 import java.util.List;
 import java.util.Map;
 
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.data.ACL;
+
 public class ZKStateStorageFactory implements StateStorageFactory {
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index ca0d4d0..94ea9af 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -57,6 +57,7 @@ import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.security.auth.AuthUtils;
@@ -64,7 +65,9 @@ import org.apache.storm.security.auth.IAutoCredentials;
 import org.apache.storm.stats.StatsUtil;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.SupervisorClient;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.apache.zookeeper.data.ACL;
@@ -82,6 +85,7 @@ public class Worker implements Shutdownable, DaemonCommon {
     private final IContext context;
     private final String topologyId;
     private final String assignmentId;
+    private final int supervisorPort;
     private final int port;
     private final String workerId;
     private final LogConfigManager logConfigManager;
@@ -105,15 +109,18 @@ public class Worker implements Shutdownable, DaemonCommon {
      * @param context      -
      * @param topologyId   - topology id
      * @param assignmentId - assignment id
+     * @param supervisorPort - parent supervisor thrift server port
      * @param port         - port on which the worker runs
      * @param workerId     - worker id
      */
 
-    public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
+    public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
+            int supervisorPort, int port, String workerId) {
         this.conf = conf;
         this.context = context;
         this.topologyId = topologyId;
         this.assignmentId = assignmentId;
+        this.supervisorPort = supervisorPort;
         this.port = port;
         this.workerId = workerId;
         this.logConfigManager = new LogConfigManager();
@@ -137,7 +144,7 @@ public class Worker implements Shutdownable, DaemonCommon {
             ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
         ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
         IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
-        IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, csContext);
+        IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
 
         Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
         Map<String, String> initCreds = new HashMap<>();
@@ -156,8 +163,8 @@ public class Worker implements Shutdownable, DaemonCommon {
     private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
                               Map<String, String> initCreds, Credentials initialCredentials)
         throws Exception {
-        workerState = new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
-            stormClusterState, autoCreds);
+        workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorPort, port, workerId,
+            topologyConf, stateStorage, stormClusterState, autoCreds);
 
         // Heartbeat here so that worker process dies if this fails
         // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
@@ -178,7 +185,7 @@ public class Worker implements Shutdownable, DaemonCommon {
             });
 
         workerState.executorHeartbeatTimer
-            .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+            .scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS),
                 Worker.this::doExecutorHeartbeats);
 
         workerState.registerCallbacks();
@@ -313,12 +320,14 @@ public class Worker implements Shutdownable, DaemonCommon {
 
     public void doHeartBeat() throws IOException {
         LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
-        state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
+        LSWorkerHeartbeat lsWorkerHeartbeat = new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
             workerState.localExecutors.stream()
                 .map(executor -> new ExecutorInfo(executor.get(0).intValue(), executor.get(1).intValue()))
-                .collect(Collectors.toList()), workerState.port));
+                .collect(Collectors.toList()), workerState.port);
+        state.setWorkerHeartBeat(lsWorkerHeartbeat);
         state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
         // it shouldn't take supervisor 120 seconds between listing dir and reading it
+        heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
     }
 
     public void doExecutorHeartbeats() {
@@ -392,6 +401,30 @@ public class Worker implements Shutdownable, DaemonCommon {
         workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged);
     }
 
+    /**
+     * Send a heartbeat to the local supervisor first; if that fails, report directly to the master.
+     */
+    private void heartbeatToMasterIfLocalbeatFail(LSWorkerHeartbeat lsWorkerHeartbeat) {
+        if (ConfigUtils.isLocalMode(this.conf)) {
+            return;
+        }
+        //In distributed mode, send the heartbeat directly to the master if the local supervisor is down.
+        SupervisorWorkerHeartbeat workerHeartbeat = new SupervisorWorkerHeartbeat(lsWorkerHeartbeat.get_topology_id(),
+                lsWorkerHeartbeat.get_executors(), lsWorkerHeartbeat.get_time_secs());
+        try (SupervisorClient client = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort)) {
+            client.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
+        } catch (Exception tr1) {
+            //If any error/exception is thrown, report directly to nimbus.
+            LOG.warn("Exception when sending heartbeat to local supervisor: {}", tr1.getMessage());
+            try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)) {
+                nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
+            } catch (Exception tr2) {
+                //If this also fails, just log it; the next scheduled heartbeat will retry.
+                LOG.error("Exception when sending heartbeat to master: {}", tr2.getMessage());
+            }
+        }
+    }
+
     @Override
     public void shutdown() {
         try {
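
The reporting order in heartbeatToMasterIfLocalbeatFail is: local supervisor first, and only on failure straight to the master, so Nimbus keeps seeing the worker as alive even while its supervisor is down. A minimal, self-contained sketch of that order, using a hypothetical Reporter interface rather than the Storm clients:

    public class HeartbeatFallbackSketch {
        interface Reporter {
            void report() throws Exception;
        }

        static void reportWithFallback(Reporter localSupervisor, Reporter master) {
            try {
                localSupervisor.report();   // preferred: cheap call to the co-located supervisor
            } catch (Exception localFailure) {
                try {
                    master.report();        // fallback keeps the worker visible to Nimbus
                } catch (Exception masterFailure) {
                    // Both paths failed; the next scheduled beat simply retries.
                }
            }
        }
    }
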
@@ -463,15 +496,17 @@ public class Worker implements Shutdownable, DaemonCommon {
     }
 
     public static void main(String[] args) throws Exception {
-        Preconditions.checkArgument(args.length == 4, "Illegal number of arguments. Expected: 4, Actual: " + args.length);
+        Preconditions.checkArgument(args.length == 5, "Illegal number of arguments. Expected: 5, Actual: " + args.length);
         String stormId = args[0];
         String assignmentId = args[1];
-        String portStr = args[2];
-        String workerId = args[3];
+        String supervisorPort = args[2];
+        String portStr = args[3];
+        String workerId = args[4];
         Map<String, Object> conf = Utils.readStormConfig();
         Utils.setupDefaultUncaughtExceptionHandler();
         StormCommon.validateDistributedMode(conf);
-        Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(portStr), workerId);
+        Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(supervisorPort),
+                Integer.parseInt(portStr), workerId);
         worker.start();
         Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index d080471..992dcbc 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -81,6 +81,7 @@ import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.SupervisorClient;
 import org.apache.storm.utils.ThriftTopologyUtils;
 import org.apache.storm.utils.Utils.SmartThread;
 import org.slf4j.Logger;
@@ -199,6 +200,7 @@ public class WorkerState {
     final IConnection receiver;
     final String topologyId;
     final String assignmentId;
+    final int supervisorPort;
     final int port;
     final String workerId;
     final IStateStorage stateStorage;
@@ -272,18 +274,17 @@ public class WorkerState {
     private final Collection<IAutoCredentials> autoCredentials;
     private static final long LOAD_REFRESH_INTERVAL_MS = 5000L;
 
-    public WorkerState(Map<String, Object> conf, IContext mqContext, String topologyId, String assignmentId, int port, String workerId,
-                       Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
-                       Collection<IAutoCredentials> autoCredentials)
-        throws IOException, InvalidTopologyException {
+    public WorkerState(Map<String, Object> conf, IContext mqContext, String topologyId, String assignmentId,
+        int supervisorPort, int port, String workerId, Map<String, Object> topologyConf, IStateStorage stateStorage,
+        IStormClusterState stormClusterState, Collection<IAutoCredentials> autoCredentials) throws IOException, InvalidTopologyException {
         this.autoCredentials = autoCredentials;
-        this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
-
         this.conf = conf;
+        this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
         this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
         this.receiver = this.mqContext.bind(topologyId, port);
         this.topologyId = topologyId;
         this.assignmentId = assignmentId;
+        this.supervisorPort = supervisorPort;
         this.port = port;
         this.workerId = workerId;
         this.stateStorage = stateStorage;
@@ -354,23 +355,7 @@ public class WorkerState {
     }
 
     public void refreshConnections(Runnable callback) throws Exception {
-        Integer version = stormClusterState.assignmentVersion(topologyId, callback);
-        version = (null == version) ? 0 : version;
-        VersionedData<Assignment> assignmentVersion = assignmentVersions.get().get(topologyId);
-        Assignment assignment;
-        if (null != assignmentVersion && (assignmentVersion.getVersion() == version)) {
-            assignment = assignmentVersion.getData();
-        } else {
-            VersionedData<Assignment>
-                newAssignmentVersion = new VersionedData<>(version,
-                stormClusterState.assignmentInfoWithVersion(topologyId, callback).getData());
-            assignmentVersions.getAndUpdate(prev -> {
-                Map<String, VersionedData<Assignment>> next = new HashMap<>(prev);
-                next.put(topologyId, newAssignmentVersion);
-                return next;
-            });
-            assignment = newAssignmentVersion.getData();
-        }
+        Assignment assignment = getLocalAssignment(conf, stormClusterState, topologyId);
 
         Set<NodeInfo> neededConnections = new HashSet<>();
         Map<Integer, NodeInfo> newTaskToNodePort = new HashMap<>();
@@ -645,8 +630,7 @@ public class WorkerState {
         LOG.info("Reading assignments");
         List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
         executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
-        Map<List<Long>, NodeInfo> executorToNodePort =
-            stormClusterState.assignmentInfo(topologyId, null).get_executor_node_port();
+        Map<List<Long>, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
         for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
             NodeInfo nodeInfo = entry.getValue();
             if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
@@ -656,6 +640,21 @@ public class WorkerState {
         return executorsAssignedToThisWorker;
     }
 
+    private Assignment getLocalAssignment(Map<String, Object> conf, IStormClusterState stormClusterState, String topologyId) {
+        if (!ConfigUtils.isLocalMode(conf)) {
+            try (SupervisorClient supervisorClient = SupervisorClient.getConfiguredClient(conf, Utils.hostname(),
+                    supervisorPort)) {
+                Assignment assignment = supervisorClient.getClient().getLocalAssignmentForStorm(topologyId);
+                return assignment;
+            } catch (Throwable tr1) {
+                //If any error/exception is thrown, fetch the assignment from zookeeper instead.
+                return stormClusterState.remoteAssignmentInfo(topologyId, null);
+            }
+        } else {
+            return stormClusterState.remoteAssignmentInfo(topologyId, null);
+        }
+    }
+
     private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> topologyConf, Set<List<Long>> executors) {
         Integer recvQueueSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE));
         Integer recvBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/generated/HBNodes.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/HBNodes.java b/storm-client/src/jvm/org/apache/storm/generated/HBNodes.java
index 887dbc0..4bbd4f4 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/HBNodes.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/HBNodes.java
@@ -364,13 +364,13 @@ public class HBNodes implements org.apache.thrift.TBase<HBNodes, HBNodes._Fields
           case 1: // PULSE_IDS
             if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
               {
-                org.apache.thrift.protocol.TList _list886 = iprot.readListBegin();
-                struct.pulseIds = new ArrayList<String>(_list886.size);
-                String _elem887;
-                for (int _i888 = 0; _i888 < _list886.size; ++_i888)
+                org.apache.thrift.protocol.TList _list912 = iprot.readListBegin();
+                struct.pulseIds = new ArrayList<String>(_list912.size);
+                String _elem913;
+                for (int _i914 = 0; _i914 < _list912.size; ++_i914)
                 {
-                  _elem887 = iprot.readString();
-                  struct.pulseIds.add(_elem887);
+                  _elem913 = iprot.readString();
+                  struct.pulseIds.add(_elem913);
                 }
                 iprot.readListEnd();
               }
@@ -396,9 +396,9 @@ public class HBNodes implements org.apache.thrift.TBase<HBNodes, HBNodes._Fields
         oprot.writeFieldBegin(PULSE_IDS_FIELD_DESC);
         {
           oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.pulseIds.size()));
-          for (String _iter889 : struct.pulseIds)
+          for (String _iter915 : struct.pulseIds)
           {
-            oprot.writeString(_iter889);
+            oprot.writeString(_iter915);
           }
           oprot.writeListEnd();
         }
@@ -429,9 +429,9 @@ public class HBNodes implements org.apache.thrift.TBase<HBNodes, HBNodes._Fields
       if (struct.is_set_pulseIds()) {
         {
           oprot.writeI32(struct.pulseIds.size());
-          for (String _iter890 : struct.pulseIds)
+          for (String _iter916 : struct.pulseIds)
           {
-            oprot.writeString(_iter890);
+            oprot.writeString(_iter916);
           }
         }
       }
@@ -443,13 +443,13 @@ public class HBNodes implements org.apache.thrift.TBase<HBNodes, HBNodes._Fields
       BitSet incoming = iprot.readBitSet(1);
       if (incoming.get(0)) {
         {
-          org.apache.thrift.protocol.TList _list891 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
-          struct.pulseIds = new ArrayList<String>(_list891.size);
-          String _elem892;
-          for (int _i893 = 0; _i893 < _list891.size; ++_i893)
+          org.apache.thrift.protocol.TList _list917 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+          struct.pulseIds = new ArrayList<String>(_list917.size);
+          String _elem918;
+          for (int _i919 = 0; _i919 < _list917.size; ++_i919)
           {
-            _elem892 = iprot.readString();
-            struct.pulseIds.add(_elem892);
+            _elem918 = iprot.readString();
+            struct.pulseIds.add(_elem918);
           }
         }
         struct.set_pulseIds_isSet(true);

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/generated/HBRecords.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/HBRecords.java b/storm-client/src/jvm/org/apache/storm/generated/HBRecords.java
index cfed785..fae00cf 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/HBRecords.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/HBRecords.java
@@ -367,14 +367,14 @@ public class HBRecords implements org.apache.thrift.TBase<HBRecords, HBRecords._
           case 1: // PULSES
             if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
               {
-                org.apache.thrift.protocol.TList _list878 = iprot.readListBegin();
-                struct.pulses = new ArrayList<HBPulse>(_list878.size);
-                HBPulse _elem879;
-                for (int _i880 = 0; _i880 < _list878.size; ++_i880)
+                org.apache.thrift.protocol.TList _list904 = iprot.readListBegin();
+                struct.pulses = new ArrayList<HBPulse>(_list904.size);
+                HBPulse _elem905;
+                for (int _i906 = 0; _i906 < _list904.size; ++_i906)
                 {
-                  _elem879 = new HBPulse();
-                  _elem879.read(iprot);
-                  struct.pulses.add(_elem879);
+                  _elem905 = new HBPulse();
+                  _elem905.read(iprot);
+                  struct.pulses.add(_elem905);
                 }
                 iprot.readListEnd();
               }
@@ -400,9 +400,9 @@ public class HBRecords implements org.apache.thrift.TBase<HBRecords, HBRecords._
         oprot.writeFieldBegin(PULSES_FIELD_DESC);
         {
           oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.pulses.size()));
-          for (HBPulse _iter881 : struct.pulses)
+          for (HBPulse _iter907 : struct.pulses)
           {
-            _iter881.write(oprot);
+            _iter907.write(oprot);
           }
           oprot.writeListEnd();
         }
@@ -433,9 +433,9 @@ public class HBRecords implements org.apache.thrift.TBase<HBRecords, HBRecords._
       if (struct.is_set_pulses()) {
         {
           oprot.writeI32(struct.pulses.size());
-          for (HBPulse _iter882 : struct.pulses)
+          for (HBPulse _iter908 : struct.pulses)
           {
-            _iter882.write(oprot);
+            _iter908.write(oprot);
           }
         }
       }
@@ -447,14 +447,14 @@ public class HBRecords implements org.apache.thrift.TBase<HBRecords, HBRecords._
       BitSet incoming = iprot.readBitSet(1);
       if (incoming.get(0)) {
         {
-          org.apache.thrift.protocol.TList _list883 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
-          struct.pulses = new ArrayList<HBPulse>(_list883.size);
-          HBPulse _elem884;
-          for (int _i885 = 0; _i885 < _list883.size; ++_i885)
+          org.apache.thrift.protocol.TList _list909 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+          struct.pulses = new ArrayList<HBPulse>(_list909.size);
+          HBPulse _elem910;
+          for (int _i911 = 0; _i911 < _list909.size; ++_i911)
           {
-            _elem884 = new HBPulse();
-            _elem884.read(iprot);
-            struct.pulses.add(_elem884);
+            _elem910 = new HBPulse();
+            _elem910.read(iprot);
+            struct.pulses.add(_elem910);
           }
         }
         struct.set_pulses_isSet(true);