You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/03/28 21:22:14 UTC

[04/10] storm git commit: [STORM-2693] Heartbeats and assignments promotion for storm2.0

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py
index 2ae3605..f6c98b0 100644
--- a/storm-client/src/py/storm/ttypes.py
+++ b/storm-client/src/py/storm/ttypes.py
@@ -244,15 +244,18 @@ class HBServerMessageType:
 class WorkerTokenServiceType:
   NIMBUS = 0
   DRPC = 1
+  SUPERVISOR = 2
 
   _VALUES_TO_NAMES = {
     0: "NIMBUS",
     1: "DRPC",
+    2: "SUPERVISOR",
   }
 
   _NAMES_TO_VALUES = {
     "NIMBUS": 0,
     "DRPC": 1,
+    "SUPERVISOR": 2,
   }
 
 
@@ -9165,6 +9168,7 @@ class SupervisorInfo:
    - time_secs
    - hostname
    - assignment_id
+   - server_port
    - used_ports
    - meta
    - scheduler_meta
@@ -9178,18 +9182,20 @@ class SupervisorInfo:
     (1, TType.I64, 'time_secs', None, None, ), # 1
     (2, TType.STRING, 'hostname', None, None, ), # 2
     (3, TType.STRING, 'assignment_id', None, None, ), # 3
-    (4, TType.LIST, 'used_ports', (TType.I64,None), None, ), # 4
-    (5, TType.LIST, 'meta', (TType.I64,None), None, ), # 5
-    (6, TType.MAP, 'scheduler_meta', (TType.STRING,None,TType.STRING,None), None, ), # 6
-    (7, TType.I64, 'uptime_secs', None, None, ), # 7
-    (8, TType.STRING, 'version', None, None, ), # 8
-    (9, TType.MAP, 'resources_map', (TType.STRING,None,TType.DOUBLE,None), None, ), # 9
+    (4, TType.I32, 'server_port', None, None, ), # 4
+    (5, TType.LIST, 'used_ports', (TType.I64,None), None, ), # 5
+    (6, TType.LIST, 'meta', (TType.I64,None), None, ), # 6
+    (7, TType.MAP, 'scheduler_meta', (TType.STRING,None,TType.STRING,None), None, ), # 7
+    (8, TType.I64, 'uptime_secs', None, None, ), # 8
+    (9, TType.STRING, 'version', None, None, ), # 9
+    (10, TType.MAP, 'resources_map', (TType.STRING,None,TType.DOUBLE,None), None, ), # 10
   )
 
-  def __init__(self, time_secs=None, hostname=None, assignment_id=None, used_ports=None, meta=None, scheduler_meta=None, uptime_secs=None, version=None, resources_map=None,):
+  def __init__(self, time_secs=None, hostname=None, assignment_id=None, server_port=None, used_ports=None, meta=None, scheduler_meta=None, uptime_secs=None, version=None, resources_map=None,):
     self.time_secs = time_secs
     self.hostname = hostname
     self.assignment_id = assignment_id
+    self.server_port = server_port
     self.used_ports = used_ports
     self.meta = meta
     self.scheduler_meta = scheduler_meta
@@ -9222,6 +9228,11 @@ class SupervisorInfo:
         else:
           iprot.skip(ftype)
       elif fid == 4:
+        if ftype == TType.I32:
+          self.server_port = iprot.readI32()
+        else:
+          iprot.skip(ftype)
+      elif fid == 5:
         if ftype == TType.LIST:
           self.used_ports = []
           (_etype559, _size556) = iprot.readListBegin()
@@ -9231,7 +9242,7 @@ class SupervisorInfo:
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
-      elif fid == 5:
+      elif fid == 6:
         if ftype == TType.LIST:
           self.meta = []
           (_etype565, _size562) = iprot.readListBegin()
@@ -9241,7 +9252,7 @@ class SupervisorInfo:
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
-      elif fid == 6:
+      elif fid == 7:
         if ftype == TType.MAP:
           self.scheduler_meta = {}
           (_ktype569, _vtype570, _size568 ) = iprot.readMapBegin()
@@ -9252,17 +9263,17 @@ class SupervisorInfo:
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
-      elif fid == 7:
+      elif fid == 8:
         if ftype == TType.I64:
           self.uptime_secs = iprot.readI64()
         else:
           iprot.skip(ftype)
-      elif fid == 8:
+      elif fid == 9:
         if ftype == TType.STRING:
           self.version = iprot.readString().decode('utf-8')
         else:
           iprot.skip(ftype)
-      elif fid == 9:
+      elif fid == 10:
         if ftype == TType.MAP:
           self.resources_map = {}
           (_ktype576, _vtype577, _size575 ) = iprot.readMapBegin()
@@ -9295,22 +9306,26 @@ class SupervisorInfo:
       oprot.writeFieldBegin('assignment_id', TType.STRING, 3)
       oprot.writeString(self.assignment_id.encode('utf-8'))
       oprot.writeFieldEnd()
+    if self.server_port is not None:
+      oprot.writeFieldBegin('server_port', TType.I32, 4)
+      oprot.writeI32(self.server_port)
+      oprot.writeFieldEnd()
     if self.used_ports is not None:
-      oprot.writeFieldBegin('used_ports', TType.LIST, 4)
+      oprot.writeFieldBegin('used_ports', TType.LIST, 5)
       oprot.writeListBegin(TType.I64, len(self.used_ports))
       for iter582 in self.used_ports:
         oprot.writeI64(iter582)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.meta is not None:
-      oprot.writeFieldBegin('meta', TType.LIST, 5)
+      oprot.writeFieldBegin('meta', TType.LIST, 6)
       oprot.writeListBegin(TType.I64, len(self.meta))
       for iter583 in self.meta:
         oprot.writeI64(iter583)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.scheduler_meta is not None:
-      oprot.writeFieldBegin('scheduler_meta', TType.MAP, 6)
+      oprot.writeFieldBegin('scheduler_meta', TType.MAP, 7)
       oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.scheduler_meta))
       for kiter584,viter585 in self.scheduler_meta.items():
         oprot.writeString(kiter584.encode('utf-8'))
@@ -9318,15 +9333,15 @@ class SupervisorInfo:
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.uptime_secs is not None:
-      oprot.writeFieldBegin('uptime_secs', TType.I64, 7)
+      oprot.writeFieldBegin('uptime_secs', TType.I64, 8)
       oprot.writeI64(self.uptime_secs)
       oprot.writeFieldEnd()
     if self.version is not None:
-      oprot.writeFieldBegin('version', TType.STRING, 8)
+      oprot.writeFieldBegin('version', TType.STRING, 9)
       oprot.writeString(self.version.encode('utf-8'))
       oprot.writeFieldEnd()
     if self.resources_map is not None:
-      oprot.writeFieldBegin('resources_map', TType.MAP, 9)
+      oprot.writeFieldBegin('resources_map', TType.MAP, 10)
       oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(self.resources_map))
       for kiter586,viter587 in self.resources_map.items():
         oprot.writeString(kiter586.encode('utf-8'))
@@ -9349,6 +9364,7 @@ class SupervisorInfo:
     value = (value * 31) ^ hash(self.time_secs)
     value = (value * 31) ^ hash(self.hostname)
     value = (value * 31) ^ hash(self.assignment_id)
+    value = (value * 31) ^ hash(self.server_port)
     value = (value * 31) ^ hash(self.used_ports)
     value = (value * 31) ^ hash(self.meta)
     value = (value * 31) ^ hash(self.scheduler_meta)
@@ -11838,6 +11854,283 @@ class OwnerResourceSummary:
   def __ne__(self, other):
     return not (self == other)
 
+class SupervisorWorkerHeartbeat:
+  """
+  Attributes:
+   - storm_id
+   - executors
+   - time_secs
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'storm_id', None, None, ), # 1
+    (2, TType.LIST, 'executors', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec)), None, ), # 2
+    (3, TType.I32, 'time_secs', None, None, ), # 3
+  )
+
+  def __init__(self, storm_id=None, executors=None, time_secs=None,):
+    self.storm_id = storm_id
+    self.executors = executors
+    self.time_secs = time_secs
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.storm_id = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.LIST:
+          self.executors = []
+          (_etype780, _size777) = iprot.readListBegin()
+          for _i781 in xrange(_size777):
+            _elem782 = ExecutorInfo()
+            _elem782.read(iprot)
+            self.executors.append(_elem782)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I32:
+          self.time_secs = iprot.readI32()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('SupervisorWorkerHeartbeat')
+    if self.storm_id is not None:
+      oprot.writeFieldBegin('storm_id', TType.STRING, 1)
+      oprot.writeString(self.storm_id.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.executors is not None:
+      oprot.writeFieldBegin('executors', TType.LIST, 2)
+      oprot.writeListBegin(TType.STRUCT, len(self.executors))
+      for iter783 in self.executors:
+        iter783.write(oprot)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    if self.time_secs is not None:
+      oprot.writeFieldBegin('time_secs', TType.I32, 3)
+      oprot.writeI32(self.time_secs)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.storm_id is None:
+      raise TProtocol.TProtocolException(message='Required field storm_id is unset!')
+    if self.executors is None:
+      raise TProtocol.TProtocolException(message='Required field executors is unset!')
+    if self.time_secs is None:
+      raise TProtocol.TProtocolException(message='Required field time_secs is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.storm_id)
+    value = (value * 31) ^ hash(self.executors)
+    value = (value * 31) ^ hash(self.time_secs)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class SupervisorWorkerHeartbeats:
+  """
+  Attributes:
+   - supervisor_id
+   - worker_heartbeats
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'supervisor_id', None, None, ), # 1
+    (2, TType.LIST, 'worker_heartbeats', (TType.STRUCT,(SupervisorWorkerHeartbeat, SupervisorWorkerHeartbeat.thrift_spec)), None, ), # 2
+  )
+
+  def __init__(self, supervisor_id=None, worker_heartbeats=None,):
+    self.supervisor_id = supervisor_id
+    self.worker_heartbeats = worker_heartbeats
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.supervisor_id = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.LIST:
+          self.worker_heartbeats = []
+          (_etype787, _size784) = iprot.readListBegin()
+          for _i788 in xrange(_size784):
+            _elem789 = SupervisorWorkerHeartbeat()
+            _elem789.read(iprot)
+            self.worker_heartbeats.append(_elem789)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('SupervisorWorkerHeartbeats')
+    if self.supervisor_id is not None:
+      oprot.writeFieldBegin('supervisor_id', TType.STRING, 1)
+      oprot.writeString(self.supervisor_id.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.worker_heartbeats is not None:
+      oprot.writeFieldBegin('worker_heartbeats', TType.LIST, 2)
+      oprot.writeListBegin(TType.STRUCT, len(self.worker_heartbeats))
+      for iter790 in self.worker_heartbeats:
+        iter790.write(oprot)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.supervisor_id is None:
+      raise TProtocol.TProtocolException(message='Required field supervisor_id is unset!')
+    if self.worker_heartbeats is None:
+      raise TProtocol.TProtocolException(message='Required field worker_heartbeats is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.supervisor_id)
+    value = (value * 31) ^ hash(self.worker_heartbeats)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class SupervisorAssignments:
+  """
+  Attributes:
+   - storm_assignment
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.MAP, 'storm_assignment', (TType.STRING,None,TType.STRUCT,(Assignment, Assignment.thrift_spec)), {
+    }, ), # 1
+  )
+
+  def __init__(self, storm_assignment=thrift_spec[1][4],):
+    if storm_assignment is self.thrift_spec[1][4]:
+      storm_assignment = {
+    }
+    self.storm_assignment = storm_assignment
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.MAP:
+          self.storm_assignment = {}
+          (_ktype792, _vtype793, _size791 ) = iprot.readMapBegin()
+          for _i795 in xrange(_size791):
+            _key796 = iprot.readString().decode('utf-8')
+            _val797 = Assignment()
+            _val797.read(iprot)
+            self.storm_assignment[_key796] = _val797
+          iprot.readMapEnd()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('SupervisorAssignments')
+    if self.storm_assignment is not None:
+      oprot.writeFieldBegin('storm_assignment', TType.MAP, 1)
+      oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.storm_assignment))
+      for kiter798,viter799 in self.storm_assignment.items():
+        oprot.writeString(kiter798.encode('utf-8'))
+        viter799.write(oprot)
+      oprot.writeMapEnd()
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.storm_assignment)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class WorkerMetricPoint:
   """
   Attributes:
@@ -12006,11 +12299,11 @@ class WorkerMetricList:
       if fid == 1:
         if ftype == TType.LIST:
           self.metrics = []
-          (_etype780, _size777) = iprot.readListBegin()
-          for _i781 in xrange(_size777):
-            _elem782 = WorkerMetricPoint()
-            _elem782.read(iprot)
-            self.metrics.append(_elem782)
+          (_etype803, _size800) = iprot.readListBegin()
+          for _i804 in xrange(_size800):
+            _elem805 = WorkerMetricPoint()
+            _elem805.read(iprot)
+            self.metrics.append(_elem805)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12027,8 +12320,8 @@ class WorkerMetricList:
     if self.metrics is not None:
       oprot.writeFieldBegin('metrics', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.metrics))
-      for iter783 in self.metrics:
-        iter783.write(oprot)
+      for iter806 in self.metrics:
+        iter806.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -12438,11 +12731,11 @@ class HBRecords:
       if fid == 1:
         if ftype == TType.LIST:
           self.pulses = []
-          (_etype787, _size784) = iprot.readListBegin()
-          for _i788 in xrange(_size784):
-            _elem789 = HBPulse()
-            _elem789.read(iprot)
-            self.pulses.append(_elem789)
+          (_etype810, _size807) = iprot.readListBegin()
+          for _i811 in xrange(_size807):
+            _elem812 = HBPulse()
+            _elem812.read(iprot)
+            self.pulses.append(_elem812)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12459,8 +12752,8 @@ class HBRecords:
     if self.pulses is not None:
       oprot.writeFieldBegin('pulses', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.pulses))
-      for iter790 in self.pulses:
-        iter790.write(oprot)
+      for iter813 in self.pulses:
+        iter813.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -12512,10 +12805,10 @@ class HBNodes:
       if fid == 1:
         if ftype == TType.LIST:
           self.pulseIds = []
-          (_etype794, _size791) = iprot.readListBegin()
-          for _i795 in xrange(_size791):
-            _elem796 = iprot.readString().decode('utf-8')
-            self.pulseIds.append(_elem796)
+          (_etype817, _size814) = iprot.readListBegin()
+          for _i818 in xrange(_size814):
+            _elem819 = iprot.readString().decode('utf-8')
+            self.pulseIds.append(_elem819)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12532,8 +12825,8 @@ class HBNodes:
     if self.pulseIds is not None:
       oprot.writeFieldBegin('pulseIds', TType.LIST, 1)
       oprot.writeListBegin(TType.STRING, len(self.pulseIds))
-      for iter797 in self.pulseIds:
-        oprot.writeString(iter797.encode('utf-8'))
+      for iter820 in self.pulseIds:
+        oprot.writeString(iter820.encode('utf-8'))
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift
index 81b71f3..e0935bd 100644
--- a/storm-client/src/storm.thrift
+++ b/storm-client/src/storm.thrift
@@ -483,6 +483,7 @@ struct SupervisorInfo {
     7: optional i64 uptime_secs;
     8: optional string version;
     9: optional map<string, double> resources_map;
+   10: optional i32 server_port;
 }
 
 struct NodeInfo {
@@ -672,6 +673,21 @@ struct OwnerResourceSummary {
   18: optional double assigned_off_heap_memory;
 }
 
+struct SupervisorWorkerHeartbeat {
+  1: required string storm_id;
+  2: required list<ExecutorInfo> executors
+  3: required i32 time_secs;
+}
+
+struct SupervisorWorkerHeartbeats {
+  1: required string supervisor_id;
+  2: required list<SupervisorWorkerHeartbeat> worker_heartbeats;
+}
+
+struct SupervisorAssignments {
+  1: optional map<string, Assignment> storm_assignment = {}
+}
+
 struct WorkerMetricPoint {
   1: required string metricName;
   2: required i64 timestamp;
@@ -768,6 +784,18 @@ service Nimbus {
   StormTopology getUserTopology(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze);
   TopologyHistoryInfo getTopologyHistory(1: string user) throws (1: AuthorizationException aze);
   list<OwnerResourceSummary> getOwnerResourceSummaries (1: string owner) throws (1: AuthorizationException aze);
+  /**
+   * Get assigned assignments for a specific supervisor
+   */
+  SupervisorAssignments getSupervisorAssignments(1: string node) throws (1: AuthorizationException aze);
+  /**
+   * Send supervisor worker heartbeats for a specific supervisor
+   */
+  void sendSupervisorWorkerHeartbeats(1: SupervisorWorkerHeartbeats heartbeats) throws (1: AuthorizationException aze);
+  /**
+   * Send supervisor local worker heartbeat when a supervisor is unreachable
+   */
+  void sendSupervisorWorkerHeartbeat(1: SupervisorWorkerHeartbeat heatbeat) throws (1: AuthorizationException aze, 2: NotAliveException e);
   void processWorkerMetrics(1: WorkerMetrics metrics);
 }
 
@@ -858,10 +886,26 @@ exception HBExecutionException {
   1: required string msg;
 }
 
+service Supervisor {
+  /**
+   * Send node specific assignments to supervisor
+   */
+  void sendSupervisorAssignments(1: SupervisorAssignments assignments) throws (1: AuthorizationException aze);
+  /**
+   * Get local assignment for a storm
+   */
+  Assignment getLocalAssignmentForStorm(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze);
+  /**
+   * Send worker heartbeat to local supervisor
+   */
+  void sendSupervisorWorkerHeartbeat(1: SupervisorWorkerHeartbeat heartbeat) throws (1: AuthorizationException aze);
+}
+
 # WorkerTokens are used as credentials that allow a Worker to authenticate with DRPC, Nimbus, or other storm processes that we add in here.
 enum WorkerTokenServiceType {
     NIMBUS,
-    DRPC
+    DRPC,
+    SUPERVISOR
 }
 
 #This is information that we want to be sure users do not modify in any way...

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/test/jvm/org/apache/storm/assignments/LocalAssignmentsBackendTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/assignments/LocalAssignmentsBackendTest.java b/storm-client/test/jvm/org/apache/storm/assignments/LocalAssignmentsBackendTest.java
new file mode 100644
index 0000000..69777a8
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/assignments/LocalAssignmentsBackendTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.assignments;
+
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.utils.ConfigUtils;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+
+
+public class LocalAssignmentsBackendTest {
+
+    @Test
+    public void testLocalAssignment(){
+        Map<String, Assignment> stormToAssignment = new HashMap<>();
+        String storm1 = "storm1";
+        String storm2 = "storm2";
+        Assignment ass1 = mockedAssignment(1);
+        Assignment ass2 = mockedAssignment(2);
+
+        ILocalAssignmentsBackend backend = LocalAssignmentsBackendFactory.getBackend(ConfigUtils.readStormConfig());
+        assertEquals(null, backend.getAssignment(storm1));
+        backend.keepOrUpdateAssignment(storm1, ass1);
+        backend.keepOrUpdateAssignment(storm2, ass2);
+        assertEquals(ass1, backend.getAssignment(storm1));
+        assertEquals(ass2, backend.getAssignment(storm2));
+        backend.clearStateForStorm(storm1);
+        assertEquals(null, backend.getAssignment(storm1));
+        backend.keepOrUpdateAssignment(storm1, ass1);
+        backend.keepOrUpdateAssignment(storm1, ass2);
+        assertEquals(ass2, backend.getAssignment(storm1));
+    }
+
+    @Test
+    public void testLocalIdInfo() {
+        String name1 = "name1";
+        String name2 = "name2";
+        String name3 = "name3";
+
+        String id1 = "id1";
+        String id2 = "id2";
+        String id3 = "id3";
+
+        ILocalAssignmentsBackend backend = LocalAssignmentsBackendFactory.getBackend(ConfigUtils.readStormConfig());
+        assertEquals(null, backend.getStormId(name3));
+        backend.keepStormId(name1, id1);
+        backend.keepStormId(name2, id2);
+        assertEquals(id1, backend.getStormId(name1));
+        assertEquals(id2, backend.getStormId(name2));
+        backend.deleteStormId(name1);
+        assertEquals(null, backend.getStormId(name1));
+        backend.clearStateForStorm(id2);
+        assertEquals(null, backend.getStormId(name2));
+        backend.keepStormId(name1, id1);
+        backend.keepStormId(name1, id3);
+        assertEquals(id3, backend.getStormId(name1));
+    }
+
+    private Assignment mockedAssignment(int i) {
+        Assignment ass = new Assignment();
+        ass.set_master_code_dir("master_code_dir" + i);
+        HashMap node_to_host = new HashMap();
+        node_to_host.put("node" + i, "host" + i);
+        ass.set_node_host(node_to_host);
+        Map<List<Long>, NodeInfo> executor_node_port = new HashMap<>();
+        Set<Long> nodePorts = new HashSet<>();
+        nodePorts.add(9723L);
+        executor_node_port.put(Arrays.asList(i + 0L), new NodeInfo("node" + i, nodePorts));
+        ass.set_executor_node_port(executor_node_port);
+        Map<List<Long>, Long> executor_start_time_secs = new HashMap<>();
+        executor_start_time_secs.put(Arrays.asList(1L), 12345L);
+        ass.set_executor_start_time_secs(executor_start_time_secs);
+        ass.set_worker_resources(new HashedMap());
+        ass.set_total_shared_off_heap(new HashedMap());
+        ass.set_owner("o");
+        return ass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
index e2ef41e..8664701 100644
--- a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
+++ b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
@@ -18,9 +18,8 @@
 
 package org.apache.storm.cluster;
 
+import org.apache.storm.assignments.LocalAssignmentsBackendFactory;
 import org.apache.storm.callback.ZKStateChangedCallback;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -48,7 +47,7 @@ public class StormClusterStateImplTest {
     public void init() throws Exception {
         storage = Mockito.mock(IStateStorage.class);
         context = new ClusterStateContext();
-        state = new StormClusterStateImpl(storage, context, false /*solo*/);
+        state = new StormClusterStateImpl(storage, LocalAssignmentsBackendFactory.getDefault(), context, false /*solo*/);
     }
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 3514040..4805729 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -32,7 +32,7 @@
         root (.get conf Config/STORM_ZOOKEEPER_ROOT)
         zk (ClientZookeeper/mkClient conf servers port root (DefaultWatcherCallBack.) conf)
         ; since this is not a purpose to add to leader lock queue, passing nil as blob-store and topo cache is ok
-        zk-leader-elector (Zookeeper/zkLeaderElector conf zk nil nil)
+        zk-leader-elector (Zookeeper/zkLeaderElector conf zk nil nil nil nil)
         leader-nimbus (.getLeader zk-leader-elector)
         host (.getHost leader-nimbus)
         port (.getPort leader-nimbus)

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 37f5c74..d2066e8 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -46,7 +46,8 @@
   (:import [org.apache.commons.io FileUtils])
   (:import [org.json.simple JSONValue])
   (:import [org.apache.storm.daemon StormCommon])
-  (:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils])
+  (:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils]
+           [org.apache.storm.assignments LocalAssignmentsBackendFactory])
   (:use [org.apache.storm util daemon-config config log])
   (:require [conjure.core])
 
@@ -168,7 +169,8 @@
     (log-warn "merged:" stats)
 
     (.workerHeartbeat state storm-id node port
-      (StatsUtil/thriftifyZkWorkerHb (StatsUtil/mkZkWorkerHb storm-id stats (int 10))))))
+      (StatsUtil/thriftifyZkWorkerHb (StatsUtil/mkZkWorkerHb storm-id stats (int 10))))
+    (.sendSupervisorWorkerHeartbeat (.getNimbus cluster) (StatsUtil/thriftifyRPCWorkerHb storm-id executor))))
 
 (defn slot-assignments [cluster storm-id]
   (let [state (.getClusterState cluster)
@@ -686,11 +688,12 @@
       (.submitTopology cluster "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
       (bind storm-id3 (StormCommon/getStormId state "test3"))
       (.advanceClusterTime cluster 11)
-      (.removeStorm state storm-id3)
+      ;; this guarantees an immediate kill notification
+      (.killTopology (.getNimbus cluster) "test3")
+      (.advanceClusterTime cluster 41)
       (is (nil? (.stormBase state storm-id3 nil)))
       (is (nil? (.assignmentInfo state storm-id3 nil)))
 
-      (.advanceClusterTime cluster 11)
       (is (= 0 (count (.heartbeatStorms state))))
 
       ;; this guarantees that monitor thread won't trigger for 10 more seconds
@@ -801,6 +804,7 @@
       (.advanceClusterTime cluster 31)
       (is (not= ass1 (executor-assignment cluster storm-id executor-id1)))
       (is (= ass2 (executor-assignment cluster storm-id executor-id2)))  ; tests launch timeout
+
       (check-consistency cluster "test")
 
 
@@ -879,7 +883,8 @@
       (.advanceClusterTime cluster 13)
       (is (= ass1 (executor-assignment cluster storm-id executor-id1)))
       (is (= ass2 (executor-assignment cluster storm-id executor-id2)))
-      (.killSupervisor cluster "b")
+      ;; with rpc reporting mode, only heartbeats from killed supervisor will time out
+      (.killSupervisor cluster (.get_node ass2))
       (do-executor-heartbeat cluster storm-id executor-id1)
 
       (.advanceClusterTime cluster 11)
@@ -1311,7 +1316,7 @@
   (with-open [zk (InProcessZookeeper. )]
     (with-open [tmp-nimbus-dir (TmpPath.)
                 _ (MockedZookeeper. (proxy [Zookeeper] []
-                      (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. ))))]
+                      (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls] (MockLeaderElector. ))))]
       (let [nimbus-dir (.getPath tmp-nimbus-dir)]
         (letlocals
           (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -1319,8 +1324,9 @@
                         STORM-CLUSTER-MODE "local"
                         STORM-ZOOKEEPER-PORT (.getPort zk)
                         STORM-LOCAL-DIR nimbus-dir}))
-          (bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
-          (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
+          (bind ass-backend (LocalAssignmentsBackendFactory/getDefault))
+          (bind cluster-state (ClusterUtils/mkStormClusterState conf ass-backend (ClusterStateContext.)))
+          (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil cluster-state))
           (.launchServer nimbus)
           (bind topology (Thrift/buildTopology
                            {"1" (Thrift/prepareSpoutDetails
@@ -1328,11 +1334,11 @@
                            {}))
 
           (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                          (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. false))))]
+                          (zkLeaderElectorImpl [conf zk blob-store tc  cluster-state acls] (MockLeaderElector. false))))]
 
             (letlocals
-              (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
-              (bind non-leader-nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
+              (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf ass-backend (ClusterStateContext.)))
+              (bind non-leader-nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil non-leader-cluster-state))
               (.launchServer non-leader-nimbus)
 
               ;first we verify that the master nimbus can perform all actions, even with another nimbus present.
@@ -1374,7 +1380,8 @@
 (deftest test-nimbus-iface-submitTopologyWithOpts-checks-authorization
   (with-open [cluster (.build (doto (LocalCluster$Builder. )
                                       (.withDaemonConf {NIMBUS-AUTHORIZER
-                          "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
+                          "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
+                          SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
     (let [
           topology (Thrift/buildTopology {} {})
          ]
@@ -1395,7 +1402,8 @@
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
                             (.withTopoCache tc)
-                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
+                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
+                                              SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
       (let [nimbus (.getNimbus cluster)
             topology-name "test"
             topology-id "test-id"]
@@ -1417,7 +1425,8 @@
                             (.withBlobStore blob-store)
                             (.withTopoCache tc)
                             (.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
-                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+                                              SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
     (let [nimbus (.getNimbus cluster)
           topology-name "test-nimbus-check-autho-params"
           topology-id "fake-id"
@@ -1476,7 +1485,8 @@
                             (.withBlobStore blob-store)
                             (.withTopoCache tc)
                             (.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
-                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+                                              SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
     (let [nimbus (.getNimbus cluster)
           expected-name "test-nimbus-check-autho-params"
           expected-conf {TOPOLOGY-NAME expected-name
@@ -1506,7 +1516,7 @@
       (.thenReturn (Mockito/when (.allSupervisorInfo cluster-state)) all-supervisors)
       (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/any Subject))) expected-conf)
       (.thenReturn (Mockito/when (.readTopology tc (Mockito/any String) (Mockito/any Subject))) topology)
-      (.thenReturn (Mockito/when (.topologyAssignments cluster-state)) topo-assignment)
+      (.thenReturn (Mockito/when (.assignmentsInfo cluster-state)) topo-assignment)
       (.getSupervisorPageInfo nimbus "super1" nil true)
 
       ;; afterwards, it should get called twice
@@ -1630,7 +1640,8 @@
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
                             (.withTopoCache tc)
-                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+                                              SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
       (.thenReturn (Mockito/when (.getTopoId cluster-state "test")) (Optional/empty))
       (let [topology (Thrift/buildTopology {} {})
             bad-config {"topology.isolate.machines" "2"}]
@@ -1676,7 +1687,7 @@
   (with-open [zk (InProcessZookeeper. )]
     (with-open [tmp-nimbus-dir (TmpPath.)
                 _ (MockedZookeeper. (proxy [Zookeeper] []
-                    (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. ))))]
+                    (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls] (MockLeaderElector. ))))]
       (let [nimbus-dir (.getPath tmp-nimbus-dir)]
         (letlocals
           (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -1745,7 +1756,8 @@
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
                             (.withTopoCache tc)
-                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+                                              SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
       (let [nimbus (.getNimbus cluster)
             previous-config (LogConfig.)
             mock-config (LogConfig.)
@@ -1776,7 +1788,8 @@
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
                             (.withTopoCache tc)
-                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+                                              SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
       (let [nimbus (.getNimbus cluster)
             previous-config (LogConfig.)
             mock-config (LogConfig.)
@@ -1874,7 +1887,7 @@
         mock-blob-store (Mockito/mock BlobStore)
         conf {NIMBUS-MONITOR-FREQ-SECS 10}]
     (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                    (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. ))))]
+                    (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls] (MockLeaderElector. ))))]
       (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))]
         (.set (.getHeartbeatsCache nimbus) hb-cache)
         (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. inactive-topos))
@@ -1914,7 +1927,7 @@
         mock-blob-store (Mockito/mock BlobStore)
         conf {NIMBUS-MONITOR-FREQ-SECS 10}]
     (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                    (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. ))))]
+                    (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls] (MockLeaderElector. ))))]
       (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))]
         (.set (.getHeartbeatsCache nimbus) hb-cache)
         (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set inactive-topos))

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
index abc1579..148afd8 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
@@ -38,6 +38,7 @@
 
 (defn to-conf [nimbus-port login-cfg aznClass transportPluginClass]
   (let [conf {NIMBUS-AUTHORIZER aznClass
+              SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
               NIMBUS-THRIFT-PORT nimbus-port
               STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass }
          conf (if login-cfg (merge conf {"java.security.auth.login.config" login-cfg}) conf)]
@@ -78,6 +79,7 @@
                             (.withNimbusDaemon)
                             (.withDaemonConf
                                {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+                                SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
                                 NIMBUS-THRIFT-PORT 0
                                 STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})))]
       (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -106,6 +108,7 @@
                             (.withNimbusDaemon)
                             (.withDaemonConf
                                {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
+                                SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
                                 NIMBUS-THRIFT-PORT 0
                                 STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})))]
       (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -170,6 +173,7 @@
                             (.withNimbusDaemon)
                             (.withDaemonConf
                                {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
+                                SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
                                 NIMBUS-THRIFT-PORT 0
                                 "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
                                 STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"})))]

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-core/test/jvm/org/apache/storm/stats/TestStatsUtil.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/stats/TestStatsUtil.java b/storm-core/test/jvm/org/apache/storm/stats/TestStatsUtil.java
index 78b032c..1ca793b 100644
--- a/storm-core/test/jvm/org/apache/storm/stats/TestStatsUtil.java
+++ b/storm-core/test/jvm/org/apache/storm/stats/TestStatsUtil.java
@@ -70,14 +70,8 @@ public class TestStatsUtil {
         HashMap<String, Object> exec2Beat = new HashMap<String, Object>();
         exec2Beat.put("uptime", 200);
 
-        Map<String, Object> beat1 = new HashMap<String, Object>();
-        beat1.put("heartbeat", exec1Beat);
-
-        Map<String, Object> beat2 = new HashMap<String, Object>();
-        beat2.put("heartbeat", exec2Beat);
-
-        beats.put(exec1, beat1);
-        beats.put(exec2, beat2);
+        beats.put(exec1, exec1Beat);
+        beats.put(exec2, exec2Beat);
 
         task2Component.put(1, "my-component");
         task2Component.put(2, "__sys1");

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index f7388f5..ec1815d 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -38,6 +38,7 @@ import org.apache.storm.scheduler.blacklist.reporters.IReporter;
 import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
 import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
+import org.apache.storm.security.auth.IAuthorizer;
 import org.apache.storm.validation.ConfigValidation;
 import org.apache.storm.validation.Validated;
 
@@ -264,6 +265,13 @@ public class DaemonConfig implements Validated {
     public static final String NIMBUS_AUTHORIZER = "nimbus.authorizer";
 
     /**
+     * Class name for authorization plugin for supervisor.
+     */
+    @isImplementationOfClass(implementsClass = IAuthorizer.class)
+    @isString
+    public static final String SUPERVISOR_AUTHORIZER = "supervisor.authorizer";
+
+    /**
      * Impersonation user ACL config entries.
      */
     @isString
@@ -283,6 +291,24 @@ public class DaemonConfig implements Validated {
     public static final String NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN = "nimbus.topology.action.notifier.plugin.class";
 
     /**
+     * This controls the number of working threads for distributing master assignments to supervisors.
+     */
+    @isInteger
+    public static final String NIMBUS_ASSIGNMENTS_SERVICE_THREADS = "nimbus.assignments.service.threads";
+
+    /**
+     * This controls the number of working thread queue size of assignment service.
+     */
+    @isInteger
+    public static final String NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE = "nimbus.assignments.service.thread.queue.size";
+
+    /**
+     * class controls heartbeats recovery strategy
+     */
+    @isString
+    public static final String NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS = "nimbus.worker.heartbeats.recovery.strategy.class";
+
+    /**
      * Storm UI binds to this host/interface.
      */
     @isString

http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 39c8d57..7db41b5 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -75,6 +75,9 @@ import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.SubmitOptions;
 import org.apache.storm.generated.SupervisorPageInfo;
+import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.generated.SupervisorWorkerHeartbeats;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
 import org.apache.storm.generated.TopologyHistoryInfo;
 import org.apache.storm.generated.TopologyInfo;
 import org.apache.storm.generated.TopologyPageInfo;
@@ -808,6 +811,8 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
         
         Supervisor s = new Supervisor(superConf, sharedContext, isuper);
         s.launch();
+        s.setLocalNimbus(this.nimbus);
+        this.nimbus.addSupervisor(s);
         supervisors.add(s);
         return s;
     }
@@ -1138,6 +1143,20 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
     }
 
     @Override
+    public SupervisorAssignments getSupervisorAssignments(String node) throws AuthorizationException, TException {
+        return null;
+    }
+
+    @Override
+    public void sendSupervisorWorkerHeartbeats(SupervisorWorkerHeartbeats heartbeats) throws AuthorizationException, TException {
+
+    }
+
+    @Override
+    public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heatbeat) throws AuthorizationException, TException {
+
+    }
+
     public void processWorkerMetrics(WorkerMetrics metrics) throws org.apache.thrift.TException {
         getNimbus().processWorkerMetrics(metrics);
     }