You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/07/18 07:44:56 UTC
[2/7] storm git commit: [STORM-2622] Add owner resource summary
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-client/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/Nimbus.py b/storm-client/src/py/storm/Nimbus.py
index 1c5e86e..7313057 100644
--- a/storm-client/src/py/storm/Nimbus.py
+++ b/storm-client/src/py/storm/Nimbus.py
@@ -376,6 +376,13 @@ class Iface:
"""
pass
+ def getOwnerResourceSummaries(self, owner):
+ """
+ Parameters:
+ - owner
+ """
+ pass
+
class Client(Iface):
def __init__(self, iprot, oprot=None):
@@ -1927,6 +1934,39 @@ class Client(Iface):
raise result.aze
raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyHistory failed: unknown result")
+ def getOwnerResourceSummaries(self, owner):
+ """
+ Parameters:
+ - owner
+ """
+ self.send_getOwnerResourceSummaries(owner)
+ return self.recv_getOwnerResourceSummaries()
+
+ def send_getOwnerResourceSummaries(self, owner):
+ self._oprot.writeMessageBegin('getOwnerResourceSummaries', TMessageType.CALL, self._seqid)
+ args = getOwnerResourceSummaries_args()
+ args.owner = owner
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_getOwnerResourceSummaries(self):
+ iprot = self._iprot
+ (fname, mtype, rseqid) = iprot.readMessageBegin()
+ if mtype == TMessageType.EXCEPTION:
+ x = TApplicationException()
+ x.read(iprot)
+ iprot.readMessageEnd()
+ raise x
+ result = getOwnerResourceSummaries_result()
+ result.read(iprot)
+ iprot.readMessageEnd()
+ if result.success is not None:
+ return result.success
+ if result.aze is not None:
+ raise result.aze
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "getOwnerResourceSummaries failed: unknown result")
+
class Processor(Iface, TProcessor):
def __init__(self, handler):
@@ -1977,6 +2017,7 @@ class Processor(Iface, TProcessor):
self._processMap["getTopology"] = Processor.process_getTopology
self._processMap["getUserTopology"] = Processor.process_getUserTopology
self._processMap["getTopologyHistory"] = Processor.process_getTopologyHistory
+ self._processMap["getOwnerResourceSummaries"] = Processor.process_getOwnerResourceSummaries
def process(self, iprot, oprot):
(name, type, seqid) = iprot.readMessageBegin()
@@ -3052,6 +3093,28 @@ class Processor(Iface, TProcessor):
oprot.writeMessageEnd()
oprot.trans.flush()
+ def process_getOwnerResourceSummaries(self, seqid, iprot, oprot):
+ args = getOwnerResourceSummaries_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = getOwnerResourceSummaries_result()
+ try:
+ result.success = self._handler.getOwnerResourceSummaries(args.owner)
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
+ result.aze = aze
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("getOwnerResourceSummaries", msg_type, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
# HELPER FUNCTIONS AND STRUCTURES
@@ -9913,3 +9976,155 @@ class getTopologyHistory_result:
def __ne__(self, other):
return not (self == other)
+
+class getOwnerResourceSummaries_args:
+ """
+ Attributes:
+ - owner
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'owner', None, None, ), # 1
+ )
+
+ def __init__(self, owner=None,):
+ self.owner = owner
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.owner = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('getOwnerResourceSummaries_args')
+ if self.owner is not None:
+ oprot.writeFieldBegin('owner', TType.STRING, 1)
+ oprot.writeString(self.owner.encode('utf-8'))
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.owner)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class getOwnerResourceSummaries_result:
+ """
+ Attributes:
+ - success
+ - aze
+ """
+
+ thrift_spec = (
+ (0, TType.LIST, 'success', (TType.STRUCT,(OwnerResourceSummary, OwnerResourceSummary.thrift_spec)), None, ), # 0
+ (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
+ )
+
+ def __init__(self, success=None, aze=None,):
+ self.success = success
+ self.aze = aze
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 0:
+ if ftype == TType.LIST:
+ self.success = []
+ (_etype731, _size728) = iprot.readListBegin()
+ for _i732 in xrange(_size728):
+ _elem733 = OwnerResourceSummary()
+ _elem733.read(iprot)
+ self.success.append(_elem733)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 1:
+ if ftype == TType.STRUCT:
+ self.aze = AuthorizationException()
+ self.aze.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('getOwnerResourceSummaries_result')
+ if self.success is not None:
+ oprot.writeFieldBegin('success', TType.LIST, 0)
+ oprot.writeListBegin(TType.STRUCT, len(self.success))
+ for iter734 in self.success:
+ iter734.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.aze is not None:
+ oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+ self.aze.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.success)
+ value = (value * 31) ^ hash(self.aze)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-client/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py
index d47483f..4058f60 100644
--- a/storm-client/src/py/storm/ttypes.py
+++ b/storm-client/src/py/storm/ttypes.py
@@ -10998,6 +10998,294 @@ class TopologyHistoryInfo:
def __ne__(self, other):
return not (self == other)
+class OwnerResourceSummary:
+ """
+ Attributes:
+ - owner
+ - total_topologies
+ - total_executors
+ - total_workers
+ - memory_usage
+ - cpu_usage
+ - memory_guarantee
+ - cpu_guarantee
+ - memory_guarantee_remaining
+ - cpu_guarantee_remaining
+ - isolated_node_guarantee
+ - total_tasks
+ - requested_on_heap_memory
+ - requested_off_heap_memory
+ - requested_total_memory
+ - requested_cpu
+ - assigned_on_heap_memory
+ - assigned_off_heap_memory
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'owner', None, None, ), # 1
+ (2, TType.I32, 'total_topologies', None, None, ), # 2
+ (3, TType.I32, 'total_executors', None, None, ), # 3
+ (4, TType.I32, 'total_workers', None, None, ), # 4
+ (5, TType.DOUBLE, 'memory_usage', None, None, ), # 5
+ (6, TType.DOUBLE, 'cpu_usage', None, None, ), # 6
+ (7, TType.DOUBLE, 'memory_guarantee', None, None, ), # 7
+ (8, TType.DOUBLE, 'cpu_guarantee', None, None, ), # 8
+ (9, TType.DOUBLE, 'memory_guarantee_remaining', None, None, ), # 9
+ (10, TType.DOUBLE, 'cpu_guarantee_remaining', None, None, ), # 10
+ (11, TType.I32, 'isolated_node_guarantee', None, None, ), # 11
+ (12, TType.I32, 'total_tasks', None, None, ), # 12
+ (13, TType.DOUBLE, 'requested_on_heap_memory', None, None, ), # 13
+ (14, TType.DOUBLE, 'requested_off_heap_memory', None, None, ), # 14
+ (15, TType.DOUBLE, 'requested_total_memory', None, None, ), # 15
+ (16, TType.DOUBLE, 'requested_cpu', None, None, ), # 16
+ (17, TType.DOUBLE, 'assigned_on_heap_memory', None, None, ), # 17
+ (18, TType.DOUBLE, 'assigned_off_heap_memory', None, None, ), # 18
+ )
+
+ def __init__(self, owner=None, total_topologies=None, total_executors=None, total_workers=None, memory_usage=None, cpu_usage=None, memory_guarantee=None, cpu_guarantee=None, memory_guarantee_remaining=None, cpu_guarantee_remaining=None, isolated_node_guarantee=None, total_tasks=None, requested_on_heap_memory=None, requested_off_heap_memory=None, requested_total_memory=None, requested_cpu=None, assigned_on_heap_memory=None, assigned_off_heap_memory=None,):
+ self.owner = owner
+ self.total_topologies = total_topologies
+ self.total_executors = total_executors
+ self.total_workers = total_workers
+ self.memory_usage = memory_usage
+ self.cpu_usage = cpu_usage
+ self.memory_guarantee = memory_guarantee
+ self.cpu_guarantee = cpu_guarantee
+ self.memory_guarantee_remaining = memory_guarantee_remaining
+ self.cpu_guarantee_remaining = cpu_guarantee_remaining
+ self.isolated_node_guarantee = isolated_node_guarantee
+ self.total_tasks = total_tasks
+ self.requested_on_heap_memory = requested_on_heap_memory
+ self.requested_off_heap_memory = requested_off_heap_memory
+ self.requested_total_memory = requested_total_memory
+ self.requested_cpu = requested_cpu
+ self.assigned_on_heap_memory = assigned_on_heap_memory
+ self.assigned_off_heap_memory = assigned_off_heap_memory
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.owner = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.I32:
+ self.total_topologies = iprot.readI32()
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I32:
+ self.total_executors = iprot.readI32()
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.I32:
+ self.total_workers = iprot.readI32()
+ else:
+ iprot.skip(ftype)
+ elif fid == 5:
+ if ftype == TType.DOUBLE:
+ self.memory_usage = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.DOUBLE:
+ self.cpu_usage = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ elif fid == 7:
+ if ftype == TType.DOUBLE:
+ self.memory_guarantee = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ elif fid == 8:
+ if ftype == TType.DOUBLE:
+ self.cpu_guarantee = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ elif fid == 9:
+ if ftype == TType.DOUBLE:
+ self.memory_guarantee_remaining = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ elif fid == 10:
+ if ftype == TType.DOUBLE:
+ self.cpu_guarantee_remaining = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ elif fid == 11:
+ if ftype == TType.I32:
+ self.isolated_node_guarantee = iprot.readI32()
+ else:
+ iprot.skip(ftype)
+ elif fid == 12:
+ if ftype == TType.I32:
+ self.total_tasks = iprot.readI32()
+ else:
+ iprot.skip(ftype)
+ elif fid == 13:
+ if ftype == TType.DOUBLE:
+ self.requested_on_heap_memory = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ elif fid == 14:
+ if ftype == TType.DOUBLE:
+ self.requested_off_heap_memory = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ elif fid == 15:
+ if ftype == TType.DOUBLE:
+ self.requested_total_memory = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ elif fid == 16:
+ if ftype == TType.DOUBLE:
+ self.requested_cpu = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ elif fid == 17:
+ if ftype == TType.DOUBLE:
+ self.assigned_on_heap_memory = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ elif fid == 18:
+ if ftype == TType.DOUBLE:
+ self.assigned_off_heap_memory = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('OwnerResourceSummary')
+ if self.owner is not None:
+ oprot.writeFieldBegin('owner', TType.STRING, 1)
+ oprot.writeString(self.owner.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.total_topologies is not None:
+ oprot.writeFieldBegin('total_topologies', TType.I32, 2)
+ oprot.writeI32(self.total_topologies)
+ oprot.writeFieldEnd()
+ if self.total_executors is not None:
+ oprot.writeFieldBegin('total_executors', TType.I32, 3)
+ oprot.writeI32(self.total_executors)
+ oprot.writeFieldEnd()
+ if self.total_workers is not None:
+ oprot.writeFieldBegin('total_workers', TType.I32, 4)
+ oprot.writeI32(self.total_workers)
+ oprot.writeFieldEnd()
+ if self.memory_usage is not None:
+ oprot.writeFieldBegin('memory_usage', TType.DOUBLE, 5)
+ oprot.writeDouble(self.memory_usage)
+ oprot.writeFieldEnd()
+ if self.cpu_usage is not None:
+ oprot.writeFieldBegin('cpu_usage', TType.DOUBLE, 6)
+ oprot.writeDouble(self.cpu_usage)
+ oprot.writeFieldEnd()
+ if self.memory_guarantee is not None:
+ oprot.writeFieldBegin('memory_guarantee', TType.DOUBLE, 7)
+ oprot.writeDouble(self.memory_guarantee)
+ oprot.writeFieldEnd()
+ if self.cpu_guarantee is not None:
+ oprot.writeFieldBegin('cpu_guarantee', TType.DOUBLE, 8)
+ oprot.writeDouble(self.cpu_guarantee)
+ oprot.writeFieldEnd()
+ if self.memory_guarantee_remaining is not None:
+ oprot.writeFieldBegin('memory_guarantee_remaining', TType.DOUBLE, 9)
+ oprot.writeDouble(self.memory_guarantee_remaining)
+ oprot.writeFieldEnd()
+ if self.cpu_guarantee_remaining is not None:
+ oprot.writeFieldBegin('cpu_guarantee_remaining', TType.DOUBLE, 10)
+ oprot.writeDouble(self.cpu_guarantee_remaining)
+ oprot.writeFieldEnd()
+ if self.isolated_node_guarantee is not None:
+ oprot.writeFieldBegin('isolated_node_guarantee', TType.I32, 11)
+ oprot.writeI32(self.isolated_node_guarantee)
+ oprot.writeFieldEnd()
+ if self.total_tasks is not None:
+ oprot.writeFieldBegin('total_tasks', TType.I32, 12)
+ oprot.writeI32(self.total_tasks)
+ oprot.writeFieldEnd()
+ if self.requested_on_heap_memory is not None:
+ oprot.writeFieldBegin('requested_on_heap_memory', TType.DOUBLE, 13)
+ oprot.writeDouble(self.requested_on_heap_memory)
+ oprot.writeFieldEnd()
+ if self.requested_off_heap_memory is not None:
+ oprot.writeFieldBegin('requested_off_heap_memory', TType.DOUBLE, 14)
+ oprot.writeDouble(self.requested_off_heap_memory)
+ oprot.writeFieldEnd()
+ if self.requested_total_memory is not None:
+ oprot.writeFieldBegin('requested_total_memory', TType.DOUBLE, 15)
+ oprot.writeDouble(self.requested_total_memory)
+ oprot.writeFieldEnd()
+ if self.requested_cpu is not None:
+ oprot.writeFieldBegin('requested_cpu', TType.DOUBLE, 16)
+ oprot.writeDouble(self.requested_cpu)
+ oprot.writeFieldEnd()
+ if self.assigned_on_heap_memory is not None:
+ oprot.writeFieldBegin('assigned_on_heap_memory', TType.DOUBLE, 17)
+ oprot.writeDouble(self.assigned_on_heap_memory)
+ oprot.writeFieldEnd()
+ if self.assigned_off_heap_memory is not None:
+ oprot.writeFieldBegin('assigned_off_heap_memory', TType.DOUBLE, 18)
+ oprot.writeDouble(self.assigned_off_heap_memory)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.owner is None:
+ raise TProtocol.TProtocolException(message='Required field owner is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.owner)
+ value = (value * 31) ^ hash(self.total_topologies)
+ value = (value * 31) ^ hash(self.total_executors)
+ value = (value * 31) ^ hash(self.total_workers)
+ value = (value * 31) ^ hash(self.memory_usage)
+ value = (value * 31) ^ hash(self.cpu_usage)
+ value = (value * 31) ^ hash(self.memory_guarantee)
+ value = (value * 31) ^ hash(self.cpu_guarantee)
+ value = (value * 31) ^ hash(self.memory_guarantee_remaining)
+ value = (value * 31) ^ hash(self.cpu_guarantee_remaining)
+ value = (value * 31) ^ hash(self.isolated_node_guarantee)
+ value = (value * 31) ^ hash(self.total_tasks)
+ value = (value * 31) ^ hash(self.requested_on_heap_memory)
+ value = (value * 31) ^ hash(self.requested_off_heap_memory)
+ value = (value * 31) ^ hash(self.requested_total_memory)
+ value = (value * 31) ^ hash(self.requested_cpu)
+ value = (value * 31) ^ hash(self.assigned_on_heap_memory)
+ value = (value * 31) ^ hash(self.assigned_off_heap_memory)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class DRPCRequest:
"""
Attributes:
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-client/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift
index ee02d1b..961c3cc 100644
--- a/storm-client/src/storm.thrift
+++ b/storm-client/src/storm.thrift
@@ -615,6 +615,27 @@ struct TopologyHistoryInfo {
1: list<string> topo_ids;
}
+struct OwnerResourceSummary {
+ 1: required string owner;
+ 2: optional i32 total_topologies;
+ 3: optional i32 total_executors;
+ 4: optional i32 total_workers;
+ 5: optional double memory_usage;
+ 6: optional double cpu_usage;
+ 7: optional double memory_guarantee;
+ 8: optional double cpu_guarantee;
+ 9: optional double memory_guarantee_remaining;
+ 10: optional double cpu_guarantee_remaining;
+ 11: optional i32 isolated_node_guarantee;
+ 12: optional i32 total_tasks;
+ 13: optional double requested_on_heap_memory;
+ 14: optional double requested_off_heap_memory;
+ 15: optional double requested_total_memory;
+ 16: optional double requested_cpu;
+ 17: optional double assigned_on_heap_memory;
+ 18: optional double assigned_off_heap_memory;
+}
+
service Nimbus {
void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze);
void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze);
@@ -690,6 +711,7 @@ service Nimbus {
*/
StormTopology getUserTopology(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze);
TopologyHistoryInfo getTopologyHistory(1: string user) throws (1: AuthorizationException aze);
+ list<OwnerResourceSummary> getOwnerResourceSummaries (1: string owner) throws (1: AuthorizationException aze);
}
struct DRPCRequest {
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index d1bdee8..7141569 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -41,7 +41,7 @@
TopologyStats CommonAggregateStats ComponentAggregateStats
ComponentType BoltAggregateStats SpoutAggregateStats
ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo
- LogConfig LogLevel LogLevelAction SupervisorPageInfo WorkerSummary])
+ LogConfig LogLevel LogLevelAction SupervisorPageInfo WorkerSummary OwnerResourceSummary])
(:import [org.apache.storm.security.auth AuthUtils ReqContext])
(:import [org.apache.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo])
(:import [org.apache.storm.security.auth AuthUtils])
@@ -81,12 +81,11 @@
(def ui:num-activate-topology-http-requests (StormMetricsRegistry/registerMeter "ui:num-activate-topology-http-requests"))
(def ui:num-deactivate-topology-http-requests (StormMetricsRegistry/registerMeter "ui:num-deactivate-topology-http-requests"))
(def ui:num-debug-topology-http-requests (StormMetricsRegistry/registerMeter "ui:num-debug-topology-http-requests"))
-(def ui:num-component-op-response-http-requests (StormMetricsRegistry/registerMeter "ui:num-component-op-response-http-requests"))
-(def ui:num-topology-op-response-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-op-response-http-requests"))
-(def ui:num-topology-op-response-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-op-response-http-requests"))
+(def ui:num-component-op-response-http-requests (StormMetricsRegistry/registerMeter "ui:num-component-op-response-http-requests"))
(def ui:num-topology-op-response-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-op-response-http-requests"))
(def ui:num-main-page-http-requests (StormMetricsRegistry/registerMeter "ui:num-main-page-http-requests"))
-(def ui:num-topology-lag-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-lag-http-requests"))
+(def ui:num-topology-lag-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-lag-http-requests"))
+(def ui:num-get-owner-resource-summaries-http-requests (StormMetricsRegistry/registerMeter "ui:num-get-owner-resource-summaries-http-request"))
(defn assert-authorized-user
([op]
@@ -525,6 +524,31 @@
(supervisor-summary-to-json s))
"schedulerDisplayResource" (*STORM-CONF* SCHEDULER-DISPLAY-RESOURCE)}))
+(defnk get-topologies-map [summs :conditional (fn [t] true) :keys nil]
+ (for [^TopologySummary t summs :when (conditional t)]
+ (let [data {"id" (.get_id t)
+ "encodedId" (URLEncoder/encode (.get_id t))
+ "owner" (.get_owner t)
+ "name" (.get_name t)
+ "status" (.get_status t)
+ "uptime" (UIHelpers/prettyUptimeSec (.get_uptime_secs t))
+ "uptimeSeconds" (.get_uptime_secs t)
+ "tasksTotal" (.get_num_tasks t)
+ "workersTotal" (.get_num_workers t)
+ "executorsTotal" (.get_num_executors t)
+ "replicationCount" (.get_replication_count t)
+ "schedulerInfo" (.get_sched_status t)
+ "requestedMemOnHeap" (.get_requested_memonheap t)
+ "requestedMemOffHeap" (.get_requested_memoffheap t)
+ "requestedTotalMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t))
+ "requestedCpu" (.get_requested_cpu t)
+ "assignedMemOnHeap" (.get_assigned_memonheap t)
+ "assignedMemOffHeap" (.get_assigned_memoffheap t)
+ "assignedTotalMem" (+ (.get_assigned_memonheap t) (.get_assigned_memoffheap t))
+ "assignedCpu" (.get_assigned_cpu t)
+ "stormVersion" (.get_storm_version t)}]
+ (if (not-nil? keys) (select-keys data keys) data))))
+
(defn all-topologies-summary
([]
(thrift/with-configured-nimbus-connection
@@ -532,30 +556,7 @@
(all-topologies-summary
(.get_topologies (.getClusterInfo ^Nimbus$Client nimbus)))))
([summs]
- {"topologies"
- (for [^TopologySummary t summs]
- {
- "id" (.get_id t)
- "encodedId" (URLEncoder/encode (.get_id t))
- "owner" (.get_owner t)
- "name" (.get_name t)
- "status" (.get_status t)
- "uptime" (UIHelpers/prettyUptimeSec (.get_uptime_secs t))
- "uptimeSeconds" (.get_uptime_secs t)
- "tasksTotal" (.get_num_tasks t)
- "workersTotal" (.get_num_workers t)
- "executorsTotal" (.get_num_executors t)
- "replicationCount" (.get_replication_count t)
- "schedulerInfo" (.get_sched_status t)
- "requestedMemOnHeap" (.get_requested_memonheap t)
- "requestedMemOffHeap" (.get_requested_memoffheap t)
- "requestedTotalMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t))
- "requestedCpu" (.get_requested_cpu t)
- "assignedMemOnHeap" (.get_assigned_memonheap t)
- "assignedMemOffHeap" (.get_assigned_memoffheap t)
- "assignedTotalMem" (+ (.get_assigned_memonheap t) (.get_assigned_memoffheap t))
- "assignedCpu" (.get_assigned_cpu t)
- "stormVersion" (.get_storm_version t)})
+ {"topologies" (get-topologies-map summs)
"schedulerDisplayResource" (*STORM-CONF* SCHEDULER-DISPLAY-RESOURCE)}))
(defn topology-stats [window stats]
@@ -1073,7 +1074,62 @@
"profilerActive" (if (*STORM-CONF* WORKER-PROFILER-ENABLED)
(get-active-profile-actions nimbus topology-id component)
[])))))
-
+
+(defn unpack-owner-resource-summary [summary]
+ (let [memory-guarantee (if (.is_set_memory_guarantee summary)
+ (.get_memory_guarantee summary)
+ "N/A")
+ cpu-guarantee (if (.is_set_cpu_guarantee summary)
+ (.get_cpu_guarantee summary)
+ "N/A")
+ isolated-node-guarantee (if (.is_set_isolated_node_guarantee summary)
+ (.get_isolated_node_guarantee summary)
+ "N/A")
+ memory-guarantee-remaining (if (.is_set_memory_guarantee_remaining summary)
+ (.get_memory_guarantee_remaining summary)
+ "N/A")
+ cpu-guarantee-remaining (if (.is_set_cpu_guarantee_remaining summary)
+ (.get_cpu_guarantee_remaining summary)
+ "N/A")]
+ {"owner" (.get_owner summary)
+ "totalTopologies" (.get_total_topologies summary)
+ "totalExecutors" (.get_total_executors summary)
+ "totalWorkers" (.get_total_workers summary)
+ "totalTasks" (.get_total_tasks summary)
+ "totalMemoryUsage" (.get_memory_usage summary)
+ "totalCpuUsage" (.get_cpu_usage summary)
+ "memoryGuarantee" memory-guarantee
+ "cpuGuarantee" cpu-guarantee
+ "isolatedNodes" isolated-node-guarantee
+ "memoryGuaranteeRemaining" memory-guarantee-remaining
+ "cpuGuaranteeRemaining" cpu-guarantee-remaining
+ "totalReqOnHeapMem" (.get_requested_on_heap_memory summary)
+ "totalReqOffHeapMem" (.get_requested_off_heap_memory summary)
+ "totalReqMem" (.get_requested_total_memory summary)
+ "totalReqCpu" (.get_requested_cpu summary)
+ "totalAssignedOnHeapMem" (.get_assigned_on_heap_memory summary)
+ "totalAssignedOffHeapMem" (.get_assigned_off_heap_memory summary)}))
+
+(defn owner-resource-summaries []
+ (thrift/with-configured-nimbus-connection nimbus
+ (let [summaries (.getOwnerResourceSummaries nimbus nil)]
+ {"schedulerDisplayResource" (*STORM-CONF* SCHEDULER-DISPLAY-RESOURCE)
+ "owners"
+ (for [summary summaries]
+ (unpack-owner-resource-summary summary))})))
+
+(defn owner-resource-summary [owner]
+ (thrift/with-configured-nimbus-connection nimbus
+ (let [summaries (.getOwnerResourceSummaries nimbus owner)]
+ (merge {"schedulerDisplayResource" (*STORM-CONF* SCHEDULER-DISPLAY-RESOURCE)}
+ (if (empty? summaries)
+ ;; send a default value, we couldn't find topos by that owner
+ (unpack-owner-resource-summary (OwnerResourceSummary. owner))
+ (let [topologies (.get_topologies (.getClusterInfo ^Nimbus$Client nimbus))
+ data (get-topologies-map topologies :conditional (fn [t] (= (.get_owner t) owner)))]
+ (merge {"topologies" data}
+ (unpack-owner-resource-summary (first summaries)))))))))
+
(defn- level-to-dict [level]
(if level
(let [timeout (.get_reset_log_level_timeout_secs level)
@@ -1154,6 +1210,16 @@
(populate-context! servlet-request)
(assert-authorized-user "getClusterInfo")
(json-response (nimbus-summary) (:callback m)))
+ (GET "/api/v1/owner-resources" [:as {:keys [cookies servlet-request scheme]} id & m]
+ (.mark ui:num-get-owner-resource-summaries-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "getOwnerResourceSummaries")
+ (json-response (owner-resource-summaries) (:callback m)))
+ (GET "/api/v1/owner-resources/:id" [:as {:keys [cookies servlet-request scheme]} id & m]
+ (.mark ui:num-get-owner-resource-summaries-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "getOwnerResourceSummaries")
+ (json-response (owner-resource-summary id) (:callback m)))
(GET "/api/v1/history/summary" [:as {:keys [cookies servlet-request]} & m]
(let [user (.getUserName http-creds-handler servlet-request)]
(json-response (topology-history-info user) (:callback m))))
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/css/style.css
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/css/style.css b/storm-core/src/ui/public/css/style.css
index 45a3982..b8c5c61 100644
--- a/storm-core/src/ui/public/css/style.css
+++ b/storm-core/src/ui/public/css/style.css
@@ -236,3 +236,28 @@ div#visualization summary {
display: list-item;
outline: none;
}
+
+.warning {
+ width: 100%;
+ height: 150px;
+ display: none;
+}
+
+#ras-warning-fixed-buffer {
+ background: #FF0000;
+ float:left;
+ position:fixed;
+ z-index:999999 !important;
+ text-align:center;
+ color:#FFFFFF;
+}
+
+.resource-guarantee-remaining-positive {
+ color: green;
+ font-weight: bold;
+}
+
+.resource-guarantee-remaining-negative {
+ color: red;
+ font-weight: bold;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/index.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html
index 69081d4..00693b5 100644
--- a/storm-core/src/ui/public/index.html
+++ b/storm-core/src/ui/public/index.html
@@ -67,6 +67,12 @@
</div>
<div class="row">
<div class="col-md-12">
+ <h2>Owner Summary</h2>
+ <div id="owner-summary"></div>
+ </div>
+ </div>
+ <div class="row">
+ <div class="col-md-12">
<h2>Topology Summary</h2>
<div id="topology-summary"></div>
</div>
@@ -115,6 +121,7 @@ $(document).ready(function() {
var clusterSummary = $("#cluster-summary");
var clusterResources = $("#cluster-resources");
var nimbusSummary = $("#nimbus-summary");
+ var ownerSummary = $("#owner-summary");
var topologySummary = $("#topology-summary");
var supervisorSummary = $("#supervisor-summary");
var config = $("#nimbus-configuration");
@@ -152,6 +159,11 @@ $(document).ready(function() {
$('#nimbus-summary [data-toggle="tooltip"]').tooltip();
});
+ $.getJSON("/api/v1/owner-resources", function(response, status, jqXHR) {
+ ownerSummary.append(Mustache.render($(indexTemplate).filter("#owner-summary-template").html(), response));
+ makeOwnerSummaryTable(response, '#owner-summary-table', '#owner-summary');
+ });
+
$.getJSON("/api/v1/topology/summary",function(response,status,jqXHR) {
topologySummary.append(Mustache.render($(indexTemplate).filter("#topology-summary-template").html(),response));
//name, owner, status, uptime, num workers, num executors, num tasks, replication count, assigned total mem, assigned total cpu, scheduler info
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/js/script.js
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/js/script.js b/storm-core/src/ui/public/js/script.js
index a7d5deb..2ed1395 100644
--- a/storm-core/src/ui/public/js/script.js
+++ b/storm-core/src/ui/public/js/script.js
@@ -510,3 +510,74 @@ function jsError(other) {
});
}
}
+
+function getResourceGuaranteeRemainingFormat(type, data) {
+ if (type === 'display' && typeof data === "number") {
+ var resourceGuaranteeRemaining = parseFloat(data);
+ if (resourceGuaranteeRemaining > 0.0) {
+ return '<span class="resource-guarantee-remaining-positive">+' + data + '</span>'
+ }
+ if (resourceGuaranteeRemaining < 0.0) {
+ return '<span class="resource-guarantee-remaining-negative">' + data + '</span>'
+ }
+ }
+ return data;
+}
+
+var makeOwnerSummaryTable = function(response, elId, parentId) {
+ var showCpu = response.schedulerDisplayResource;
+
+ var columns = [
+ {
+ data: 'owner',
+ render: function(data, type, row) {
+ return type === 'display' ?
+ ('<a href="/owner.html?id=' + data + '">' + data + '</a>') :
+ data;
+ }
+ }, {
+ data: 'totalTopologies',
+ }, {
+ data: 'totalExecutors',
+ }, {
+ data: 'totalWorkers',
+ }, {
+ data: 'totalMemoryUsage',
+ }];
+
+ if (showCpu) {
+ columns.push({
+ data: 'memoryGuarantee'
+ });
+ columns.push({
+ data: 'memoryGuaranteeRemaining',
+ render: function(data, type, row) {
+ return getResourceGuaranteeRemainingFormat(type, data);
+ }
+ });
+ columns.push({
+ data: 'totalCpuUsage'
+ });
+ columns.push({
+ data: 'cpuGuarantee'
+ });
+ columns.push({
+ data: 'cpuGuaranteeRemaining',
+ render: function(data, type, row) {
+ return getResourceGuaranteeRemainingFormat(type, data);
+ }
+ });
+ columns.push({
+ data: 'isolatedNodes'
+ });
+ }
+
+ var userSummaryTable = dtAutoPage(elId, {
+ data: response.owners,
+ autoWidth: false,
+ columns: columns,
+ });
+
+ $(elId + ' [data-toggle="tooltip"]').tooltip();
+};
+
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/owner.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/owner.html b/storm-core/src/ui/public/owner.html
new file mode 100644
index 0000000..6365123
--- /dev/null
+++ b/storm-core/src/ui/public/owner.html
@@ -0,0 +1,205 @@
+<html>
+
+<head>
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <meta charset="UTF-8">
+ <meta name="viewport" content="width=device-width, initial-scale=1">
+ <title>Storm UI</title>
+ <link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
+ <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
+ <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
+ <link href="/css/jsonFormatter.min.css" rel="stylesheet" type="text/css">
+ <link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
+ <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
+ <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
+ <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
+ <script src="/js/jquery.mustache.js" type="text/javascript"></script>
+ <script src="/js/url.min.js" type="text/javascript"></script>
+ <script src="/js/bootstrap-3.3.1.min.js" type="text/javascript"></script>
+ <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
+ <script src="/js/jsonFormatter.min.js" type="text/javascript"></script>
+ <script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
+</head>
+
+<body>
+ <div class="warning" id="ras-warning-fixed-buffer">
+ <H1>This user's topologies are in danger of being unscheduled due to the owner's over-use of cluster resources.</H1>
+ <p>Please keep this user's resource consumption within guaranteed bounds to ensure topologies for this user will continue to run.</p>
+ </div>
+ <div class="warning" id="ras-warning-top-buffer"></div>
+ <div class="container-fluid">
+ <div class="row">
+ <div class="col-md-11">
+ <h1><a href="/">Storm UI</a></h1>
+ </div>
+ <div id="ui-user" class="col-md-1"></div>
+ </div>
+ <div class="row">
+ <div class="col-md-12" id="search-form">
+ </div>
+ </div>
+ <div class="row">
+ <div class="col-md-12">
+ <h2>Owner Summary</h2>
+ <div id="owner-summary"></div>
+ </div>
+ </div>
+ <div class="row">
+ <div class="col-md-12">
+ <h2 id="owner-resource-usage-summary-header">Owner Resource Usage</h2>
+ <div id="owner-resource-usage-summary"></div>
+ </div>
+ </div>
+ <div class="row">
+ <div class="col-md-12">
+ <h2 id="owner-resource-guarantee-summary-header">Owner Resource Guarantees (RAS Topologies Only)</h2>
+ <div id="owner-resource-guarantee-summary"></div>
+ </div>
+ </div>
+ <div class="row">
+ <div class="col-md-12">
+ <h2>Owner Topologies</h2>
+ <div id="topology-summary"></div>
+ </div>
+ </div>
+ <div class="row">
+ <div id="json-response-error" class="col-md-12"></div>
+ </div>
+</div>
+</body>
+<script>
+ $(document).ajaxStop($.unblockUI);
+ $(document).ajaxStart(function() {
+ $.blockUI({
+ message: '<img src="images/spinner.gif" /> <h3>Loading summary...</h3>'
+ });
+ });
+ $(document).ready(function() {
+
+ var owner = $.url("?id");
+ if (!owner) {
+ // this page is designed to be per owner, handle the case where the URL is malformed
+ getStatic("/templates/json-error-template.html", function(template) {
+ $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(), {
+ "error": "Invalid owner",
+ "errorMessage": "Please pass an owner id with the id URL parameter"
+ }));
+ });
+ return;
+ }
+ var url = "/api/v1/owner-resources/" + owner;
+
+ $.extend($.fn.dataTable.defaults, {
+ stateSave: true,
+ lengthMenu: [
+ [20, 40, 60, 100, -1],
+ [20, 40, 60, 100, "All"]
+ ],
+ pageLength: 20
+ });
+
+ $.ajaxSetup({
+ "error": function(jqXHR, textStatus, response) {
+ var errorJson = jQuery.parseJSON(jqXHR.responseText);
+ getStatic("/templates/json-error-template.html", function(template) {
+ $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(), errorJson));
+ });
+ }
+ });
+
+ function jsError(other) {
+ try {
+ other();
+ } catch (err) {
+ getStatic("/templates/json-error-template.html", function(template) {
+ $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(), {
+ error: "JS Error",
+ errorMessage: err
+ }));
+ });
+ }
+ }
+
+ var topologySummary = $("#topology-summary");
+ var ownerResourceUsage = $("#owner-resource-usage-summary");
+ var ownerSummary = $("#owner-summary");
+ var ownerResourceGuarantee = $("#owner-resource-guarantee-summary");
+ $.getJSON(url, function(response, status, jqXHR) {
+ getStatic("/templates/owner-page-template.html", function(template) {
+ jsError(function() {
+ //owner,totalTopologies,totalTasks,totalExecutors,totalWorkers
+ ownerSummary.append(
+ Mustache.render($(template).filter("#owner-summary-template").html(), response));
+
+ //totalReqOnHeapMem,totalReqOffHeapMem,totalReqMem,totalReqCpu,totalAssignedOnHeapMem,totalAssignedOffHeapMem,totalAssignedMem,totalAssignedCpu
+ ownerResourceUsage.append(
+ Mustache.render($(template).filter("#owner-resource-usage-template").html(), response));
+ $('#owner-resource-usage-summary [data-toggle="tooltip"]').tooltip();
+
+ if (response["cpuGuarantee"] != "N/A" || response["memoryGuarantee"] != "N/A") {
+ ownerResourceGuarantee.append(
+ Mustache.render($(template).filter("#owner-resource-guarantee-template").html(), response));
+ $('#owner-resource-guarantee-summary [data-toggle="tooltip"]').tooltip();
+
+ $('#mem-guarantee-util').html(getResourceGuaranteeRemainingFormat("display", response["memoryGuaranteeRemaining"]));
+
+ $('#cpu-guarantee-util').html(getResourceGuaranteeRemainingFormat("display", response["cpuGuaranteeRemaining"]));
+
+ } else {
+ $('#owner-resource-guarantee-summary-header').hide();
+ $('#owner-resource-guarantee-summary').hide();
+ }
+
+ var displayResource = response["schedulerDisplayResource"];
+ if (!displayResource) {
+ $('#owner-resource-usage-summary-header').hide();
+ $('#owner-resource-usage-summary').hide();
+ $('#owner-resource-guarantee-summary-header').hide();
+ $('#owner-resource-guarantee-summary').hide();
+ }
+
+ if(response && (response["memoryGuaranteeRemaining"] < 0 || response["cpuGuaranteeRemaining"] < 0
+ || response["memoryGuaranteeRemaining"] == "N/A" || response["cpuGuaranteeRemaining"] == "N/A")) {
+ $(".warning").show();
+ } else {
+ $(".warning").hide();
+ }
+
+ $('#owner-resource-usage-summary [data-toggle="tooltip"]').tooltip();
+
+ topologySummary.append(
+ Mustache.render($(template).filter("#owner-topology-summary-template").html(), response));
+ //name, owner, status, uptime, num workers, num executors, num tasks, assigned total mem, assigned total cpu, scheduler info
+ dtAutoPage("#owner-topology-summary-table", {
+ columnDefs: [{
+ type: "num",
+ targets: [4, 5, 6, 7, 8]
+ }, {
+ type: "time-str",
+ targets: [3]
+ }]
+ });
+ $('#topology-summary [data-toggle="tooltip"]').tooltip();
+
+ });
+ });
+ });
+ });
+</script>
+
+</html>
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/templates/index-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/index-page-template.html b/storm-core/src/ui/public/templates/index-page-template.html
index 1802452..2c46499 100644
--- a/storm-core/src/ui/public/templates/index-page-template.html
+++ b/storm-core/src/ui/public/templates/index-page-template.html
@@ -152,6 +152,71 @@
</tbody>
</table>
</script>
+<script id="owner-summary-template" type="text/html">
+ <table class="table table-striped compact" id="owner-summary-table">
+ <thead>
+ <tr>
+ <th>
+ Owner
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="Total number of topologies owned by user.">
+ Total Topologies
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="Total number of executors used by user.">
+ Total Executors
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="Total number of workers used by user.">
+ Total Workers
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The amount of memory resource (in MB) used by user.">
+ Memory Usage (MB)
+ </span>
+ </th>
+ {{#schedulerDisplayResource}}
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The amount of memory resource (in MB) guaranteed to user.">
+ Memory Guarantee (MB)
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The amount of guaranteed memory resources (in MB) remaining.">
+ Memory Guarantee Remaining (MB)
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="Total CPU Resource Assigned on behalf of Owner. Every 100 means 1 core.">
+ CPU Usage (%)
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The amount of CPU resource (every 100 means 1 core) guaranteed to user.">
+ CPU Guarantee (%)
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The amount of guaranteed CPU resource (every 100 means 1 core) remaining.">
+ CPU Guarantee Remaining (%)
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The amount of isolated nodes user may use">
+ Isolated Nodes Guarantee
+ </span>
+ </th>
+ {{/schedulerDisplayResource}}
+ </tr>
+ </thead>
+ <tbody>
+ </tbody>
+ </table>
+</script>
<script id="topology-summary-template" type="text/html">
<table class="table table-striped compact" id="topology-summary-table">
<thead>
@@ -224,7 +289,7 @@
{{#topologies}}
<tr>
<td><a href="/topology.html?id={{encodedId}}">{{name}}</a></td>
- <td>{{owner}}</td>
+ <td><a href="/owner.html?id={{owner}}">{{owner}}</a></td>
<td>{{status}}</td>
<td>{{uptime}}</td>
<td>{{workersTotal}}</td>
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/templates/owner-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/owner-page-template.html b/storm-core/src/ui/public/templates/owner-page-template.html
new file mode 100644
index 0000000..f97bf05
--- /dev/null
+++ b/storm-core/src/ui/public/templates/owner-page-template.html
@@ -0,0 +1,233 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<script id="owner-summary-template" type="text/html">
+ <table id="owner-summary-table" class="table compact">
+ <thead>
+ <tr>
+ <th>
+ Owner
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="above" title="Total number of topologies owned by owner.">
+ Total Topologies
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="above" title="Total number of tasks used by owner.">
+ Total Tasks
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="above" title="Total number of executors used by owner.">
+ Total Executors
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="above" title="Total number of workers used by owner.">
+ Total Workers
+ </span>
+ </th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>{{owner}}</td>
+ <td>{{totalTopologies}}</td>
+ <td>{{totalTasks}}</td>
+ <td>{{totalExecutors}}</td>
+ <td>{{totalWorkers}}</td>
+ </tr>
+ </tbody>
+ </table>
+</script>
+
+<script id="owner-resource-usage-template" type="text/html">
+ <table id="owner-resource-usage-table" class="table compact">
+ <thead>
+ <tr>
+ <th>
+ </th>
+ <th>
+ On-Heap Mem (MB)
+ </th>
+ <th>
+ Off-Heap Mem (MB)
+ </th>
+ <th>
+ Total Mem (MB)
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="Total CPU Resource. Every 100 means 1 core.">
+ Total CPU (%)
+ </span>
+ </th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>Requested</td>
+ <td>{{totalReqOnHeapMem}}</td>
+ <td>{{totalReqOffHeapMem}}</td>
+ <td>{{totalReqMem}}</td>
+ <td>{{totalReqCpu}}</td>
+ </tr>
+ <tr>
+ <td>Assigned</td>
+ <td>{{totalAssignedOnHeapMem}}</td>
+ <td>{{totalAssignedOffHeapMem}}</td>
+ <td>{{totalMemoryUsage}}</td>
+ <td>{{totalCpuUsage}}</td>
+ </tr>
+ </tbody>
+ </table>
+</script>
+
+<script id="owner-resource-guarantee-template" type="text/html">
+ <table id="owner-resource-guarantee-table" class="table compact">
+ <thead>
+ <tr>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The amount of CPU resource (every 100 means 1 core) guaranteed to owner.">
+ CPU Guarantee (%)
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The amount of CPU resource (every 100 means 1 core) used by owner.">
+ CPU Usage (%)
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The amount of guaranteed CPU resource (every 100 means 1 core) remaining.">
+ CPU Guarantee Remaining (%)
+ </span>
+ </th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>{{cpuGuarantee}}</td>
+ <td>{{totalCpuUsage}}</td>
+ <td id="cpu-guarantee-util">{{cpuGuaranteeRemaining}}</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The amount of memory resource (in MB) guaranteed to owner.">
+ Memory Guarantee (MB)
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The amount of memory resource (in MB) used by owner.">
+ Memory Usage (MB)
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The amount of guaranteed memory resources (in MB) remaining.">
+ Memory Guarantee Remaining (MB)
+ </span>
+ </th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>{{memoryGuarantee}}</td>
+ <td>{{totalMemoryUsage}}</td>
+ <td id="mem-guarantee-util">{{memoryGuaranteeRemaining}}</td>
+ </tr>
+ </tbody>
+ </table>
+</script>
+
+<script id="owner-topology-summary-template" type="text/html">
+ <table id="owner-topology-summary-table" class="table table-striped compact">
+ <thead>
+ <tr>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The name given to the topology by when it was submitted. Click the name to view the Topology's information.">
+ Name
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The owner that submitted the Topology, if authentication is enabled.">
+ Owner
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The status can be one of ACTIVE, INACTIVE, KILLED, or REBALANCING.">
+ Status
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The time since the Topology was submitted.">
+ Uptime
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="The number of Workers (processes).">
+ Num workers
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="Executors are threads in a Worker process.">
+ Num executors
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="A Task is an instance of a Bolt or Spout. The number of Tasks is almost always equal to the number of Executors.">
+ Num tasks
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="Assigned Total Memory by Scheduler.">
+ Assigned Mem (MB)
+ </span>
+ </th>
+ {{#schedulerDisplayResource}}
+ <th>
+ <span data-toggle="tooltip" data-placement="top" title="Assigned Total CPU by Scheduler. Every 100 means 1 core.">
+ Assigned CPU (%)
+ </span>
+ </th>
+ {{/schedulerDisplayResource}}
+ <th>
+ <span data-toggle="tooltip" data-placement="left" title="This shows information from the scheduler about the latest attempt to schedule the Topology on the cluster.">
+ Scheduler Info
+ </span>
+ </th>
+ </tr>
+ </thead>
+ <tbody>
+ {{#topologies}}
+ <tr>
+ <td><a href="/topology.html?id={{encodedId}}">{{name}}</a></td>
+ <td><a href="/owner.html?id={{owner}}">{{owner}}</a></td>
+ <td>{{status}}</td>
+ <td>{{uptime}}</td>
+ <td>{{workersTotal}}</td>
+ <td>{{executorsTotal}}</td>
+ <td>{{tasksTotal}}</td>
+ <td>{{assignedTotalMem}}</td>
+ {{#schedulerDisplayResource}}
+ <td>{{assignedCpu}}</td>
+ {{/schedulerDisplayResource}}
+ <td>{{schedulerInfo}}</td>
+ </tr>
+ {{/topologies}}
+ </tbody>
+ </table>
+</script>
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/templates/topology-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/topology-page-template.html b/storm-core/src/ui/public/templates/topology-page-template.html
index 16a4f97..a2e6201 100644
--- a/storm-core/src/ui/public/templates/topology-page-template.html
+++ b/storm-core/src/ui/public/templates/topology-page-template.html
@@ -91,7 +91,7 @@
<tr>
<td>{{name}}</td>
<td>{{id}}</td>
- <td>{{owner}}</td>
+ <td><a href="/owner.html?id={{owner}}">{{owner}}</a></td>
<td>{{status}}</td>
<td>{{uptime}}</td>
<td>{{workersTotal}}</td>
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index bfa8bd6..c01babb 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -107,7 +107,7 @@
.get_executor_node_port
.values
(map (fn [np] (.get_node np)))
- set
+ set
)))
(defn topology-slots [state storm-name]
@@ -116,7 +116,7 @@
(->> assignment
.get_executor_node_port
.values
- set
+ set
)))
;TODO: when translating this function, don't call map-val, but instead use an inline for loop.
@@ -131,7 +131,7 @@
(group-by (fn [np] (.get_node np)))
(map-val count)
(map (fn [[_ amt]] {amt 1}))
- (apply merge-with +)
+ (apply merge-with +)
)))
(defn topology-num-nodes [state storm-name]
@@ -213,7 +213,7 @@
(is (not-nil? (.get task->node+port t)))))
(doseq [[e s] executor->node+port]
(is (not-nil? s)))
-
+
(is (= all-nodes (set (keys (.get_node_host assignment)))))
(doseq [[e s] executor->node+port]
(is (not-nil? (.get (.get_executor_start_time_secs assignment) e))))
@@ -221,7 +221,7 @@
(deftest test-bogusId
(with-open [cluster (.build (doto (LocalCluster$Builder. )
- (.withSupervisors 4)
+ (.withSupervisors 4)
(.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(let [state (.getClusterState cluster)
nimbus (.getNimbus cluster)]
@@ -235,7 +235,7 @@
(deftest test-assignment
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
- (.withSupervisors 4)
+ (.withSupervisors 4)
(.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(let [state (.getClusterState cluster)
topology (Thrift/buildTopology
@@ -952,6 +952,70 @@
(is (not= (executor->start t) (executor->start2 t))))
)))
+(deftest test-get-owner-resource-summaries
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 1)
+ (.withPortsPerSupervisor 12)
+ (.withDaemonConf
+ {SUPERVISOR-ENABLE false
+ NIMBUS-MONITOR-FREQ-SECS 10
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+ TOPOLOGY-ACKER-EXECUTORS 0
+ TOPOLOGY-EVENTLOGGER-EXECUTORS 0
+ })))]
+ (letlocals
+ ;test for 0-topology case
+ (.advanceClusterTime cluster 11)
+ (bind owner-resource-summaries (.getOwnerResourceSummaries (.getNimbus cluster) nil))
+ (bind summary (first owner-resource-summaries))
+ (is (nil? summary))
+
+ ;test for 1-topology case
+ (bind topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails
+ (TestPlannerSpout. true) (Integer. 3))}
+ {}))
+ (.submitTopology cluster
+ "test"
+ {TOPOLOGY-WORKERS 3
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology)
+ (.advanceClusterTime cluster 11)
+
+ (bind owner-resource-summaries (.getOwnerResourceSummaries (.getNimbus cluster) nil))
+ (bind summary (first owner-resource-summaries))
+ (is (= (.get_total_workers summary) 3))
+ (is (= (.get_total_executors summary)) 3)
+ (is (= (.get_total_topologies summary)) 1)
+
+ ;test for many-topology case
+ (bind topology2 (Thrift/buildTopology
+ {"2" (Thrift/prepareSpoutDetails
+ (TestPlannerSpout. true) (Integer. 4))}
+ {}))
+ (bind topology3 (Thrift/buildTopology
+ {"3" (Thrift/prepareSpoutDetails
+ (TestPlannerSpout. true) (Integer. 5))}
+ {}))
+
+ (.submitTopology cluster
+ "test2"
+ {TOPOLOGY-WORKERS 4
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology2)
+
+ (.submitTopology cluster
+ "test3"
+ {TOPOLOGY-WORKERS 3
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology3)
+ (.advanceClusterTime cluster 11)
+
+ (bind owner-resource-summaries (.getOwnerResourceSummaries (.getNimbus cluster) nil))
+ (bind summary (first owner-resource-summaries))
+ (is (= (.get_total_workers summary) 10))
+ (is (= (.get_total_executors summary)) 12)
+ (is (= (.get_total_topologies summary)) 3)
+ )))
+
(deftest test-rebalance
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
@@ -1073,9 +1137,9 @@
node->ports (apply merge-with (fn [a b] (distinct (concat a b))) (for [np node+ports] {(.get_node np) [(first (.get_port np))]}))]]
{id node->ports}))
_ (log-message "id->node->ports: " id->node->ports)
- all-nodes (apply merge-with (fn [a b]
+ all-nodes (apply merge-with (fn [a b]
(let [ret (concat a b)]
- (log-message "Can we combine " (pr-str a) " and " (pr-str b) " without collisions? " (apply distinct? ret) " => " (pr-str ret))
+ (log-message "Can we combine " (pr-str a) " and " (pr-str b) " without collisions? " (apply distinct? ret) " => " (pr-str ret))
(is (apply distinct? ret))
(distinct ret)))
(.values id->node->ports))]
@@ -1252,7 +1316,7 @@
(.submitTopology nimbus "t1" nil "{}" topology)
;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
(.rebalance nimbus "t1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- (wait-for-status nimbus "t1" "ACTIVE")
+ (wait-for-status nimbus "t1" "ACTIVE")
(.deactivate nimbus "t1")
(.activate nimbus "t1")
(.rebalance nimbus "t1" (RebalanceOptions.))
@@ -1302,7 +1366,7 @@
(deftest test-nimbus-iface-methods-check-authorization
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)]
- (with-open [cluster (.build
+ (with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
@@ -1321,7 +1385,7 @@
(deftest test-nimbus-check-authorization-params
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)]
- (with-open [cluster (.build
+ (with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
@@ -1333,7 +1397,7 @@
topology (Thrift/buildTopology {} {})
expected-name topology-name
expected-conf {TOPOLOGY-NAME expected-name
- "foo" "bar"}]
+ "foo" "bar"}]
(.thenReturn (Mockito/when (.getTopoId cluster-state topology-name)) (Optional/of topology-id))
(.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) expected-conf)
(.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/anyObject))) nil)
@@ -1342,7 +1406,7 @@
(try
(is (= expected-conf
(->> (.getTopologyConf nimbus topology-id)
- JSONValue/parse
+ JSONValue/parse
clojurify-structure)))
(catch NotAliveException e)
(finally
@@ -1378,7 +1442,7 @@
(deftest test-check-authorization-getSupervisorPageInfo
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)]
- (with-open [cluster (.build
+ (with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
@@ -1400,7 +1464,7 @@
(.set_state_spouts {}))
topo-assignment {expected-name assignment}
check-auth-state (atom [])
- mock-check-authorization (fn [nimbus storm-name storm-conf operation]
+ mock-check-authorization (fn [nimbus storm-name storm-conf operation]
(swap! check-auth-state conj {:nimbus nimbus
:storm-name storm-name
:storm-conf storm-conf
@@ -1415,7 +1479,7 @@
(.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology)
(.thenReturn (Mockito/when (.topologyAssignments cluster-state)) topo-assignment)
(.getSupervisorPageInfo nimbus "super1" nil true)
-
+
;; afterwards, it should get called twice
(.checkAuthorization (Mockito/verify nimbus) (Mockito/eq expected-name) (Mockito/any Map) (Mockito/eq "getSupervisorPageInfo"))
(.checkAuthorization (Mockito/verify nimbus) nil nil "getClusterInfo")
@@ -1468,7 +1532,7 @@
(deftest test-nimbus-iface-getClusterInfo-filters-topos-without-bases
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)]
- (with-open [cluster (.build
+ (with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)))]
@@ -1495,7 +1559,7 @@
(.thenReturn (Mockito/when (.topologyBases cluster-state)) bogus-bases)
(.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/any Subject))) topo-conf)
(.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology)
-
+
(let [topos (.get_topologies (.getClusterInfo nimbus))]
; The number of topologies in the summary is correct.
(is (= (count
@@ -1549,7 +1613,7 @@
))))
(deftest test-file-bogus-download
- (with-open [cluster (.build
+ (with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(let [nimbus (.getNimbus cluster)]
@@ -1561,7 +1625,7 @@
(deftest test-validate-topo-config-on-submit
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)]
- (with-open [cluster (.build
+ (with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
@@ -1674,7 +1738,7 @@
(deftest empty-save-config-results-in-all-unchanged-actions
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)]
- (with-open [cluster (.build
+ (with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
@@ -1684,26 +1748,26 @@
mock-config (LogConfig.)
expected-config (LogConfig.)]
;; send something with content to nimbus beforehand
- (.put_to_named_logger_level previous-config "test"
+ (.put_to_named_logger_level previous-config "test"
(doto (LogLevel.)
(.set_target_log_level "ERROR")
(.set_action LogLevelAction/UPDATE)))
- (.put_to_named_logger_level expected-config "test"
+ (.put_to_named_logger_level expected-config "test"
(doto (LogLevel.)
(.set_target_log_level "ERROR")
(.set_action LogLevelAction/UNCHANGED)))
(.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {})
(.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config)
-
+
(.setLogConfig nimbus "foo" mock-config)
(.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any String) (Mockito/eq expected-config))))))
(deftest log-level-update-merges-and-flags-existent-log-level
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)]
- (with-open [cluster (.build
+ (with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
@@ -1713,36 +1777,36 @@
mock-config (LogConfig.)
expected-config (LogConfig.)]
;; send something with content to nimbus beforehand
- (.put_to_named_logger_level previous-config "test"
+ (.put_to_named_logger_level previous-config "test"
(doto (LogLevel.)
(.set_target_log_level "ERROR")
(.set_action LogLevelAction/UPDATE)))
- (.put_to_named_logger_level previous-config "other-test"
+ (.put_to_named_logger_level previous-config "other-test"
(doto (LogLevel.)
(.set_target_log_level "DEBUG")
(.set_action LogLevelAction/UPDATE)))
;; only change "test"
- (.put_to_named_logger_level mock-config "test"
+ (.put_to_named_logger_level mock-config "test"
(doto (LogLevel.)
(.set_target_log_level "INFO")
(.set_action LogLevelAction/UPDATE)))
- (.put_to_named_logger_level expected-config "test"
+ (.put_to_named_logger_level expected-config "test"
(doto (LogLevel.)
(.set_target_log_level "INFO")
(.set_action LogLevelAction/UPDATE)))
- (.put_to_named_logger_level expected-config "other-test"
+ (.put_to_named_logger_level expected-config "other-test"
(doto (LogLevel.)
(.set_target_log_level "DEBUG")
(.set_action LogLevelAction/UNCHANGED)))
(.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {})
(.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config)
-
+
(.setLogConfig nimbus "foo" mock-config)
(.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any String) (Mockito/eq expected-config))))))
@@ -1750,11 +1814,11 @@
(defn teardown-topo-errors [id])
(defn teardown-backpressure-dirs [id])
-(defn mock-cluster-state
- ([]
+(defn mock-cluster-state
+ ([]
(mock-cluster-state nil nil))
([active-topos inactive-topos]
- (mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos))
+ (mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos))
([active-topos hb-topos error-topos bp-topos]
(reify IStormClusterState
(teardownHeartbeats [this id] (teardown-heartbeats id))
@@ -1790,7 +1854,7 @@
mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)
store (Mockito/mock BlobStore)]
(.thenReturn (Mockito/when (.storedTopoIds store)) #{})
- (is (= (Nimbus/topoIdsToClean mock-state store)
+ (is (= (Nimbus/topoIdsToClean mock-state store)
#{}))))
(deftest do-cleanup-removes-inactive-znodes
@@ -1805,8 +1869,8 @@
(.set (.getHeartbeatsCache nimbus) hb-cache)
(.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. inactive-topos))
(mocking
- [teardown-heartbeats
- teardown-topo-errors
+ [teardown-heartbeats
+ teardown-topo-errors
teardown-backpressure-dirs]
(.doCleanup nimbus)
@@ -1850,8 +1914,8 @@
(.set (.getHeartbeatsCache nimbus) hb-cache)
(.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set inactive-topos))
(mocking
- [teardown-heartbeats
- teardown-topo-errors
+ [teardown-heartbeats
+ teardown-topo-errors
teardown-backpressure-dirs]
(.doCleanup nimbus)
@@ -1883,7 +1947,7 @@
supervisor2-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super2"))
user2-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor2-topologies))]
(is (= (list "topo1") supervisor1-topologies))
- (is (= #{"topo1"} user1-topologies))
+ (is (= #{"topo1"} user1-topologies))
(is (= (list "topo1" "topo2") supervisor2-topologies))
(is (= #{"topo1" "topo2"} user2-topologies)))))
@@ -1903,6 +1967,6 @@
(.setAuthorizationHandler nimbus (reify IAuthorizer (permit [this context operation topo-conf] (= "authorized" (get topo-conf TOPOLOGY-NAME)))))
(let [supervisor-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1"))
user-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor-topologies))]
-
+
(is (= (list "topo1" "authorized") supervisor-topologies))
(is (= #{"authorized"} user-topologies)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 3f4a773..4fb4474 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm;
import java.lang.reflect.Method;
@@ -31,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
-
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
@@ -60,10 +60,9 @@ import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.ListBlobsResult;
import org.apache.storm.generated.LogConfig;
-import org.apache.storm.generated.Nimbus.Iface;
-import org.apache.storm.generated.Nimbus.Processor;
import org.apache.storm.generated.NimbusSummary;
import org.apache.storm.generated.NotAliveException;
+import org.apache.storm.generated.OwnerResourceSummary;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.ReadableBlobMeta;
@@ -75,6 +74,8 @@ import org.apache.storm.generated.SupervisorPageInfo;
import org.apache.storm.generated.TopologyHistoryInfo;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologyPageInfo;
+import org.apache.storm.generated.Nimbus.Iface;
+import org.apache.storm.generated.Nimbus.Processor;
import org.apache.storm.messaging.IContext;
import org.apache.storm.messaging.local.Context;
import org.apache.storm.nimbus.ILeaderElector;
@@ -884,33 +885,33 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
@Override
public void setLogConfig(String name, LogConfig config) throws TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
@Override
public LogConfig getLogConfig(String name) throws TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
@Override
public void debug(String name, String component, boolean enable, double samplingPercentage)
throws NotAliveException, AuthorizationException, TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
@Override
public void setWorkerProfiler(String id, ProfileRequest profileRequest) throws TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
@Override
public List<ProfileRequest> getComponentPendingProfileActions(String id, String component_id, ProfileAction action)
throws TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
@Override
@@ -988,7 +989,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
@Override
public void createStateInZookeeper(String key) throws TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
@Override
@@ -1020,7 +1021,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
@Override
public String getNimbusConf() throws AuthorizationException, TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
@Override
@@ -1037,40 +1038,40 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
public TopologyInfo getTopologyInfoWithOpts(String id, GetInfoOptions options)
throws NotAliveException, AuthorizationException, TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
@Override
public TopologyPageInfo getTopologyPageInfo(String id, String window, boolean is_include_sys)
throws NotAliveException, AuthorizationException, TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
@Override
public SupervisorPageInfo getSupervisorPageInfo(String id, String host, boolean is_include_sys)
throws NotAliveException, AuthorizationException, TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
@Override
public ComponentPageInfo getComponentPageInfo(String topology_id, String component_id, String window,
boolean is_include_sys) throws NotAliveException, AuthorizationException, TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
@Override
public StormTopology getUserTopology(String id) throws NotAliveException, AuthorizationException, TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
@Override
public TopologyHistoryInfo getTopologyHistory(String user) throws AuthorizationException, TException {
// TODO Auto-generated method stub
- throw new RuntimeException("NOT IMPLMENETED YET");
+ throw new RuntimeException("NOT IMPLEMENTED YET");
}
/**
@@ -1095,6 +1096,12 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
return ret;
}
}
+
+ @Override
+ public List<OwnerResourceSummary> getOwnerResourceSummaries(String owner) throws AuthorizationException, TException {
+ // TODO Auto-generated method stub
+ throw new RuntimeException("NOT IMPLEMENTED YET");
+ }
public static void main(final String [] args) throws Exception {
if (args.length < 1) {