You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/04 03:24:53 UTC
svn commit: r1054868 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ apol...
Author: chirino
Date: Tue Jan 4 02:24:53 2011
New Revision: 1054868
URL: http://svn.apache.org/viewvc?rev=1054868&view=rev
Log:
Updated the queue to use "swap in/swap out" terminology instead of "load/flush", and extended that into the DTO class too.
Extracted the queue metrics into its own DTO object and also added an aggregate version to collect stats from multiple queues at the virtual host and broker levels.
Added:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java
- copied, changed from r1054867, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Jan 4 02:24:53 2011
@@ -105,18 +105,18 @@ class Queue(val host: VirtualHost, var i
var tune_persistent = true
/**
- * Should messages be flushed or swapped out of memory if
+ * Should messages be swapped out of memory if
* no consumers need the message?
*/
var tune_swap = true
/**
- * The number max number of flushed queue entries to load
- * for the store at a time. Not that Flushed entires are just
+ * The max number of swapped queue entries to load
+ * from the store at a time. Note that swapped entries are just
* reference pointers to the actual messages. When not loaded,
* the batch is referenced as sequence range to conserve memory.
*/
- var tune_flush_range_size = 0
+ var tune_swap_range_size = 0
/**
* The amount of memory buffer space to use per subscription.
@@ -127,27 +127,43 @@ class Queue(val host: VirtualHost, var i
config = c
tune_persistent = host.store !=null && config.persistent.getOrElse(true)
tune_swap = tune_persistent && config.swap.getOrElse(true)
- tune_flush_range_size = config.flush_range_size.getOrElse(10000)
+ tune_swap_range_size = config.swap_range_size.getOrElse(10000)
tune_consumer_buffer = config.consumer_buffer.getOrElse(32*1024)
}
configure(config)
+ var last_maintenance_ts = System.currentTimeMillis
+
var enqueue_item_counter = 0L
- var dequeue_item_counter = 0L
var enqueue_size_counter = 0L
+ var enqueue_ts = last_maintenance_ts;
+
+ var dequeue_item_counter = 0L
var dequeue_size_counter = 0L
+ var dequeue_ts = last_maintenance_ts;
+
var nack_item_counter = 0L
var nack_size_counter = 0L
+ var nack_ts = last_maintenance_ts;
def queue_size = enqueue_size_counter - dequeue_size_counter
def queue_items = enqueue_item_counter - dequeue_item_counter
- var loading_size = 0
- var flushing_size = 0
- var flushed_items = 0
+ var swapping_in_size = 0
+ var swapping_out_size = 0
+
+ var swapped_in_items = 0
+ var swapped_in_size = 0
- var capacity = 0
- var capacity_used = 0
+ var swapped_in_size_max = 0
+
+ var swap_out_item_counter = 0L
+ var swap_out_size_counter = 0L
+
+ var swap_in_item_counter = 0L
+ var swap_in_size_counter = 0L
+
+ var individual_swapped_items = 0
val swap_source = createSource(EventAggregators.INTEGER_ADD, dispatch_queue)
swap_source.setEventHandler(^{ swap_messages });
@@ -155,12 +171,12 @@ class Queue(val host: VirtualHost, var i
protected def _start(on_completed: Runnable) = {
- capacity = tune_queue_buffer;
+ swapped_in_size_max = tune_queue_buffer;
def completed: Unit = {
// by the time this is run, consumers and producers may have already joined.
on_completed.run
- schedual_consumer_sample
+ schedule_periodic_maintenance
// wake up the producers to fill us up...
if (messages.refiller != null) {
messages.refiller.run
@@ -189,7 +205,7 @@ class Queue(val host: VirtualHost, var i
} else {
- host.store.list_queue_entry_ranges(id, tune_flush_range_size) { ranges=>
+ host.store.list_queue_entry_ranges(id, tune_swap_range_size) { ranges=>
dispatch_queue {
if( ranges!=null && !ranges.isEmpty ) {
@@ -226,7 +242,7 @@ class Queue(val host: VirtualHost, var i
def addCapacity(amount:Int) = {
val was_full = messages.full
- capacity += amount
+ swapped_in_size_max += amount
if( was_full && !messages.full ) {
messages.refiller.run
}
@@ -236,7 +252,7 @@ class Queue(val host: VirtualHost, var i
var refiller: Runnable = null
- def full = (capacity_used >= capacity) || !service_state.is_started
+ def full = (swapped_in_size >= swapped_in_size_max) || !service_state.is_started
def offer(delivery: Delivery): Boolean = {
if (full) {
@@ -255,6 +271,8 @@ class Queue(val host: VirtualHost, var i
entries.addLast(entry)
enqueue_item_counter += 1
enqueue_size_counter += entry.size
+ enqueue_ts = last_maintenance_ts;
+
// Do we need to do a persistent enqueue???
if (queueDelivery.uow != null) {
@@ -269,8 +287,8 @@ class Queue(val host: VirtualHost, var i
val prev = entry.getPrevious
- if( (prev.as_loaded!=null && prev.as_loaded.flushing) || (prev.as_flushed!=null && !prev.as_flushed.loading) ) {
- entry.flush(!entry.as_loaded.acquired)
+ if( (prev.as_loaded!=null && prev.as_loaded.swapping_out ) || (prev.as_swapped!=null && !prev.as_swapped.swapping_in) ) {
+ entry.swap(!entry.as_loaded.acquired)
} else {
trigger_swap
}
@@ -288,7 +306,7 @@ class Queue(val host: VirtualHost, var i
def display_stats: Unit = {
- info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, capacity_used, capacity)
+ info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, swapped_in_size, swapped_in_size_max)
info("total messages enqueued %d, dequeues %d ", enqueue_item_counter, dequeue_item_counter)
}
@@ -297,15 +315,15 @@ class Queue(val host: VirtualHost, var i
var total_items = 0L
var total_size = 0L
while (cur != null) {
- if (cur.is_loaded || cur.hasSubs || cur.is_prefetched || cur.is_flushed_range ) {
+ if (cur.is_loaded || cur.hasSubs || cur.is_prefetched || cur.is_swapped_range ) {
info(" => " + cur)
}
total_size += cur.size
- if (cur.is_flushed || cur.is_loaded) {
+ if (cur.is_swapped || cur.is_loaded) {
total_items += 1
- } else if (cur.is_flushed_range ) {
- total_items += cur.as_flushed_range.count
+ } else if (cur.is_swapped_range ) {
+ total_items += cur.as_swapped_range.count
}
cur = cur.getNext
@@ -347,22 +365,22 @@ class Queue(val host: VirtualHost, var i
val loaded = cur.as_loaded
if( loaded!=null ) {
if( cur.prefetch_flags==0 && !loaded.acquired ) {
- val flush_asap = !cur.as_loaded.acquired
- cur.flush(flush_asap)
+ val asap = !cur.as_loaded.acquired
+ cur.swap(asap)
} else {
- cur.load // just in case it's getting flushed.
+ cur.load // just in case it's getting swapped.
}
}
cur = next
}
- // Combine flushed items into flushed ranges
- if( flushed_items > tune_flush_range_size*2 ) {
+ // Combine swapped items into swapped ranges
+ if( individual_swapped_items > tune_swap_range_size*2 ) {
- debug("Looking for flushed entries to combine")
+ debug("Looking for swapped entries to combine")
- var distance_from_sub = tune_flush_range_size;
+ var distance_from_sub = tune_swap_range_size;
var cur = entries.getHead
var combine_counter = 0;
@@ -377,11 +395,11 @@ class Queue(val host: VirtualHost, var i
} else {
distance_from_sub += 1
if( cur.can_combine_with_prev ) {
- cur.getPrevious.as_flushed_range.combineNext
+ cur.getPrevious.as_swapped_range.combineNext
combine_counter += 1
} else {
- if( cur.is_flushed && distance_from_sub > tune_flush_range_size ) {
- cur.flush_range
+ if( cur.is_swapped && distance_from_sub > tune_swap_range_size ) {
+ cur.swapped_range
combine_counter += 1
}
}
@@ -394,42 +412,37 @@ class Queue(val host: VirtualHost, var i
}
- def schedual_consumer_sample:Unit = {
-
- def slowConsumerCheck = {
- if( service_state.is_started ) {
-
- // target tune_min_subscription_rate / sec
- all_subscriptions.foreach{ case (consumer, sub)=>
-
- if ( sub.tail_parkings < 0 ) {
-
- // re-calc the avg_advanced_size
- sub.advanced_sizes += sub.advanced_size
- while( sub.advanced_sizes.size > 5 ) {
- sub.advanced_sizes = sub.advanced_sizes.drop(1)
- }
- sub.avg_advanced_size = sub.advanced_sizes.foldLeft(0)(_ + _) / sub.advanced_sizes.size
-
+ def schedule_periodic_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
+ if( service_state.is_started ) {
+ last_maintenance_ts = System.currentTimeMillis
+
+ // target tune_min_subscription_rate / sec
+ all_subscriptions.foreach{ case (consumer, sub)=>
+
+ if ( sub.tail_parkings < 0 ) {
+
+ // re-calc the avg_advanced_size
+ sub.advanced_sizes += sub.advanced_size
+ while( sub.advanced_sizes.size > 5 ) {
+ sub.advanced_sizes = sub.advanced_sizes.drop(1)
}
-
- sub.total_advanced_size += sub.advanced_size
- sub.advanced_size = 0
- sub.tail_parkings = 0
+ sub.avg_advanced_size = sub.advanced_sizes.foldLeft(0)(_ + _) / sub.advanced_sizes.size
}
- swap_messages
- schedual_consumer_sample
+ sub.total_advanced_size += sub.advanced_size
+ sub.advanced_size = 0
+ sub.tail_parkings = 0
+
}
- }
- dispatch_queue.dispatchAfter(1, TimeUnit.SECONDS, ^{
- slowConsumerCheck
- })
+ swap_messages
+ schedule_periodic_maintenance
+ }
}
+
def drain_acks = {
ack_source.getData.foreach {
case (entry, consumed, tx) =>
@@ -558,33 +571,33 @@ class Queue(val host: VirtualHost, var i
rc
}
- val store_flush_source = createSource(new ListEventAggregator[QueueEntry#Loaded](), dispatch_queue)
- store_flush_source.setEventHandler(^ {drain_store_flushes});
- store_flush_source.resume
+ val swap_out_completes_source = createSource(new ListEventAggregator[QueueEntry#Loaded](), dispatch_queue)
+ swap_out_completes_source.setEventHandler(^ {drain_swap_out_completes});
+ swap_out_completes_source.resume
- def drain_store_flushes() = {
- val data = store_flush_source.getData
+ def drain_swap_out_completes() = {
+ val data = swap_out_completes_source.getData
data.foreach { loaded =>
- loaded.flushed
+ loaded.swapped_out
}
messages.refiller.run
}
- val store_load_source = createSource(new ListEventAggregator[(QueueEntry#Flushed, MessageRecord)](), dispatch_queue)
+ val store_load_source = createSource(new ListEventAggregator[(QueueEntry#Swapped, MessageRecord)](), dispatch_queue)
store_load_source.setEventHandler(^ {drain_store_loads});
store_load_source.resume
def drain_store_loads() = {
val data = store_load_source.getData
- data.foreach { case (flushed,messageRecord) =>
- flushed.loaded(messageRecord)
+ data.foreach { case (swapped,message_record) =>
+ swapped.swapped_in(message_record)
}
- data.foreach { case (flushed,_) =>
- if( flushed.entry.hasSubs ) {
- flushed.entry.run
+ data.foreach { case (swapped,_) =>
+ if( swapped.entry.hasSubs ) {
+ swapped.entry.run
}
}
}
@@ -606,11 +619,11 @@ class QueueEntry(val queue:Queue, val se
// Subscriptions waiting to dispatch this entry.
var parked:List[Subscription] = Nil
- // The number of subscriptions which have requested this entry to be prefeteched (held in memory) so that it's
- // ready for them to get dispatched.
+ // subscriptions will set this to non-zero if they are interested
+ // in the entry.
var prefetch_flags:Byte = 0
- // The current state of the entry: Tail | Loaded | Flushed | Tombstone
+ // The current state of the entry: Head | Tail | Loaded | Swapped | SwappedRange
var state:EntryState = new Tail
def is_prefetched = prefetch_flags == 1
@@ -630,17 +643,18 @@ class QueueEntry(val queue:Queue, val se
def init(delivery:Delivery):QueueEntry = {
state = new Loaded(delivery, false)
- queue.capacity_used += size
+ queue.swapped_in_size += size
+ queue.swapped_in_items += 1
this
}
def init(qer:QueueEntryRecord):QueueEntry = {
- state = new Flushed(qer.message_key, qer.size)
+ state = new Swapped(qer.message_key, qer.size)
this
}
def init(range:QueueEntryRange):QueueEntry = {
- state = new FlushedRange(range.last_entry_seq, range.count, range.size)
+ state = new SwappedRange(range.last_entry_seq, range.count, range.size)
this
}
@@ -706,8 +720,8 @@ class QueueEntry(val queue:Queue, val se
def as_head = state.as_head
def as_tail = state.as_tail
- def as_flushed = state.as_flushed
- def as_flushed_range = state.as_flushed_range
+ def as_swapped = state.as_swapped
+ def as_swapped_range = state.as_swapped_range
def as_loaded = state.as_loaded
def label = state.label
@@ -716,28 +730,28 @@ class QueueEntry(val queue:Queue, val se
def is_head = this == queue.head_entry
def is_loaded = as_loaded!=null
- def is_flushed = as_flushed!=null
- def is_flushed_range = as_flushed_range!=null
+ def is_swapped = as_swapped!=null
+ def is_swapped_range = as_swapped_range!=null
// These should not change the current state.
def count = state.count
def size = state.size
def messageKey = state.message_key
- def is_flushed_or_flushing = state.is_flushed_or_flushing
+ def is_swapped_or_swapping_out = state.is_swapped_or_swapping_out
def dispatch() = state.dispatch
// These methods may cause a change in the current state.
- def flush(asap:Boolean) = state.flush(asap)
- def load = state.load
+ def swap(asap:Boolean) = state.swap_out(asap)
+ def load = state.swap_in
def remove = state.remove
- def flush_range = state.flush_range
+ def swapped_range = state.swap_range
def can_combine_with_prev = {
getPrevious !=null &&
- getPrevious.is_flushed_range &&
- ( is_flushed || is_flushed_range ) &&
- (getPrevious.count + count < queue.tune_flush_range_size)
+ getPrevious.is_swapped_range &&
+ ( is_swapped || is_swapped_range ) &&
+ (getPrevious.count + count < queue.tune_swap_range_size)
}
trait EntryState {
@@ -746,8 +760,8 @@ class QueueEntry(val queue:Queue, val se
def as_tail:Tail = null
def as_loaded:Loaded = null
- def as_flushed:Flushed = null
- def as_flushed_range:FlushedRange = null
+ def as_swapped:Swapped = null
+ def as_swapped_range:SwappedRange = null
def as_head:Head = null
/**
@@ -778,21 +792,21 @@ class QueueEntry(val queue:Queue, val se
def dispatch() = false
/**
- * @returns true if the entry is either flushed or flushing.
+ * @return true if the entry is either swapped or swapping out.
*/
- def is_flushed_or_flushing = false
+ def is_swapped_or_swapping_out = false
/**
- * Triggers the entry to get loaded if it's not already loaded.
+ * Triggers the entry to get swapped in if it's not already swapped in.
*/
- def load = {}
+ def swap_in = {}
/**
- * Triggers the entry to get flushed if it's not already flushed.
+ * Triggers the entry to get swapped out if it's not already swapped.
*/
- def flush(asap:Boolean) = {}
+ def swap_out(asap:Boolean) = {}
- def flush_range:Unit = throw new AssertionError("should only be called on flushed entries");
+ def swap_range:Unit = throw new AssertionError("should only be called on swapped entries");
/**
* Removes the entry from the queue's linked list of entries. This gets called
@@ -846,8 +860,8 @@ class QueueEntry(val queue:Queue, val se
}
override def remove = throw new AssertionError("Head entry cannot be removed")
- override def load = throw new AssertionError("Head entry cannot be loaded")
- override def flush(asap:Boolean) = throw new AssertionError("Head entry cannot be flushed")
+ override def swap_in = throw new AssertionError("Head entry cannot be loaded")
+ override def swap_out(asap:Boolean) = throw new AssertionError("Head entry cannot be swapped")
}
/**
@@ -862,8 +876,8 @@ class QueueEntry(val queue:Queue, val se
override def as_tail:Tail = this
override def remove = throw new AssertionError("Tail entry cannot be removed")
- override def load = throw new AssertionError("Tail entry cannot be loaded")
- override def flush(asap:Boolean) = throw new AssertionError("Tail entry cannot be flushed")
+ override def swap_in = throw new AssertionError("Tail entry cannot be loaded")
+ override def swap_out(asap:Boolean) = throw new AssertionError("Tail entry cannot be swapped")
}
@@ -876,27 +890,27 @@ class QueueEntry(val queue:Queue, val se
assert( delivery!=null, "delivery cannot be null")
var acquired = false
- var flushing = false
+ var swapping_out = false
def label = {
var rc = "loaded"
if( acquired ) {
rc += "|aquired"
}
- if( flushing ) {
- rc += "|flushing"
+ if( swapping_out ) {
+ rc += "|swapping out"
}
rc
}
- override def toString = { "loaded:{ stored: "+stored+", flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" }
+ override def toString = { "loaded:{ stored: "+stored+", swapping_out: "+swapping_out+", acquired: "+acquired+", size:"+size+"}" }
override def count = 1
override def size = delivery.size
override def message_key = delivery.storeKey
- override def is_flushed_or_flushing = {
- flushing
+ override def is_swapped_or_swapping_out = {
+ swapping_out
}
override def as_loaded = this
@@ -904,20 +918,20 @@ class QueueEntry(val queue:Queue, val se
def store = {
delivery.uow.enqueue(toQueueEntryRecord)
delivery.uow.on_complete(^{
- queue.store_flush_source.merge(this)
+ queue.swap_out_completes_source.merge(this)
})
}
- override def flush(asap:Boolean) = {
+ override def swap_out(asap:Boolean) = {
if( queue.tune_swap ) {
if( stored ) {
- flushing=true
- queue.flushing_size+=size
- flushed
+ swapping_out=true
+ queue.swapping_out_size+=size
+ swapped_out
} else {
- if( !flushing ) {
- flushing=true
- queue.flushing_size+=size
+ if( !swapping_out ) {
+ swapping_out=true
+ queue.swapping_out_size+=size
// The storeBatch is only set when called from the messages.offer method
if( delivery.uow!=null ) {
@@ -943,7 +957,7 @@ class QueueEntry(val queue:Queue, val se
if( asap ) {
queue.host.store.flush_message(message_key) {
- queue.store_flush_source.merge(this)
+ queue.swap_out_completes_source.merge(this)
}
}
@@ -955,36 +969,42 @@ class QueueEntry(val queue:Queue, val se
}
}
- def flushed() = {
+ def swapped_out() = {
stored = true
delivery.uow = null
- if( flushing ) {
- flushing = false
- queue.flushing_size-=size
- queue.capacity_used -= size
+ if( swapping_out ) {
+ swapping_out = false
+ queue.swapping_out_size-=size
+ queue.swapped_in_size -= size
+ queue.swapped_in_items -= 1
+
+ queue.swap_out_size_counter += size
+ queue.swap_out_item_counter += 1
+
delivery.message.release
- state = new Flushed(delivery.storeKey, size)
+ state = new Swapped(delivery.storeKey, size)
if( can_combine_with_prev ) {
- getPrevious.as_flushed_range.combineNext
+ getPrevious.as_swapped_range.combineNext
}
}
}
- override def load() = {
- if( flushing ) {
- flushing = false
- queue.flushing_size-=size
+ override def swap_in() = {
+ if( swapping_out ) {
+ swapping_out = false
+ queue.swapping_out_size-=size
}
}
override def remove = {
- if( flushing ) {
- flushing = false
- queue.flushing_size-=size
+ if( swapping_out ) {
+ swapping_out = false
+ queue.swapping_out_size-=size
}
delivery.message.release
- queue.capacity_used -= size
+ queue.swapped_in_size -= size
+ queue.swapped_in_items -= 1
super.remove
}
@@ -1072,11 +1092,11 @@ class QueueEntry(val queue:Queue, val se
// the advancing subs move on to the next entry...
advance(advancing)
-// // flush this entry out if it's not going to be needed soon.
+// // swap this entry out if it's not going to be needed soon.
// if( !hasSubs && prefetch_flags==0 ) {
-// // then flush out to make space...
-// var flush_asap = !acquired
-// flush(flush_asap)
+// // then swap out to make space...
+// var asap = !acquired
+// swap(asap)
// }
queue.trigger_swap
return true
@@ -1085,38 +1105,38 @@ class QueueEntry(val queue:Queue, val se
}
/**
- * Loaded entries are moved into the Flushed state reduce memory usage. Once a Loaded
+ * Loaded entries are moved into the Swapped state to reduce memory usage. Once a Loaded
* entry is persisted, it can move into this state. This state only holds onto the
* the massage key so that it can reload the message from the store quickly when needed.
*/
- class Flushed(override val message_key:Long, override val size:Int) extends EntryState {
+ class Swapped(override val message_key:Long, override val size:Int) extends EntryState {
- queue.flushed_items += 1
+ queue.individual_swapped_items += 1
- var loading = false
+ var swapping_in = false
override def count = 1
- override def as_flushed = this
+ override def as_swapped = this
- override def is_flushed_or_flushing = true
+ override def is_swapped_or_swapping_out = true
def label = {
- var rc = "flushed"
- if( loading ) {
- rc += "|loading"
+ var rc = "swapped"
+ if( swapping_in ) {
+ rc += "|swapping in"
}
rc
}
- override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
+ override def toString = { "swapped:{ swapping_in: "+swapping_in+", size:"+size+"}" }
- override def load() = {
- if( !loading ) {
+ override def swap_in() = {
+ if( !swapping_in ) {
// trace("Start entry load of message seq: %s", seq)
- // start loading it back...
- loading = true
- queue.loading_size += size
+ // start swapping in...
+ swapping_in = true
+ queue.swapping_in_size += size
queue.host.store.load_message(message_key) { delivery =>
// pass off to a source so it can aggregate multiple
// loads to reduce cross thread synchronization
@@ -1136,19 +1156,24 @@ class QueueEntry(val queue:Queue, val se
}
}
- def loaded(messageRecord:MessageRecord) = {
- if( loading ) {
+ def swapped_in(messageRecord:MessageRecord) = {
+ if( swapping_in ) {
// debug("Loaded message seq: ", seq )
- loading = false
- queue.loading_size -= size
+ swapping_in = false
+ queue.swapping_in_size -= size
val delivery = new Delivery()
delivery.message = ProtocolFactory.get(messageRecord.protocol.toString).get.decode(messageRecord)
delivery.size = messageRecord.size
delivery.storeKey = messageRecord.key
- queue.capacity_used += delivery.size
- queue.flushed_items -= 1
+ queue.swapped_in_size += delivery.size
+ queue.swapped_in_items += 1
+
+ queue.swap_in_size_counter += size
+ queue.swap_in_item_counter += 1
+
+ queue.individual_swapped_items -= 1
state = new Loaded(delivery, true)
} else {
// debug("Ignoring store load of: ", messageKey)
@@ -1157,35 +1182,35 @@ class QueueEntry(val queue:Queue, val se
override def remove = {
- if( loading ) {
- loading = false
- queue.loading_size -= size
+ if( swapping_in ) {
+ swapping_in = false
+ queue.swapping_in_size -= size
}
- queue.flushed_items -= 1
+ queue.individual_swapped_items -= 1
super.remove
}
- override def flush_range = {
- if( loading ) {
- loading = false
- queue.loading_size -= size
+ override def swap_range = {
+ if( swapping_in ) {
+ swapping_in = false
+ queue.swapping_in_size -= size
}
- queue.flushed_items -= 1
- state = new FlushedRange(seq, 1, size)
+ queue.individual_swapped_items -= 1
+ state = new SwappedRange(seq, 1, size)
}
}
/**
- * A FlushedRange stat is assigned entry is used to represent a rage of flushed entries.
+ * A SwappedRange state is assigned to an entry that represents a range of swapped entries.
*
- * Even when entries that are Flushed can us a significant amount of memory if the queue is holding
- * thousands of them. Multiple entries in the Flushed state can be combined into a single entry in
- * the FlushedRange state thereby conserving even more memory. A FlushedRange entry only tracks
+ * Even entries that are Swapped can use a significant amount of memory if the queue is holding
+ * thousands of them. Multiple entries in the swapped state can be combined into a single entry in
+ * the SwappedRange state thereby conserving even more memory. A SwappedRange entry only tracks
* the first, and last sequnce ids of the range. When the entry needs to be loaded from the range
- * it replaces the FlushedRange entry with all the Flushed entries by querying the store of all the
+ * it replaces the swapped range entry with all the swapped entries by querying the store of all the
* message keys for the entries in the range.
*/
- class FlushedRange(
+ class SwappedRange(
/** the last seq id in the range */
var last:Long,
/** the number of items in the range */
@@ -1196,24 +1221,24 @@ class QueueEntry(val queue:Queue, val se
override def count = _count
override def size = _size
- var loading = false
+ var swapping_in = false
- override def as_flushed_range = this
+ override def as_swapped_range = this
- override def is_flushed_or_flushing = true
+ override def is_swapped_or_swapping_out = true
def label = {
- var rc = "flushed_range"
- if( loading ) {
- rc = "flushed_range|loading"
+ var rc = "swapped_range"
+ if( swapping_in ) {
+ rc = "swapped_range|swapping in"
}
rc
}
- override def toString = { "flushed_range:{ loading: "+loading+", count: "+count+", size: "+size+"}" }
+ override def toString = { "swapped_range:{ swapping_in: "+swapping_in+", count: "+count+", size: "+size+"}" }
- override def load() = {
- if( !loading ) {
- loading = true
+ override def swap_in() = {
+ if( !swapping_in ) {
+ swapping_in = true
queue.host.store.list_queue_entries(queue.id, seq, last) { records =>
if( !records.isEmpty ) {
queue.dispatch_queue {
@@ -1265,17 +1290,17 @@ class QueueEntry(val queue:Queue, val se
def combineNext():Unit = {
val value = getNext
assert(value!=null)
- assert(value.is_flushed || value.is_flushed_range)
- if( value.is_flushed ) {
+ assert(value.is_swapped || value.is_swapped_range)
+ if( value.is_swapped ) {
assert(last < value.seq )
last = value.seq
_count += 1
_size += value.size
value.remove
- } else if( value.is_flushed_range ) {
+ } else if( value.is_swapped_range ) {
assert(last < value.seq )
- last = value.as_flushed_range.last
- _count += value.as_flushed_range.count
+ last = value.as_swapped_range.last
+ _count += value.as_swapped_range.count
_size += value.size
value.remove
}
@@ -1486,6 +1511,7 @@ class Subscription(val queue:Queue, val
queue.dequeue_item_counter += 1
queue.dequeue_size_counter += entry.size
+ queue.dequeue_ts = queue.last_maintenance_ts
// removes this entry from the acquired list.
unlink()
@@ -1513,6 +1539,7 @@ class Subscription(val queue:Queue, val
// track for stats
queue.nack_item_counter += 1
queue.nack_size_counter += entry.size
+ queue.nack_ts = queue.last_maintenance_ts
// The following does not need to get done for exclusive subs because
// they end up rewinding all the sub of the head of the queue.
Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java (from r1054867, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java&r1=1054867&r2=1054868&rev=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java Tue Jan 4 02:24:53 2011
@@ -16,11 +16,9 @@
*/
package org.apache.activemq.apollo.dto;
-import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonTypeInfo;
import javax.xml.bind.annotation.*;
-import java.util.ArrayList;
-import java.util.List;
/**
* <p>
@@ -28,27 +26,11 @@ import java.util.List;
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="virtual_host_status")
+@XmlRootElement(name="aggregate_queue_metrics")
@XmlAccessorType(XmlAccessType.FIELD)
-public class VirtualHostStatusDTO extends ServiceStatusDTO {
+public class AggregateQueueMetricsDTO extends QueueMetricsDTO {
- /**
- * The status of the store
- */
- @XmlElementRef
- public StoreStatusDTO store;
+ @XmlAttribute(name="queues")
+ public int queues;
- /**
- * Ids of all the destinations running on the broker
- */
- @XmlElement(name="destination")
- public List<LongIdLabeledDTO> destinations = new ArrayList<LongIdLabeledDTO>();
-
-
- /**
- * The current running configuration of the object
- */
- @XmlElement
- public VirtualHostDTO config = null;
-
-}
+}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java Tue Jan 4 02:24:53 2011
@@ -60,4 +60,6 @@ public class BrokerStatusDTO extends Ser
@XmlElement
public BrokerDTO config = null;
+ @XmlElement
+ public AggregateQueueMetricsDTO aggregate_queue_metrics = new AggregateQueueMetricsDTO();
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Tue Jan 4 02:24:53 2011
@@ -87,13 +87,13 @@ public class QueueDTO {
public Boolean swap;
/**
- * The number max number of flushed queue entries to load
- * for the store at a time. Not that Flushed entires are just
+ * The max number of swapped queue entries to load
+ * from the store at a time. Note that swapped entries are just
* reference pointers to the actual messages. When not loaded,
* the batch is referenced as sequence range to conserve memory.
*/
- @XmlAttribute(name="flush_range_size")
- public Integer flush_range_size;
+ @XmlAttribute(name="swap_range_size")
+ public Integer swap_range_size;
@XmlElement(name="acl")
public QueueAclDTO acl;
Added: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java?rev=1054868&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java Tue Jan 4 02:24:53 2011
@@ -0,0 +1,77 @@
+package org.apache.activemq.apollo.dto;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * <p>
+ * Queue metrics, serialized as XML attributes of a <code>queue_metrics</code> element.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="queue_metrics")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class QueueMetricsDTO {
+
+ @XmlAttribute(name="enqueue_item_counter")
+ public long enqueue_item_counter;
+
+ @XmlAttribute(name="enqueue_size_counter")
+ public long enqueue_size_counter;
+
+ @XmlAttribute(name="enqueue_ts")
+ public long enqueue_ts;
+
+ @XmlAttribute(name="dequeue_item_counter")
+ public long dequeue_item_counter;
+
+ @XmlAttribute(name="dequeue_size_counter")
+ public long dequeue_size_counter;
+
+ @XmlAttribute(name="dequeue_ts")
+ public long dequeue_ts;
+
+ @XmlAttribute(name="nack_item_counter")
+ public long nack_item_counter;
+
+ @XmlAttribute(name="nack_size_counter")
+ public long nack_size_counter;
+
+ @XmlAttribute(name="nack_ts")
+ public long nack_ts;
+
+ @XmlAttribute(name="queue_size")
+ public long queue_size;
+
+ @XmlAttribute(name="queue_items")
+ public long queue_items;
+
+ @XmlAttribute(name="swapped_in_size")
+ public int swapped_in_size;
+
+ @XmlAttribute(name="swapped_in_items")
+ public int swapped_in_items;
+
+ @XmlAttribute(name="swapping_in_size")
+ public int swapping_in_size;
+
+ @XmlAttribute(name="swapping_out_size")
+ public int swapping_out_size;
+
+ @XmlAttribute(name="swapped_in_size_max")
+ public int swapped_in_size_max;
+
+ @XmlAttribute(name="swap_out_item_counter")
+ public long swap_out_item_counter;
+
+ @XmlAttribute(name="swap_out_size_counter")
+ public long swap_out_size_counter;
+
+ @XmlAttribute(name="swap_in_item_counter")
+ public long swap_in_item_counter;
+
+ @XmlAttribute(name="swap_in_size_counter")
+ public long swap_in_size_counter;
+}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Tue Jan 4 02:24:53 2011
@@ -38,44 +38,8 @@ public class QueueStatusDTO extends Long
@XmlElement
public BindingDTO binding;
- @XmlAttribute(name="enqueue_item_counter")
- public long enqueue_item_counter;
-
- @XmlAttribute(name="dequeue_item_counter")
- public long dequeue_item_counter;
-
- @XmlAttribute(name="enqueue_size_counter")
- public long enqueue_size_counter;
-
- @XmlAttribute(name="dequeue_size_counter")
- public long dequeue_size_counter;
-
- @XmlAttribute(name="nack_item_counter")
- public long nack_item_counter;
-
- @XmlAttribute(name="nack_size_counter")
- public long nack_size_counter;
-
- @XmlAttribute(name="queue_size")
- public long queue_size;
-
- @XmlAttribute(name="queue_items")
- public long queue_items;
-
- @XmlAttribute(name="loading_size")
- public int loading_size;
-
- @XmlAttribute(name="flushing_size")
- public int flushing_size;
-
- @XmlAttribute(name="flushed_items")
- public int flushed_items;
-
- @XmlAttribute(name="capacity_used")
- public int capacity_used;
-
- @XmlAttribute
- public int capacity;
+ @XmlElement
+ public QueueMetricsDTO metrics = new QueueMetricsDTO();
/**
* Status of the entries in the queue
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java Tue Jan 4 02:24:53 2011
@@ -51,4 +51,7 @@ public class VirtualHostStatusDTO extend
@XmlElement
public VirtualHostDTO config = null;
+ @XmlElement
+ public AggregateQueueMetricsDTO aggregate_queue_metrics = new AggregateQueueMetricsDTO();
+
}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Tue Jan 4 02:24:53 2011
@@ -16,17 +16,19 @@
*/
package org.apache.activemq.apollo.web.resources;
+import javax.ws.rs.Path
import javax.ws.rs._
import core.Response
import Response.Status._
import java.util.List
import org.apache.activemq.apollo.dto._
import java.{lang => jl}
-import collection.JavaConversions
import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.broker._
import collection.mutable.ListBuffer
import scala.util.continuations._
+import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
+import scala.collection.{Iterable, JavaConversions}
/**
* <p>
@@ -51,17 +53,22 @@ case class RuntimeResource(parent:Broker
System.exit(0)
}
}
+
+ /**
+  * Applies func to every value, issuing each call through the dispatch
+  * queue that dqf selects for that value, and combines the per-value
+  * futures with Future.all.
+  */
+ def concurrent_map[T,R](values:Iterable[T])(dqf:(T)=>DispatchQueue)(func:T=>R) = {
+   val pending = values.map { value =>
+     dqf(value).future { func(value) }
+   }
+   Future.all(pending)
+ }
+
private def with_broker[T](func: (org.apache.activemq.apollo.broker.Broker, Option[T]=>Unit)=>Unit):T = {
BrokerRegistry.list.headOption match {
case None=> result(NOT_FOUND)
case Some(broker)=>
-
- Future[Option[T]] { cb=>
- broker.dispatch_queue {
- func(broker, cb)
- }
- }.getOrElse(result(NOT_FOUND))
-
+ val f = Future[Option[T]]()
+ broker.dispatch_queue {
+ func(broker, f)
+ }
+ f().getOrElse(result(NOT_FOUND))
}
}
@@ -79,8 +86,8 @@ case class RuntimeResource(parent:Broker
@GET
- def get() = {
- with_broker[BrokerStatusDTO] { case (broker, cb) =>
+ def get_broker():BrokerStatusDTO = {
+ with_broker { case (broker, cb) =>
val result = new BrokerStatusDTO
result.id = broker.id
@@ -105,7 +112,10 @@ case class RuntimeResource(parent:Broker
}
}
- cb(Some(result))
+ get_queue_metrics(broker).onComplete{ metrics=>
+ result.aggregate_queue_metrics = metrics
+ cb(Some(result))
+ }
}
}
@@ -113,10 +123,68 @@ case class RuntimeResource(parent:Broker
@GET @Path("virtual-hosts")
def virtualHosts = {
val rc = new LongIdListDTO
- rc.items.addAll(get.virtual_hosts)
+ rc.items.addAll(get_broker.virtual_hosts)
rc
}
+ /**
+  * Folds a collection of queue metrics into a single aggregate.
+  *
+  * Counters and sizes are summed, while the enqueue/dequeue/nack
+  * timestamps keep the largest value seen. An input that is itself an
+  * AggregateQueueMetricsDTO contributes its own `queues` count; any other
+  * input counts as one queue. An empty collection yields a freshly
+  * constructed AggregateQueueMetricsDTO.
+  *
+  * @param queue_metrics the metrics (per queue or already aggregated) to combine
+  * @return a new AggregateQueueMetricsDTO holding the combined values
+  */
+ def aggregate_queue_metrics(queue_metrics:Iterable[QueueMetricsDTO]):AggregateQueueMetricsDTO = {
+   queue_metrics.foldLeft(new AggregateQueueMetricsDTO){ (rc, q)=>
+     rc.enqueue_item_counter += q.enqueue_item_counter
+     rc.enqueue_size_counter += q.enqueue_size_counter
+     rc.enqueue_ts = rc.enqueue_ts max q.enqueue_ts
+
+     rc.dequeue_item_counter += q.dequeue_item_counter
+     rc.dequeue_size_counter += q.dequeue_size_counter
+     // Timestamps are not additive: take the max, as for enqueue_ts and nack_ts.
+     rc.dequeue_ts = rc.dequeue_ts max q.dequeue_ts
+
+     rc.nack_item_counter += q.nack_item_counter
+     rc.nack_size_counter += q.nack_size_counter
+     rc.nack_ts = rc.nack_ts max q.nack_ts
+
+     rc.queue_size += q.queue_size
+     rc.queue_items += q.queue_items
+
+     rc.swap_out_item_counter += q.swap_out_item_counter
+     rc.swap_out_size_counter += q.swap_out_size_counter
+     rc.swap_in_item_counter += q.swap_in_item_counter
+     rc.swap_in_size_counter += q.swap_in_size_counter
+
+     rc.swapping_in_size += q.swapping_in_size
+     rc.swapping_out_size += q.swapping_out_size
+
+     rc.swapped_in_items += q.swapped_in_items
+     rc.swapped_in_size += q.swapped_in_size
+
+     rc.swapped_in_size_max += q.swapped_in_size_max
+
+     // An already aggregated input carries its own queue count.
+     q match {
+       case aggregate:AggregateQueueMetricsDTO => rc.queues += aggregate.queues
+       case _ => rc.queues += 1
+     }
+     rc
+   }
+ }
+
+ /**
+  * Requests the aggregated queue metrics of every virtual host of the
+  * broker, each through that host's dispatch queue, and combines the
+  * per-host results into one aggregate.
+  *
+  * @return a future that yields the aggregate over all virtual hosts
+  */
+ def get_queue_metrics(broker:Broker):Future[AggregateQueueMetricsDTO] = {
+   val per_host = broker.virtual_hosts.values.map { virtual_host =>
+     virtual_host.dispatch_queue.flatFuture { get_queue_metrics(virtual_host) }
+   }
+   Future.all(per_host).map(aggregate_queue_metrics(_))
+ }
+
+ /**
+  * Requests the metrics of every queue registered with the virtual host's
+  * router, each through that queue's dispatch queue, and combines the
+  * per-queue results into one aggregate.
+  *
+  * @return a future that yields the aggregate over all queues of the host
+  */
+ def get_queue_metrics(virtualHost:VirtualHost):Future[AggregateQueueMetricsDTO] = {
+   val per_queue = virtualHost.router.queues.values.map { q =>
+     q.dispatch_queue.future { get_queue_metrics(q) }
+   }
+   Future.all(per_queue).map(aggregate_queue_metrics(_))
+ }
+
+
@GET @Path("virtual-hosts/{id}")
def virtualHost(@PathParam("id") id : Long):VirtualHostStatusDTO = {
with_virtual_host(id) { case (virtualHost,cb) =>
@@ -130,14 +198,20 @@ case class RuntimeResource(parent:Broker
result.destinations.add(new LongIdLabeledDTO(node.id, node.name.toString))
}
- if( virtualHost.store != null ) {
- virtualHost.store.get_store_status { x=>
- result.store = x
+ get_queue_metrics(virtualHost).onComplete { metrics=>
+
+ result.aggregate_queue_metrics = metrics
+
+ if( virtualHost.store != null ) {
+ virtualHost.store.get_store_status { x=>
+ result.store = x
+ cb(Some(result))
+ }
+ } else {
cb(Some(result))
}
- } else {
- cb(Some(result))
}
+
}
}
@@ -215,6 +289,40 @@ case class RuntimeResource(parent:Broker
}
}
+ /**
+  * Copies the metric fields of the given queue into a new QueueMetricsDTO.
+  *
+  * @param q the queue whose metric fields are read
+  * @return a new DTO populated from the queue's fields
+  */
+ def get_queue_metrics(q:Queue):QueueMetricsDTO = {
+   val metrics = new QueueMetricsDTO
+
+   // enqueue counters and timestamp
+   metrics.enqueue_item_counter = q.enqueue_item_counter
+   metrics.enqueue_size_counter = q.enqueue_size_counter
+   metrics.enqueue_ts = q.enqueue_ts
+
+   // dequeue counters and timestamp
+   metrics.dequeue_item_counter = q.dequeue_item_counter
+   metrics.dequeue_size_counter = q.dequeue_size_counter
+   metrics.dequeue_ts = q.dequeue_ts
+
+   // nack counters and timestamp
+   metrics.nack_item_counter = q.nack_item_counter
+   metrics.nack_size_counter = q.nack_size_counter
+   metrics.nack_ts = q.nack_ts
+
+   // current queue size
+   metrics.queue_size = q.queue_size
+   metrics.queue_items = q.queue_items
+
+   // swap counters
+   metrics.swap_out_item_counter = q.swap_out_item_counter
+   metrics.swap_out_size_counter = q.swap_out_size_counter
+   metrics.swap_in_item_counter = q.swap_in_item_counter
+   metrics.swap_in_size_counter = q.swap_in_size_counter
+
+   // swap activity and swapped-in figures
+   metrics.swapping_in_size = q.swapping_in_size
+   metrics.swapping_out_size = q.swapping_out_size
+   metrics.swapped_in_items = q.swapped_in_items
+   metrics.swapped_in_size = q.swapped_in_size
+   metrics.swapped_in_size_max = q.swapped_in_size_max
+
+   metrics
+ }
+
def status(qo:Option[Queue], entries:Boolean=false, cb:Option[QueueStatusDTO]=>Unit):Unit = if(qo==None) {
cb(None)
} else {
@@ -223,23 +331,8 @@ case class RuntimeResource(parent:Broker
val rc = new QueueStatusDTO
rc.id = q.id
rc.binding = q.binding.binding_dto
- rc.capacity_used = q.capacity_used
- rc.capacity = q.capacity
rc.config = q.config
-
- rc.enqueue_item_counter = q.enqueue_item_counter
- rc.dequeue_item_counter = q.dequeue_item_counter
- rc.enqueue_size_counter = q.enqueue_size_counter
- rc.dequeue_size_counter = q.dequeue_size_counter
- rc.nack_item_counter = q.nack_item_counter
- rc.nack_size_counter = q.nack_size_counter
-
- rc.queue_size = q.queue_size
- rc.queue_items = q.queue_items
-
- rc.loading_size = q.loading_size
- rc.flushing_size = q.flushing_size
- rc.flushed_items = q.flushed_items
+ rc.metrics = get_queue_metrics(q)
if( entries ) {
var cur = q.head_entry
@@ -296,7 +389,7 @@ case class RuntimeResource(parent:Broker
@GET @Path("connectors")
def connectors = {
val rc = new LongIdListDTO
- rc.items.addAll(get.connectors)
+ rc.items.addAll(get_broker.connectors)
rc
}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Tue Jan 4 02:24:53 2011
@@ -16,6 +16,11 @@
- import it._
- val helper = new org.apache.activemq.apollo.web.resources.ViewHelper
- import helper._
+- def percent(n:Long, d:Long) =
+ - if( d==0 )
+ - "0.00 %"
+ - else
+ - "%,.2f %%".format(n.toFloat*100.0/d)
.breadcumbs
a(href={strip_resolve("..")}) Back
@@ -38,26 +43,30 @@
h2 Current Size
-p queue size: #{queue_items} messages
-p queue size: #{memory(queue_size)}
-- if( capacity > 0 )
- p memory used: #{ "%,.2f".format(capacity_used.toFloat*100.0/capacity) }% (#{memory(capacity_used)}/#{memory(capacity)})
-- else
- p memory used: #{ "%,.2f".format(0f) }% (#{memory(capacity_used)}/#{memory(capacity)})
+p queue size: #{metrics.queue_items} messages #{memory(metrics.queue_size)}
+p memory used: #{percent(metrics.swapped_in_size, metrics.swapped_in_size_max)} (#{memory(metrics.swapped_in_size)}/#{memory(metrics.swapped_in_size_max)})
+
+h3 Enqueue/Dequeue Counters
+
+p enqueued: #{metrics.enqueue_item_counter} messages (#{memory(metrics.enqueue_size_counter)}), #{uptime(metrics.enqueue_ts)} ago
-h2 Enqueue/Deqeueue Counters
+p dequeued: #{metrics.dequeue_item_counter} messages (#{memory(metrics.dequeue_size_counter)}), #{uptime(metrics.dequeue_ts)} ago
-p enqueued: #{enqueue_item_counter} messages (#{memory(enqueue_size_counter)})
+p nacked: #{metrics.nack_item_counter} messages (#{memory(metrics.nack_size_counter)}), #{uptime(metrics.nack_ts)} ago
-p dequeued: #{dequeue_item_counter} messages (#{memory(dequeue_size_counter)})
+h2 Swap Metrics
-p nacked: #{nack_item_counter} messages (#{memory(nack_size_counter)})
+p swapped in: #{metrics.swapped_in_items} messages #{memory(metrics.swapped_in_size)}
+- val swapped_out_items = metrics.queue_items - metrics.swapped_in_items
+- val swapped_out_size = metrics.queue_size - metrics.swapped_in_size
+p swapped out: #{swapped_out_items} messages #{memory(swapped_out_size)}
+p percent swapped out: #{percent(swapped_out_items, metrics.queue_items)} of the messages
-h2 Swap Status
+p swapping out: #{memory(metrics.swapping_out_size)}
+p swapping in: #{memory(metrics.swapping_in_size)}
-p loading from the store: #{memory(loading_size)}
-p flushing out of memory: #{memory(flushing_size)}
-p holding : #{flushed_items} flushed message references
+p total swap outs : #{metrics.swap_out_item_counter} messages (#{memory(metrics.swap_out_size_counter)})
+p total swap ins : #{metrics.swap_in_item_counter} messages (#{memory(metrics.swap_in_size_counter)})
h3 Producers
ul