You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/03/28 21:22:14 UTC
[04/10] storm git commit: [STORM-2693] Heartbeats and assignments promotion for storm2.0
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py
index 2ae3605..f6c98b0 100644
--- a/storm-client/src/py/storm/ttypes.py
+++ b/storm-client/src/py/storm/ttypes.py
@@ -244,15 +244,18 @@ class HBServerMessageType:
class WorkerTokenServiceType:
NIMBUS = 0
DRPC = 1
+ SUPERVISOR = 2
_VALUES_TO_NAMES = {
0: "NIMBUS",
1: "DRPC",
+ 2: "SUPERVISOR",
}
_NAMES_TO_VALUES = {
"NIMBUS": 0,
"DRPC": 1,
+ "SUPERVISOR": 2,
}
@@ -9165,6 +9168,7 @@ class SupervisorInfo:
- time_secs
- hostname
- assignment_id
+ - server_port
- used_ports
- meta
- scheduler_meta
@@ -9178,18 +9182,20 @@ class SupervisorInfo:
(1, TType.I64, 'time_secs', None, None, ), # 1
(2, TType.STRING, 'hostname', None, None, ), # 2
(3, TType.STRING, 'assignment_id', None, None, ), # 3
- (4, TType.LIST, 'used_ports', (TType.I64,None), None, ), # 4
- (5, TType.LIST, 'meta', (TType.I64,None), None, ), # 5
- (6, TType.MAP, 'scheduler_meta', (TType.STRING,None,TType.STRING,None), None, ), # 6
- (7, TType.I64, 'uptime_secs', None, None, ), # 7
- (8, TType.STRING, 'version', None, None, ), # 8
- (9, TType.MAP, 'resources_map', (TType.STRING,None,TType.DOUBLE,None), None, ), # 9
+ (4, TType.I32, 'server_port', None, None, ), # 4
+ (5, TType.LIST, 'used_ports', (TType.I64,None), None, ), # 5
+ (6, TType.LIST, 'meta', (TType.I64,None), None, ), # 6
+ (7, TType.MAP, 'scheduler_meta', (TType.STRING,None,TType.STRING,None), None, ), # 7
+ (8, TType.I64, 'uptime_secs', None, None, ), # 8
+ (9, TType.STRING, 'version', None, None, ), # 9
+ (10, TType.MAP, 'resources_map', (TType.STRING,None,TType.DOUBLE,None), None, ), # 10
)
- def __init__(self, time_secs=None, hostname=None, assignment_id=None, used_ports=None, meta=None, scheduler_meta=None, uptime_secs=None, version=None, resources_map=None,):
+ def __init__(self, time_secs=None, hostname=None, assignment_id=None, server_port=None, used_ports=None, meta=None, scheduler_meta=None, uptime_secs=None, version=None, resources_map=None,):
self.time_secs = time_secs
self.hostname = hostname
self.assignment_id = assignment_id
+ self.server_port = server_port
self.used_ports = used_ports
self.meta = meta
self.scheduler_meta = scheduler_meta
@@ -9222,6 +9228,11 @@ class SupervisorInfo:
else:
iprot.skip(ftype)
elif fid == 4:
+ if ftype == TType.I32:
+ self.server_port = iprot.readI32()
+ else:
+ iprot.skip(ftype)
+ elif fid == 5:
if ftype == TType.LIST:
self.used_ports = []
(_etype559, _size556) = iprot.readListBegin()
@@ -9231,7 +9242,7 @@ class SupervisorInfo:
iprot.readListEnd()
else:
iprot.skip(ftype)
- elif fid == 5:
+ elif fid == 6:
if ftype == TType.LIST:
self.meta = []
(_etype565, _size562) = iprot.readListBegin()
@@ -9241,7 +9252,7 @@ class SupervisorInfo:
iprot.readListEnd()
else:
iprot.skip(ftype)
- elif fid == 6:
+ elif fid == 7:
if ftype == TType.MAP:
self.scheduler_meta = {}
(_ktype569, _vtype570, _size568 ) = iprot.readMapBegin()
@@ -9252,17 +9263,17 @@ class SupervisorInfo:
iprot.readMapEnd()
else:
iprot.skip(ftype)
- elif fid == 7:
+ elif fid == 8:
if ftype == TType.I64:
self.uptime_secs = iprot.readI64()
else:
iprot.skip(ftype)
- elif fid == 8:
+ elif fid == 9:
if ftype == TType.STRING:
self.version = iprot.readString().decode('utf-8')
else:
iprot.skip(ftype)
- elif fid == 9:
+ elif fid == 10:
if ftype == TType.MAP:
self.resources_map = {}
(_ktype576, _vtype577, _size575 ) = iprot.readMapBegin()
@@ -9295,22 +9306,26 @@ class SupervisorInfo:
oprot.writeFieldBegin('assignment_id', TType.STRING, 3)
oprot.writeString(self.assignment_id.encode('utf-8'))
oprot.writeFieldEnd()
+ if self.server_port is not None:
+ oprot.writeFieldBegin('server_port', TType.I32, 4)
+ oprot.writeI32(self.server_port)
+ oprot.writeFieldEnd()
if self.used_ports is not None:
- oprot.writeFieldBegin('used_ports', TType.LIST, 4)
+ oprot.writeFieldBegin('used_ports', TType.LIST, 5)
oprot.writeListBegin(TType.I64, len(self.used_ports))
for iter582 in self.used_ports:
oprot.writeI64(iter582)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.meta is not None:
- oprot.writeFieldBegin('meta', TType.LIST, 5)
+ oprot.writeFieldBegin('meta', TType.LIST, 6)
oprot.writeListBegin(TType.I64, len(self.meta))
for iter583 in self.meta:
oprot.writeI64(iter583)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.scheduler_meta is not None:
- oprot.writeFieldBegin('scheduler_meta', TType.MAP, 6)
+ oprot.writeFieldBegin('scheduler_meta', TType.MAP, 7)
oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.scheduler_meta))
for kiter584,viter585 in self.scheduler_meta.items():
oprot.writeString(kiter584.encode('utf-8'))
@@ -9318,15 +9333,15 @@ class SupervisorInfo:
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.uptime_secs is not None:
- oprot.writeFieldBegin('uptime_secs', TType.I64, 7)
+ oprot.writeFieldBegin('uptime_secs', TType.I64, 8)
oprot.writeI64(self.uptime_secs)
oprot.writeFieldEnd()
if self.version is not None:
- oprot.writeFieldBegin('version', TType.STRING, 8)
+ oprot.writeFieldBegin('version', TType.STRING, 9)
oprot.writeString(self.version.encode('utf-8'))
oprot.writeFieldEnd()
if self.resources_map is not None:
- oprot.writeFieldBegin('resources_map', TType.MAP, 9)
+ oprot.writeFieldBegin('resources_map', TType.MAP, 10)
oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(self.resources_map))
for kiter586,viter587 in self.resources_map.items():
oprot.writeString(kiter586.encode('utf-8'))
@@ -9349,6 +9364,7 @@ class SupervisorInfo:
value = (value * 31) ^ hash(self.time_secs)
value = (value * 31) ^ hash(self.hostname)
value = (value * 31) ^ hash(self.assignment_id)
+ value = (value * 31) ^ hash(self.server_port)
value = (value * 31) ^ hash(self.used_ports)
value = (value * 31) ^ hash(self.meta)
value = (value * 31) ^ hash(self.scheduler_meta)
@@ -11838,6 +11854,283 @@ class OwnerResourceSummary:
def __ne__(self, other):
return not (self == other)
+class SupervisorWorkerHeartbeat:
+ """
+ Attributes:
+ - storm_id
+ - executors
+ - time_secs
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'storm_id', None, None, ), # 1
+ (2, TType.LIST, 'executors', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec)), None, ), # 2
+ (3, TType.I32, 'time_secs', None, None, ), # 3
+ )
+
+ def __init__(self, storm_id=None, executors=None, time_secs=None,):
+ self.storm_id = storm_id
+ self.executors = executors
+ self.time_secs = time_secs
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.storm_id = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.LIST:
+ self.executors = []
+ (_etype780, _size777) = iprot.readListBegin()
+ for _i781 in xrange(_size777):
+ _elem782 = ExecutorInfo()
+ _elem782.read(iprot)
+ self.executors.append(_elem782)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I32:
+ self.time_secs = iprot.readI32()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('SupervisorWorkerHeartbeat')
+ if self.storm_id is not None:
+ oprot.writeFieldBegin('storm_id', TType.STRING, 1)
+ oprot.writeString(self.storm_id.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.executors is not None:
+ oprot.writeFieldBegin('executors', TType.LIST, 2)
+ oprot.writeListBegin(TType.STRUCT, len(self.executors))
+ for iter783 in self.executors:
+ iter783.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.time_secs is not None:
+ oprot.writeFieldBegin('time_secs', TType.I32, 3)
+ oprot.writeI32(self.time_secs)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.storm_id is None:
+ raise TProtocol.TProtocolException(message='Required field storm_id is unset!')
+ if self.executors is None:
+ raise TProtocol.TProtocolException(message='Required field executors is unset!')
+ if self.time_secs is None:
+ raise TProtocol.TProtocolException(message='Required field time_secs is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.storm_id)
+ value = (value * 31) ^ hash(self.executors)
+ value = (value * 31) ^ hash(self.time_secs)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class SupervisorWorkerHeartbeats:
+ """
+ Attributes:
+ - supervisor_id
+ - worker_heartbeats
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'supervisor_id', None, None, ), # 1
+ (2, TType.LIST, 'worker_heartbeats', (TType.STRUCT,(SupervisorWorkerHeartbeat, SupervisorWorkerHeartbeat.thrift_spec)), None, ), # 2
+ )
+
+ def __init__(self, supervisor_id=None, worker_heartbeats=None,):
+ self.supervisor_id = supervisor_id
+ self.worker_heartbeats = worker_heartbeats
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.supervisor_id = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.LIST:
+ self.worker_heartbeats = []
+ (_etype787, _size784) = iprot.readListBegin()
+ for _i788 in xrange(_size784):
+ _elem789 = SupervisorWorkerHeartbeat()
+ _elem789.read(iprot)
+ self.worker_heartbeats.append(_elem789)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('SupervisorWorkerHeartbeats')
+ if self.supervisor_id is not None:
+ oprot.writeFieldBegin('supervisor_id', TType.STRING, 1)
+ oprot.writeString(self.supervisor_id.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.worker_heartbeats is not None:
+ oprot.writeFieldBegin('worker_heartbeats', TType.LIST, 2)
+ oprot.writeListBegin(TType.STRUCT, len(self.worker_heartbeats))
+ for iter790 in self.worker_heartbeats:
+ iter790.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.supervisor_id is None:
+ raise TProtocol.TProtocolException(message='Required field supervisor_id is unset!')
+ if self.worker_heartbeats is None:
+ raise TProtocol.TProtocolException(message='Required field worker_heartbeats is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.supervisor_id)
+ value = (value * 31) ^ hash(self.worker_heartbeats)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class SupervisorAssignments:
+ """
+ Attributes:
+ - storm_assignment
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.MAP, 'storm_assignment', (TType.STRING,None,TType.STRUCT,(Assignment, Assignment.thrift_spec)), {
+ }, ), # 1
+ )
+
+ def __init__(self, storm_assignment=thrift_spec[1][4],):
+ if storm_assignment is self.thrift_spec[1][4]:
+ storm_assignment = {
+ }
+ self.storm_assignment = storm_assignment
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.MAP:
+ self.storm_assignment = {}
+ (_ktype792, _vtype793, _size791 ) = iprot.readMapBegin()
+ for _i795 in xrange(_size791):
+ _key796 = iprot.readString().decode('utf-8')
+ _val797 = Assignment()
+ _val797.read(iprot)
+ self.storm_assignment[_key796] = _val797
+ iprot.readMapEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('SupervisorAssignments')
+ if self.storm_assignment is not None:
+ oprot.writeFieldBegin('storm_assignment', TType.MAP, 1)
+ oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.storm_assignment))
+ for kiter798,viter799 in self.storm_assignment.items():
+ oprot.writeString(kiter798.encode('utf-8'))
+ viter799.write(oprot)
+ oprot.writeMapEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.storm_assignment)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class WorkerMetricPoint:
"""
Attributes:
@@ -12006,11 +12299,11 @@ class WorkerMetricList:
if fid == 1:
if ftype == TType.LIST:
self.metrics = []
- (_etype780, _size777) = iprot.readListBegin()
- for _i781 in xrange(_size777):
- _elem782 = WorkerMetricPoint()
- _elem782.read(iprot)
- self.metrics.append(_elem782)
+ (_etype803, _size800) = iprot.readListBegin()
+ for _i804 in xrange(_size800):
+ _elem805 = WorkerMetricPoint()
+ _elem805.read(iprot)
+ self.metrics.append(_elem805)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -12027,8 +12320,8 @@ class WorkerMetricList:
if self.metrics is not None:
oprot.writeFieldBegin('metrics', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.metrics))
- for iter783 in self.metrics:
- iter783.write(oprot)
+ for iter806 in self.metrics:
+ iter806.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -12438,11 +12731,11 @@ class HBRecords:
if fid == 1:
if ftype == TType.LIST:
self.pulses = []
- (_etype787, _size784) = iprot.readListBegin()
- for _i788 in xrange(_size784):
- _elem789 = HBPulse()
- _elem789.read(iprot)
- self.pulses.append(_elem789)
+ (_etype810, _size807) = iprot.readListBegin()
+ for _i811 in xrange(_size807):
+ _elem812 = HBPulse()
+ _elem812.read(iprot)
+ self.pulses.append(_elem812)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -12459,8 +12752,8 @@ class HBRecords:
if self.pulses is not None:
oprot.writeFieldBegin('pulses', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.pulses))
- for iter790 in self.pulses:
- iter790.write(oprot)
+ for iter813 in self.pulses:
+ iter813.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -12512,10 +12805,10 @@ class HBNodes:
if fid == 1:
if ftype == TType.LIST:
self.pulseIds = []
- (_etype794, _size791) = iprot.readListBegin()
- for _i795 in xrange(_size791):
- _elem796 = iprot.readString().decode('utf-8')
- self.pulseIds.append(_elem796)
+ (_etype817, _size814) = iprot.readListBegin()
+ for _i818 in xrange(_size814):
+ _elem819 = iprot.readString().decode('utf-8')
+ self.pulseIds.append(_elem819)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -12532,8 +12825,8 @@ class HBNodes:
if self.pulseIds is not None:
oprot.writeFieldBegin('pulseIds', TType.LIST, 1)
oprot.writeListBegin(TType.STRING, len(self.pulseIds))
- for iter797 in self.pulseIds:
- oprot.writeString(iter797.encode('utf-8'))
+ for iter820 in self.pulseIds:
+ oprot.writeString(iter820.encode('utf-8'))
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift
index 81b71f3..e0935bd 100644
--- a/storm-client/src/storm.thrift
+++ b/storm-client/src/storm.thrift
@@ -483,6 +483,7 @@ struct SupervisorInfo {
7: optional i64 uptime_secs;
8: optional string version;
9: optional map<string, double> resources_map;
+ 10: optional i32 server_port;
}
struct NodeInfo {
@@ -672,6 +673,21 @@ struct OwnerResourceSummary {
18: optional double assigned_off_heap_memory;
}
+struct SupervisorWorkerHeartbeat {
+ 1: required string storm_id;
+ 2: required list<ExecutorInfo> executors
+ 3: required i32 time_secs;
+}
+
+struct SupervisorWorkerHeartbeats {
+ 1: required string supervisor_id;
+ 2: required list<SupervisorWorkerHeartbeat> worker_heartbeats;
+}
+
+struct SupervisorAssignments {
+ 1: optional map<string, Assignment> storm_assignment = {}
+}
+
struct WorkerMetricPoint {
1: required string metricName;
2: required i64 timestamp;
@@ -768,6 +784,18 @@ service Nimbus {
StormTopology getUserTopology(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze);
TopologyHistoryInfo getTopologyHistory(1: string user) throws (1: AuthorizationException aze);
list<OwnerResourceSummary> getOwnerResourceSummaries (1: string owner) throws (1: AuthorizationException aze);
+ /**
+ * Get the assignments for a specific supervisor node
+ */
+ SupervisorAssignments getSupervisorAssignments(1: string node) throws (1: AuthorizationException aze);
+ /**
+ * Send supervisor worker heartbeats for a specific supervisor
+ */
+ void sendSupervisorWorkerHeartbeats(1: SupervisorWorkerHeartbeats heartbeats) throws (1: AuthorizationException aze);
+ /**
+ * Send supervisor local worker heartbeat when a supervisor is unreachable
+ */
+ void sendSupervisorWorkerHeartbeat(1: SupervisorWorkerHeartbeat heatbeat) throws (1: AuthorizationException aze, 2: NotAliveException e);
void processWorkerMetrics(1: WorkerMetrics metrics);
}
@@ -858,10 +886,26 @@ exception HBExecutionException {
1: required string msg;
}
+service Supervisor {
+ /**
+ * Send node specific assignments to supervisor
+ */
+ void sendSupervisorAssignments(1: SupervisorAssignments assignments) throws (1: AuthorizationException aze);
+ /**
+ * Get local assignment for a storm
+ */
+ Assignment getLocalAssignmentForStorm(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze);
+ /**
+ * Send worker heartbeat to local supervisor
+ */
+ void sendSupervisorWorkerHeartbeat(1: SupervisorWorkerHeartbeat heartbeat) throws (1: AuthorizationException aze);
+}
+
# WorkerTokens are used as credentials that allow a Worker to authenticate with DRPC, Nimbus, or other storm processes that we add in here.
enum WorkerTokenServiceType {
NIMBUS,
- DRPC
+ DRPC,
+ SUPERVISOR
}
#This is information that we want to be sure users do not modify in any way...
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/test/jvm/org/apache/storm/assignments/LocalAssignmentsBackendTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/assignments/LocalAssignmentsBackendTest.java b/storm-client/test/jvm/org/apache/storm/assignments/LocalAssignmentsBackendTest.java
new file mode 100644
index 0000000..69777a8
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/assignments/LocalAssignmentsBackendTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.assignments;
+
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.utils.ConfigUtils;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+
+
+public class LocalAssignmentsBackendTest {
+
+ @Test
+ public void testLocalAssignment(){
+ Map<String, Assignment> stormToAssignment = new HashMap<>();
+ String storm1 = "storm1";
+ String storm2 = "storm2";
+ Assignment ass1 = mockedAssignment(1);
+ Assignment ass2 = mockedAssignment(2);
+
+ ILocalAssignmentsBackend backend = LocalAssignmentsBackendFactory.getBackend(ConfigUtils.readStormConfig());
+ assertEquals(null, backend.getAssignment(storm1));
+ backend.keepOrUpdateAssignment(storm1, ass1);
+ backend.keepOrUpdateAssignment(storm2, ass2);
+ assertEquals(ass1, backend.getAssignment(storm1));
+ assertEquals(ass2, backend.getAssignment(storm2));
+ backend.clearStateForStorm(storm1);
+ assertEquals(null, backend.getAssignment(storm1));
+ backend.keepOrUpdateAssignment(storm1, ass1);
+ backend.keepOrUpdateAssignment(storm1, ass2);
+ assertEquals(ass2, backend.getAssignment(storm1));
+ }
+
+ @Test
+ public void testLocalIdInfo() {
+ String name1 = "name1";
+ String name2 = "name2";
+ String name3 = "name3";
+
+ String id1 = "id1";
+ String id2 = "id2";
+ String id3 = "id3";
+
+ ILocalAssignmentsBackend backend = LocalAssignmentsBackendFactory.getBackend(ConfigUtils.readStormConfig());
+ assertEquals(null, backend.getStormId(name3));
+ backend.keepStormId(name1, id1);
+ backend.keepStormId(name2, id2);
+ assertEquals(id1, backend.getStormId(name1));
+ assertEquals(id2, backend.getStormId(name2));
+ backend.deleteStormId(name1);
+ assertEquals(null, backend.getStormId(name1));
+ backend.clearStateForStorm(id2);
+ assertEquals(null, backend.getStormId(name2));
+ backend.keepStormId(name1, id1);
+ backend.keepStormId(name1, id3);
+ assertEquals(id3, backend.getStormId(name1));
+ }
+
+ private Assignment mockedAssignment(int i) {
+ Assignment ass = new Assignment();
+ ass.set_master_code_dir("master_code_dir" + i);
+ HashMap node_to_host = new HashMap();
+ node_to_host.put("node" + i, "host" + i);
+ ass.set_node_host(node_to_host);
+ Map<List<Long>, NodeInfo> executor_node_port = new HashMap<>();
+ Set<Long> nodePorts = new HashSet<>();
+ nodePorts.add(9723L);
+ executor_node_port.put(Arrays.asList(i + 0L), new NodeInfo("node" + i, nodePorts));
+ ass.set_executor_node_port(executor_node_port);
+ Map<List<Long>, Long> executor_start_time_secs = new HashMap<>();
+ executor_start_time_secs.put(Arrays.asList(1L), 12345L);
+ ass.set_executor_start_time_secs(executor_start_time_secs);
+ ass.set_worker_resources(new HashedMap());
+ ass.set_total_shared_off_heap(new HashedMap());
+ ass.set_owner("o");
+ return ass;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
index e2ef41e..8664701 100644
--- a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
+++ b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
@@ -18,9 +18,8 @@
package org.apache.storm.cluster;
+import org.apache.storm.assignments.LocalAssignmentsBackendFactory;
import org.apache.storm.callback.ZKStateChangedCallback;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
@@ -48,7 +47,7 @@ public class StormClusterStateImplTest {
public void init() throws Exception {
storage = Mockito.mock(IStateStorage.class);
context = new ClusterStateContext();
- state = new StormClusterStateImpl(storage, context, false /*solo*/);
+ state = new StormClusterStateImpl(storage, LocalAssignmentsBackendFactory.getDefault(), context, false /*solo*/);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 3514040..4805729 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -32,7 +32,7 @@
root (.get conf Config/STORM_ZOOKEEPER_ROOT)
zk (ClientZookeeper/mkClient conf servers port root (DefaultWatcherCallBack.) conf)
; we are not trying to join the leader lock queue here, so passing nil for blob-store, topo cache, cluster state and acls is ok
- zk-leader-elector (Zookeeper/zkLeaderElector conf zk nil nil)
+ zk-leader-elector (Zookeeper/zkLeaderElector conf zk nil nil nil nil)
leader-nimbus (.getLeader zk-leader-elector)
host (.getHost leader-nimbus)
port (.getPort leader-nimbus)
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 37f5c74..d2066e8 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -46,7 +46,8 @@
(:import [org.apache.commons.io FileUtils])
(:import [org.json.simple JSONValue])
(:import [org.apache.storm.daemon StormCommon])
- (:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils])
+ (:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils]
+ [org.apache.storm.assignments LocalAssignmentsBackendFactory])
(:use [org.apache.storm util daemon-config config log])
(:require [conjure.core])
@@ -168,7 +169,8 @@
(log-warn "merged:" stats)
(.workerHeartbeat state storm-id node port
- (StatsUtil/thriftifyZkWorkerHb (StatsUtil/mkZkWorkerHb storm-id stats (int 10))))))
+ (StatsUtil/thriftifyZkWorkerHb (StatsUtil/mkZkWorkerHb storm-id stats (int 10))))
+ (.sendSupervisorWorkerHeartbeat (.getNimbus cluster) (StatsUtil/thriftifyRPCWorkerHb storm-id executor))))
(defn slot-assignments [cluster storm-id]
(let [state (.getClusterState cluster)
@@ -686,11 +688,12 @@
(.submitTopology cluster "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
(bind storm-id3 (StormCommon/getStormId state "test3"))
(.advanceClusterTime cluster 11)
- (.removeStorm state storm-id3)
+ ;; this guarantees an immediate kill notification
+ (.killTopology (.getNimbus cluster) "test3")
+ (.advanceClusterTime cluster 41)
(is (nil? (.stormBase state storm-id3 nil)))
(is (nil? (.assignmentInfo state storm-id3 nil)))
- (.advanceClusterTime cluster 11)
(is (= 0 (count (.heartbeatStorms state))))
;; this guarantees that monitor thread won't trigger for 10 more seconds
@@ -801,6 +804,7 @@
(.advanceClusterTime cluster 31)
(is (not= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2))) ; tests launch timeout
+
(check-consistency cluster "test")
@@ -879,7 +883,8 @@
(.advanceClusterTime cluster 13)
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2)))
- (.killSupervisor cluster "b")
+ ;; with rpc reporting mode, only heartbeats from killed supervisor will time out
+ (.killSupervisor cluster (.get_node ass2))
(do-executor-heartbeat cluster storm-id executor-id1)
(.advanceClusterTime cluster 11)
@@ -1311,7 +1316,7 @@
(with-open [zk (InProcessZookeeper. )]
(with-open [tmp-nimbus-dir (TmpPath.)
_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. ))))]
+ (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls] (MockLeaderElector. ))))]
(let [nimbus-dir (.getPath tmp-nimbus-dir)]
(letlocals
(bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -1319,8 +1324,9 @@
STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT (.getPort zk)
STORM-LOCAL-DIR nimbus-dir}))
- (bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
- (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
+ (bind ass-backend (LocalAssignmentsBackendFactory/getDefault))
+ (bind cluster-state (ClusterUtils/mkStormClusterState conf ass-backend (ClusterStateContext.)))
+ (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil cluster-state))
(.launchServer nimbus)
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
@@ -1328,11 +1334,11 @@
{}))
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. false))))]
+ (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls] (MockLeaderElector. false))))]
(letlocals
- (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
- (bind non-leader-nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
+ (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf ass-backend (ClusterStateContext.)))
+ (bind non-leader-nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil non-leader-cluster-state))
(.launchServer non-leader-nimbus)
;first we verify that the master nimbus can perform all actions, even with another nimbus present.
@@ -1374,7 +1380,8 @@
(deftest test-nimbus-iface-submitTopologyWithOpts-checks-authorization
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withDaemonConf {NIMBUS-AUTHORIZER
- "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
+ "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
+ SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
(let [
topology (Thrift/buildTopology {} {})
]
@@ -1395,7 +1402,8 @@
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
- (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
+ (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
+ SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
topology-name "test"
topology-id "test-id"]
@@ -1417,7 +1425,8 @@
(.withBlobStore blob-store)
(.withTopoCache tc)
(.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
- (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+ (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+ SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
topology-name "test-nimbus-check-autho-params"
topology-id "fake-id"
@@ -1476,7 +1485,8 @@
(.withBlobStore blob-store)
(.withTopoCache tc)
(.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
- (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+ (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+ SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
expected-name "test-nimbus-check-autho-params"
expected-conf {TOPOLOGY-NAME expected-name
@@ -1506,7 +1516,7 @@
(.thenReturn (Mockito/when (.allSupervisorInfo cluster-state)) all-supervisors)
(.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/any Subject))) expected-conf)
(.thenReturn (Mockito/when (.readTopology tc (Mockito/any String) (Mockito/any Subject))) topology)
- (.thenReturn (Mockito/when (.topologyAssignments cluster-state)) topo-assignment)
+ (.thenReturn (Mockito/when (.assignmentsInfo cluster-state)) topo-assignment)
(.getSupervisorPageInfo nimbus "super1" nil true)
;; afterwards, it should get called twice
@@ -1630,7 +1640,8 @@
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
- (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+ (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+ SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(.thenReturn (Mockito/when (.getTopoId cluster-state "test")) (Optional/empty))
(let [topology (Thrift/buildTopology {} {})
bad-config {"topology.isolate.machines" "2"}]
@@ -1676,7 +1687,7 @@
(with-open [zk (InProcessZookeeper. )]
(with-open [tmp-nimbus-dir (TmpPath.)
_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. ))))]
+ (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls] (MockLeaderElector. ))))]
(let [nimbus-dir (.getPath tmp-nimbus-dir)]
(letlocals
(bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -1745,7 +1756,8 @@
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
- (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+ (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+ SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
previous-config (LogConfig.)
mock-config (LogConfig.)
@@ -1776,7 +1788,8 @@
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
- (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+ (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+ SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
previous-config (LogConfig.)
mock-config (LogConfig.)
@@ -1874,7 +1887,7 @@
mock-blob-store (Mockito/mock BlobStore)
conf {NIMBUS-MONITOR-FREQ-SECS 10}]
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. ))))]
+ (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls] (MockLeaderElector. ))))]
(let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))]
(.set (.getHeartbeatsCache nimbus) hb-cache)
(.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. inactive-topos))
@@ -1914,7 +1927,7 @@
mock-blob-store (Mockito/mock BlobStore)
conf {NIMBUS-MONITOR-FREQ-SECS 10}]
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. ))))]
+ (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls] (MockLeaderElector. ))))]
(let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))]
(.set (.getHeartbeatsCache nimbus) hb-cache)
(.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set inactive-topos))
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
index abc1579..148afd8 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
@@ -38,6 +38,7 @@
(defn to-conf [nimbus-port login-cfg aznClass transportPluginClass]
(let [conf {NIMBUS-AUTHORIZER aznClass
+ SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
NIMBUS-THRIFT-PORT nimbus-port
STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass }
conf (if login-cfg (merge conf {"java.security.auth.login.config" login-cfg}) conf)]
@@ -78,6 +79,7 @@
(.withNimbusDaemon)
(.withDaemonConf
{NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+ SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
NIMBUS-THRIFT-PORT 0
STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})))]
(let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -106,6 +108,7 @@
(.withNimbusDaemon)
(.withDaemonConf
{NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
+ SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
NIMBUS-THRIFT-PORT 0
STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})))]
(let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -170,6 +173,7 @@
(.withNimbusDaemon)
(.withDaemonConf
{NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
+ SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
NIMBUS-THRIFT-PORT 0
"java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"})))]
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-core/test/jvm/org/apache/storm/stats/TestStatsUtil.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/stats/TestStatsUtil.java b/storm-core/test/jvm/org/apache/storm/stats/TestStatsUtil.java
index 78b032c..1ca793b 100644
--- a/storm-core/test/jvm/org/apache/storm/stats/TestStatsUtil.java
+++ b/storm-core/test/jvm/org/apache/storm/stats/TestStatsUtil.java
@@ -70,14 +70,8 @@ public class TestStatsUtil {
HashMap<String, Object> exec2Beat = new HashMap<String, Object>();
exec2Beat.put("uptime", 200);
- Map<String, Object> beat1 = new HashMap<String, Object>();
- beat1.put("heartbeat", exec1Beat);
-
- Map<String, Object> beat2 = new HashMap<String, Object>();
- beat2.put("heartbeat", exec2Beat);
-
- beats.put(exec1, beat1);
- beats.put(exec2, beat2);
+ beats.put(exec1, exec1Beat);
+ beats.put(exec2, exec2Beat);
task2Component.put(1, "my-component");
task2Component.put(2, "__sys1");
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index f7388f5..ec1815d 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -38,6 +38,7 @@ import org.apache.storm.scheduler.blacklist.reporters.IReporter;
import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
+import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.validation.ConfigValidation;
import org.apache.storm.validation.Validated;
@@ -264,6 +265,13 @@ public class DaemonConfig implements Validated {
public static final String NIMBUS_AUTHORIZER = "nimbus.authorizer";
/**
+ * Class name for authorization plugin for supervisor.
+ */
+ @isImplementationOfClass(implementsClass = IAuthorizer.class)
+ @isString
+ public static final String SUPERVISOR_AUTHORIZER = "supervisor.authorizer";
+
+ /**
* Impersonation user ACL config entries.
*/
@isString
@@ -283,6 +291,24 @@ public class DaemonConfig implements Validated {
public static final String NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN = "nimbus.topology.action.notifier.plugin.class";
/**
+ * This controls the number of working threads for distributing master assignments to supervisors.
+ */
+ @isInteger
+ public static final String NIMBUS_ASSIGNMENTS_SERVICE_THREADS = "nimbus.assignments.service.threads";
+
+ /**
+ * This controls the size of the working-thread queue of the assignments service.
+ */
+ @isInteger
+ public static final String NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE = "nimbus.assignments.service.thread.queue.size";
+
+ /**
+ * Class name of the worker heartbeats recovery strategy.
+ */
+ @isString
+ public static final String NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS = "nimbus.worker.heartbeats.recovery.strategy.class";
+
+ /**
* Storm UI binds to this host/interface.
*/
@isString
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 39c8d57..7db41b5 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -75,6 +75,9 @@ import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.generated.SupervisorPageInfo;
+import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.generated.SupervisorWorkerHeartbeats;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.generated.TopologyHistoryInfo;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologyPageInfo;
@@ -808,6 +811,8 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
Supervisor s = new Supervisor(superConf, sharedContext, isuper);
s.launch();
+ s.setLocalNimbus(this.nimbus);
+ this.nimbus.addSupervisor(s);
supervisors.add(s);
return s;
}
@@ -1138,6 +1143,20 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
}
@Override
+ public SupervisorAssignments getSupervisorAssignments(String node) throws AuthorizationException, TException {
+ return null;
+ }
+
+ @Override
+ public void sendSupervisorWorkerHeartbeats(SupervisorWorkerHeartbeats heartbeats) throws AuthorizationException, TException {
+
+ }
+
+ @Override
+ public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heatbeat) throws AuthorizationException, TException {
+
+ }
+
public void processWorkerMetrics(WorkerMetrics metrics) throws org.apache.thrift.TException {
getNimbus().processWorkerMetrics(metrics);
}