You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/04 03:24:53 UTC

svn commit: r1054868 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ apol...

Author: chirino
Date: Tue Jan  4 02:24:53 2011
New Revision: 1054868

URL: http://svn.apache.org/viewvc?rev=1054868&view=rev
Log:
Updated the queue to use "swap in/swap out" terminology instead of "load/flush", and extended that into the DTO class too.

Extracted the queue metrics into their own DTO object and also added an aggregate version to collect stats from multiple queues at the virtual host and broker levels.

Added:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java
      - copied, changed from r1054867, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Jan  4 02:24:53 2011
@@ -105,18 +105,18 @@ class Queue(val host: VirtualHost, var i
   var tune_persistent = true
 
   /**
-   * Should messages be flushed or swapped out of memory if
+   * Should messages be swapped out of memory if
    * no consumers need the message?
    */
   var tune_swap = true
 
   /**
-   * The number max number of flushed queue entries to load
-   * for the store at a time.  Not that Flushed entires are just
+   * The number max number of swapped queue entries to load
+   * for the store at a time.  Note that swapped entries are just
    * reference pointers to the actual messages.  When not loaded,
    * the batch is referenced as sequence range to conserve memory.
    */
-  var tune_flush_range_size = 0
+  var tune_swap_range_size = 0
 
   /**
    *  The amount of memory buffer space to use per subscription.
@@ -127,27 +127,43 @@ class Queue(val host: VirtualHost, var i
     config = c
     tune_persistent = host.store !=null && config.persistent.getOrElse(true)
     tune_swap = tune_persistent && config.swap.getOrElse(true)
-    tune_flush_range_size = config.flush_range_size.getOrElse(10000)
+    tune_swap_range_size = config.swap_range_size.getOrElse(10000)
     tune_consumer_buffer = config.consumer_buffer.getOrElse(32*1024)
   }
   configure(config)
 
+  var last_maintenance_ts = System.currentTimeMillis
+
   var enqueue_item_counter = 0L
-  var dequeue_item_counter = 0L
   var enqueue_size_counter = 0L
+  var enqueue_ts = last_maintenance_ts;
+
+  var dequeue_item_counter = 0L
   var dequeue_size_counter = 0L
+  var dequeue_ts = last_maintenance_ts;
+
   var nack_item_counter = 0L
   var nack_size_counter = 0L
+  var nack_ts = last_maintenance_ts;
 
   def queue_size = enqueue_size_counter - dequeue_size_counter
   def queue_items = enqueue_item_counter - dequeue_item_counter
 
-  var loading_size = 0
-  var flushing_size = 0
-  var flushed_items = 0
+  var swapping_in_size = 0
+  var swapping_out_size = 0
+
+  var swapped_in_items = 0
+  var swapped_in_size = 0
 
-  var capacity = 0
-  var capacity_used = 0
+  var swapped_in_size_max = 0
+
+  var swap_out_item_counter = 0L
+  var swap_out_size_counter = 0L
+
+  var swap_in_item_counter = 0L
+  var swap_in_size_counter = 0L
+
+  var individual_swapped_items = 0
 
   val swap_source = createSource(EventAggregators.INTEGER_ADD, dispatch_queue)
   swap_source.setEventHandler(^{ swap_messages });
@@ -155,12 +171,12 @@ class Queue(val host: VirtualHost, var i
 
   protected def _start(on_completed: Runnable) = {
 
-    capacity = tune_queue_buffer;
+    swapped_in_size_max = tune_queue_buffer;
 
     def completed: Unit = {
       // by the time this is run, consumers and producers may have already joined.
       on_completed.run
-      schedual_consumer_sample
+      schedule_periodic_maintenance
       // wake up the producers to fill us up...
       if (messages.refiller != null) {
         messages.refiller.run
@@ -189,7 +205,7 @@ class Queue(val host: VirtualHost, var i
 
       } else {
 
-        host.store.list_queue_entry_ranges(id, tune_flush_range_size) { ranges=>
+        host.store.list_queue_entry_ranges(id, tune_swap_range_size) { ranges=>
           dispatch_queue {
             if( ranges!=null && !ranges.isEmpty ) {
 
@@ -226,7 +242,7 @@ class Queue(val host: VirtualHost, var i
 
   def addCapacity(amount:Int) = {
     val was_full = messages.full
-    capacity += amount
+    swapped_in_size_max += amount
     if( was_full && !messages.full ) {
       messages.refiller.run
     }
@@ -236,7 +252,7 @@ class Queue(val host: VirtualHost, var i
 
     var refiller: Runnable = null
 
-    def full = (capacity_used >= capacity) || !service_state.is_started
+    def full = (swapped_in_size >= swapped_in_size_max) || !service_state.is_started
 
     def offer(delivery: Delivery): Boolean = {
       if (full) {
@@ -255,6 +271,8 @@ class Queue(val host: VirtualHost, var i
         entries.addLast(entry)
         enqueue_item_counter += 1
         enqueue_size_counter += entry.size
+        enqueue_ts = last_maintenance_ts;
+
 
         // Do we need to do a persistent enqueue???
         if (queueDelivery.uow != null) {
@@ -269,8 +287,8 @@ class Queue(val host: VirtualHost, var i
 
         val prev = entry.getPrevious
 
-        if( (prev.as_loaded!=null && prev.as_loaded.flushing) || (prev.as_flushed!=null && !prev.as_flushed.loading) ) {
-          entry.flush(!entry.as_loaded.acquired)
+        if( (prev.as_loaded!=null && prev.as_loaded.swapping_out ) || (prev.as_swapped!=null && !prev.as_swapped.swapping_in) ) {
+          entry.swap(!entry.as_loaded.acquired)
         } else {
           trigger_swap
         }
@@ -288,7 +306,7 @@ class Queue(val host: VirtualHost, var i
 
 
   def display_stats: Unit = {
-    info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, capacity_used, capacity)
+    info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, swapped_in_size, swapped_in_size_max)
     info("total messages enqueued %d, dequeues %d ", enqueue_item_counter, dequeue_item_counter)
   }
 
@@ -297,15 +315,15 @@ class Queue(val host: VirtualHost, var i
     var total_items = 0L
     var total_size = 0L
     while (cur != null) {
-      if (cur.is_loaded || cur.hasSubs || cur.is_prefetched || cur.is_flushed_range ) {
+      if (cur.is_loaded || cur.hasSubs || cur.is_prefetched || cur.is_swapped_range ) {
         info("  => " + cur)
       }
 
       total_size += cur.size
-      if (cur.is_flushed || cur.is_loaded) {
+      if (cur.is_swapped || cur.is_loaded) {
         total_items += 1
-      } else if (cur.is_flushed_range ) {
-        total_items += cur.as_flushed_range.count
+      } else if (cur.is_swapped_range ) {
+        total_items += cur.as_swapped_range.count
       }
       
       cur = cur.getNext
@@ -347,22 +365,22 @@ class Queue(val host: VirtualHost, var i
       val loaded = cur.as_loaded
       if( loaded!=null ) {
         if( cur.prefetch_flags==0 && !loaded.acquired  ) {
-          val flush_asap = !cur.as_loaded.acquired
-          cur.flush(flush_asap)
+          val asap = !cur.as_loaded.acquired
+          cur.swap(asap)
         } else {
-          cur.load // just in case it's getting flushed.
+          cur.load // just in case it's getting swapped.
         }
       }
       cur = next
     }
 
 
-    // Combine flushed items into flushed ranges
-    if( flushed_items > tune_flush_range_size*2 ) {
+    // Combine swapped items into swapped ranges
+    if( individual_swapped_items > tune_swap_range_size*2 ) {
 
-      debug("Looking for flushed entries to combine")
+      debug("Looking for swapped entries to combine")
 
-      var distance_from_sub = tune_flush_range_size;
+      var distance_from_sub = tune_swap_range_size;
       var cur = entries.getHead
       var combine_counter = 0;
 
@@ -377,11 +395,11 @@ class Queue(val host: VirtualHost, var i
         } else {
           distance_from_sub += 1
           if( cur.can_combine_with_prev ) {
-            cur.getPrevious.as_flushed_range.combineNext
+            cur.getPrevious.as_swapped_range.combineNext
             combine_counter += 1
           } else {
-            if( cur.is_flushed && distance_from_sub > tune_flush_range_size ) {
-              cur.flush_range
+            if( cur.is_swapped && distance_from_sub > tune_swap_range_size ) {
+              cur.swapped_range
               combine_counter += 1
             }
           }
@@ -394,42 +412,37 @@ class Queue(val host: VirtualHost, var i
 
   }
 
-  def schedual_consumer_sample:Unit = {
-
-    def slowConsumerCheck = {
-      if( service_state.is_started ) {
-
-        // target tune_min_subscription_rate / sec
-        all_subscriptions.foreach{ case (consumer, sub)=>
-          
-          if ( sub.tail_parkings < 0 ) {
-
-            // re-calc the avg_advanced_size
-            sub.advanced_sizes += sub.advanced_size
-            while( sub.advanced_sizes.size > 5 ) {
-              sub.advanced_sizes = sub.advanced_sizes.drop(1)
-            }
-            sub.avg_advanced_size = sub.advanced_sizes.foldLeft(0)(_ + _) /  sub.advanced_sizes.size
-
+  def schedule_periodic_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
+    if( service_state.is_started ) {
+      last_maintenance_ts = System.currentTimeMillis
+
+      // target tune_min_subscription_rate / sec
+      all_subscriptions.foreach{ case (consumer, sub)=>
+
+        if ( sub.tail_parkings < 0 ) {
+
+          // re-calc the avg_advanced_size
+          sub.advanced_sizes += sub.advanced_size
+          while( sub.advanced_sizes.size > 5 ) {
+            sub.advanced_sizes = sub.advanced_sizes.drop(1)
           }
-          
-          sub.total_advanced_size += sub.advanced_size
-          sub.advanced_size = 0
-          sub.tail_parkings = 0
+          sub.avg_advanced_size = sub.advanced_sizes.foldLeft(0)(_ + _) /  sub.advanced_sizes.size
 
         }
 
-        swap_messages
-        schedual_consumer_sample
+        sub.total_advanced_size += sub.advanced_size
+        sub.advanced_size = 0
+        sub.tail_parkings = 0
+
       }
-    }
 
-    dispatch_queue.dispatchAfter(1, TimeUnit.SECONDS, ^{
-      slowConsumerCheck
-    })
+      swap_messages
+      schedule_periodic_maintenance
+    }
   }
 
 
+
   def drain_acks = {
     ack_source.getData.foreach {
       case (entry, consumed, tx) =>
@@ -558,33 +571,33 @@ class Queue(val host: VirtualHost, var i
     rc
   }
 
-  val store_flush_source = createSource(new ListEventAggregator[QueueEntry#Loaded](), dispatch_queue)
-  store_flush_source.setEventHandler(^ {drain_store_flushes});
-  store_flush_source.resume
+  val swap_out_completes_source = createSource(new ListEventAggregator[QueueEntry#Loaded](), dispatch_queue)
+  swap_out_completes_source.setEventHandler(^ {drain_swap_out_completes});
+  swap_out_completes_source.resume
 
-  def drain_store_flushes() = {
-    val data = store_flush_source.getData
+  def drain_swap_out_completes() = {
+    val data = swap_out_completes_source.getData
     data.foreach { loaded =>
-      loaded.flushed
+      loaded.swapped_out
     }
     messages.refiller.run
 
   }
 
-  val store_load_source = createSource(new ListEventAggregator[(QueueEntry#Flushed, MessageRecord)](), dispatch_queue)
+  val store_load_source = createSource(new ListEventAggregator[(QueueEntry#Swapped, MessageRecord)](), dispatch_queue)
   store_load_source.setEventHandler(^ {drain_store_loads});
   store_load_source.resume
 
 
   def drain_store_loads() = {
     val data = store_load_source.getData
-    data.foreach { case (flushed,messageRecord) =>
-      flushed.loaded(messageRecord)
+    data.foreach { case (swapped,message_record) =>
+      swapped.swapped_in(message_record)
     }
 
-    data.foreach { case (flushed,_) =>
-      if( flushed.entry.hasSubs ) {
-        flushed.entry.run
+    data.foreach { case (swapped,_) =>
+      if( swapped.entry.hasSubs ) {
+        swapped.entry.run
       }
     }
   }
@@ -606,11 +619,11 @@ class QueueEntry(val queue:Queue, val se
   // Subscriptions waiting to dispatch this entry.
   var parked:List[Subscription] = Nil
 
-  // The number of subscriptions which have requested this entry to be prefeteched (held in memory) so that it's
-  // ready for them to get dispatched.
+  // subscriptions will set this to non-zero if they are interested
+  // in the entry.
   var prefetch_flags:Byte = 0
 
-  // The current state of the entry: Tail | Loaded | Flushed | Tombstone
+  // The current state of the entry: Head | Tail | Loaded | Swapped | SwappedRange
   var state:EntryState = new Tail
 
   def is_prefetched = prefetch_flags == 1
@@ -630,17 +643,18 @@ class QueueEntry(val queue:Queue, val se
 
   def init(delivery:Delivery):QueueEntry = {
     state = new Loaded(delivery, false)
-    queue.capacity_used += size
+    queue.swapped_in_size += size
+    queue.swapped_in_items += 1
     this
   }
 
   def init(qer:QueueEntryRecord):QueueEntry = {
-    state = new Flushed(qer.message_key, qer.size)
+    state = new Swapped(qer.message_key, qer.size)
     this
   }
 
   def init(range:QueueEntryRange):QueueEntry = {
-    state = new FlushedRange(range.last_entry_seq, range.count, range.size)
+    state = new SwappedRange(range.last_entry_seq, range.count, range.size)
     this
   }
 
@@ -706,8 +720,8 @@ class QueueEntry(val queue:Queue, val se
   def as_head = state.as_head
   def as_tail = state.as_tail
 
-  def as_flushed = state.as_flushed
-  def as_flushed_range = state.as_flushed_range
+  def as_swapped = state.as_swapped
+  def as_swapped_range = state.as_swapped_range
   def as_loaded = state.as_loaded
 
   def label = state.label
@@ -716,28 +730,28 @@ class QueueEntry(val queue:Queue, val se
   def is_head = this == queue.head_entry
 
   def is_loaded = as_loaded!=null
-  def is_flushed = as_flushed!=null
-  def is_flushed_range = as_flushed_range!=null
+  def is_swapped = as_swapped!=null
+  def is_swapped_range = as_swapped_range!=null
 
   // These should not change the current state.
   def count = state.count
   def size = state.size
   def messageKey = state.message_key
-  def is_flushed_or_flushing = state.is_flushed_or_flushing
+  def is_swapped_or_swapping_out = state.is_swapped_or_swapping_out
   def dispatch() = state.dispatch
 
   // These methods may cause a change in the current state.
-  def flush(asap:Boolean) = state.flush(asap)
-  def load = state.load
+  def swap(asap:Boolean) = state.swap_out(asap)
+  def load = state.swap_in
   def remove = state.remove
 
-  def flush_range = state.flush_range
+  def swapped_range = state.swap_range
 
   def can_combine_with_prev = {
     getPrevious !=null &&
-      getPrevious.is_flushed_range &&
-        ( is_flushed || is_flushed_range ) &&
-          (getPrevious.count + count  < queue.tune_flush_range_size)
+      getPrevious.is_swapped_range &&
+        ( is_swapped || is_swapped_range ) &&
+          (getPrevious.count + count  < queue.tune_swap_range_size)
   }
 
   trait EntryState {
@@ -746,8 +760,8 @@ class QueueEntry(val queue:Queue, val se
 
     def as_tail:Tail = null
     def as_loaded:Loaded = null
-    def as_flushed:Flushed = null
-    def as_flushed_range:FlushedRange = null
+    def as_swapped:Swapped = null
+    def as_swapped_range:SwappedRange = null
     def as_head:Head = null
 
     /**
@@ -778,21 +792,21 @@ class QueueEntry(val queue:Queue, val se
     def dispatch() = false
 
     /**
-     * @returns true if the entry is either flushed or flushing.
+     * @returns true if the entry is either swapped or swapping.
      */
-    def is_flushed_or_flushing = false
+    def is_swapped_or_swapping_out = false
 
     /**
-     * Triggers the entry to get loaded if it's not already loaded.
+     * Triggers the entry to get swapped in if it's not already swapped in.
      */
-    def load = {}
+    def swap_in = {}
 
     /**
-     * Triggers the entry to get flushed if it's not already flushed.
+     * Triggers the entry to get swapped out if it's not already swapped.
      */
-    def flush(asap:Boolean) = {}
+    def swap_out(asap:Boolean) = {}
 
-    def flush_range:Unit = throw new AssertionError("should only be called on flushed entries");
+    def swap_range:Unit = throw new AssertionError("should only be called on swapped entries");
 
     /**
      * Removes the entry from the queue's linked list of entries.  This gets called
@@ -846,8 +860,8 @@ class QueueEntry(val queue:Queue, val se
     }
 
     override def remove = throw new AssertionError("Head entry cannot be removed")
-    override def load = throw new AssertionError("Head entry cannot be loaded")
-    override def flush(asap:Boolean) = throw new AssertionError("Head entry cannot be flushed")
+    override def swap_in = throw new AssertionError("Head entry cannot be loaded")
+    override def swap_out(asap:Boolean) = throw new AssertionError("Head entry cannot be swapped")
   }
 
   /**
@@ -862,8 +876,8 @@ class QueueEntry(val queue:Queue, val se
     override def as_tail:Tail = this
 
     override def remove = throw new AssertionError("Tail entry cannot be removed")
-    override def load = throw new AssertionError("Tail entry cannot be loaded")
-    override def flush(asap:Boolean) = throw new AssertionError("Tail entry cannot be flushed")
+    override def swap_in = throw new AssertionError("Tail entry cannot be loaded")
+    override def swap_out(asap:Boolean) = throw new AssertionError("Tail entry cannot be swapped")
 
   }
 
@@ -876,27 +890,27 @@ class QueueEntry(val queue:Queue, val se
     assert( delivery!=null, "delivery cannot be null")
 
     var acquired = false
-    var flushing = false
+    var swapping_out = false
 
     def label = {
       var rc = "loaded"
       if( acquired ) {
         rc += "|aquired"
       }
-      if( flushing ) {
-        rc += "|flushing"
+      if( swapping_out ) {
+        rc += "|swapping out"
       }
       rc
     }
 
-    override def toString = { "loaded:{ stored: "+stored+", flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" }
+    override def toString = { "loaded:{ stored: "+stored+", swapping_out: "+swapping_out+", acquired: "+acquired+", size:"+size+"}" }
 
     override def count = 1
     override def size = delivery.size
     override def message_key = delivery.storeKey
 
-    override def is_flushed_or_flushing = {
-      flushing
+    override def is_swapped_or_swapping_out = {
+      swapping_out
     }
 
     override  def as_loaded = this
@@ -904,20 +918,20 @@ class QueueEntry(val queue:Queue, val se
     def store = {
       delivery.uow.enqueue(toQueueEntryRecord)
       delivery.uow.on_complete(^{
-        queue.store_flush_source.merge(this)
+        queue.swap_out_completes_source.merge(this)
       })
     }
 
-    override def flush(asap:Boolean) = {
+    override def swap_out(asap:Boolean) = {
       if( queue.tune_swap ) {
         if( stored ) {
-          flushing=true
-          queue.flushing_size+=size
-          flushed
+          swapping_out=true
+          queue.swapping_out_size+=size
+          swapped_out
         } else {
-          if( !flushing ) {
-            flushing=true
-            queue.flushing_size+=size
+          if( !swapping_out ) {
+            swapping_out=true
+            queue.swapping_out_size+=size
 
             // The storeBatch is only set when called from the messages.offer method
             if( delivery.uow!=null ) {
@@ -943,7 +957,7 @@ class QueueEntry(val queue:Queue, val se
                   
                 if( asap ) {
                   queue.host.store.flush_message(message_key) {
-                    queue.store_flush_source.merge(this)
+                    queue.swap_out_completes_source.merge(this)
                   }
                 }
 
@@ -955,36 +969,42 @@ class QueueEntry(val queue:Queue, val se
       }
     }
 
-    def flushed() = {
+    def swapped_out() = {
       stored = true
       delivery.uow = null
-      if( flushing ) {
-        flushing = false
-        queue.flushing_size-=size
-        queue.capacity_used -= size
+      if( swapping_out ) {
+        swapping_out = false
+        queue.swapping_out_size-=size
+        queue.swapped_in_size -= size
+        queue.swapped_in_items -= 1
+
+        queue.swap_out_size_counter += size
+        queue.swap_out_item_counter += 1
+
         delivery.message.release
 
-        state = new Flushed(delivery.storeKey, size)
+        state = new Swapped(delivery.storeKey, size)
         if( can_combine_with_prev ) {
-          getPrevious.as_flushed_range.combineNext
+          getPrevious.as_swapped_range.combineNext
         }
       }
     }
 
-    override def load() = {
-      if( flushing ) {
-        flushing = false
-        queue.flushing_size-=size
+    override def swap_in() = {
+      if( swapping_out ) {
+        swapping_out = false
+        queue.swapping_out_size-=size
       }
     }
 
     override def remove = {
-      if( flushing ) {
-        flushing = false
-        queue.flushing_size-=size
+      if( swapping_out ) {
+        swapping_out = false
+        queue.swapping_out_size-=size
       }
       delivery.message.release
-      queue.capacity_used -= size
+      queue.swapped_in_size -= size
+      queue.swapped_in_items -= 1
       super.remove
     }
 
@@ -1072,11 +1092,11 @@ class QueueEntry(val queue:Queue, val se
         // the advancing subs move on to the next entry...
         advance(advancing)
 
-//        // flush this entry out if it's not going to be needed soon.
+//        // swap this entry out if it's not going to be needed soon.
 //        if( !hasSubs && prefetch_flags==0 ) {
-//          // then flush out to make space...
-//          var flush_asap = !acquired
-//          flush(flush_asap)
+//          // then swap out to make space...
+//          var asap = !acquired
+//          flush(asap)
 //        }
         queue.trigger_swap
         return true
@@ -1085,38 +1105,38 @@ class QueueEntry(val queue:Queue, val se
   }
 
   /**
-   * Loaded entries are moved into the Flushed state reduce memory usage.  Once a Loaded
+   * Loaded entries are moved into the Swapped state reduce memory usage.  Once a Loaded
    * entry is persisted, it can move into this state.  This state only holds onto the
    * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Flushed(override val message_key:Long, override val size:Int) extends EntryState {
+  class Swapped(override val message_key:Long, override val size:Int) extends EntryState {
 
-    queue.flushed_items += 1
+    queue.individual_swapped_items += 1
 
-    var loading = false
+    var swapping_in = false
 
 
     override def count = 1
 
-    override def as_flushed = this
+    override def as_swapped = this
 
-    override def is_flushed_or_flushing = true
+    override def is_swapped_or_swapping_out = true
 
     def label = {
-      var rc = "flushed"
-      if( loading ) {
-        rc += "|loading"
+      var rc = "swapped"
+      if( swapping_in ) {
+        rc += "|swapping in"
       }
       rc
     }
-    override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
+    override def toString = { "swapped:{ swapping_in: "+swapping_in+", size:"+size+"}" }
 
-    override def load() = {
-      if( !loading ) {
+    override def swap_in() = {
+      if( !swapping_in ) {
 //        trace("Start entry load of message seq: %s", seq)
-        // start loading it back...
-        loading = true
-        queue.loading_size += size
+        // start swapping in...
+        swapping_in = true
+        queue.swapping_in_size += size
         queue.host.store.load_message(message_key) { delivery =>
           // pass off to a source so it can aggregate multiple
           // loads to reduce cross thread synchronization
@@ -1136,19 +1156,24 @@ class QueueEntry(val queue:Queue, val se
       }
     }
 
-    def loaded(messageRecord:MessageRecord) = {
-      if( loading ) {
+    def swapped_in(messageRecord:MessageRecord) = {
+      if( swapping_in ) {
 //        debug("Loaded message seq: ", seq )
-        loading = false
-        queue.loading_size -= size
+        swapping_in = false
+        queue.swapping_in_size -= size
 
         val delivery = new Delivery()
         delivery.message = ProtocolFactory.get(messageRecord.protocol.toString).get.decode(messageRecord)
         delivery.size = messageRecord.size
         delivery.storeKey = messageRecord.key
 
-        queue.capacity_used += delivery.size
-        queue.flushed_items -= 1
+        queue.swapped_in_size += delivery.size
+        queue.swapped_in_items += 1
+
+        queue.swap_in_size_counter += size
+        queue.swap_in_item_counter += 1
+
+        queue.individual_swapped_items -= 1
         state = new Loaded(delivery, true)
       } else {
 //        debug("Ignoring store load of: ", messageKey)
@@ -1157,35 +1182,35 @@ class QueueEntry(val queue:Queue, val se
 
 
     override def remove = {
-      if( loading ) {
-        loading = false
-        queue.loading_size -= size
+      if( swapping_in ) {
+        swapping_in = false
+        queue.swapping_in_size -= size
       }
-      queue.flushed_items -= 1
+      queue.individual_swapped_items -= 1
       super.remove
     }
 
-    override def flush_range = {
-      if( loading ) {
-        loading = false
-        queue.loading_size -= size
+    override def swap_range = {
+      if( swapping_in ) {
+        swapping_in = false
+        queue.swapping_in_size -= size
       }
-      queue.flushed_items -= 1
-      state = new FlushedRange(seq, 1, size)
+      queue.individual_swapped_items -= 1
+      state = new SwappedRange(seq, 1, size)
     }
   }
 
   /**
-   * A FlushedRange stat is assigned entry is used to represent a rage of flushed entries.
+   * A SwappedRange state is assigned entry is used to represent a rage of swapped entries.
    *
-   * Even when entries that are Flushed can us a significant amount of memory if the queue is holding
-   * thousands of them.  Multiple entries in the Flushed state can be combined into a single entry in
-   * the FlushedRange state thereby conserving even more memory.  A FlushedRange entry only tracks
+   * Even entries that are Swapped can us a significant amount of memory if the queue is holding
+   * thousands of them.  Multiple entries in the swapped state can be combined into a single entry in
+   * the SwappedRange state thereby conserving even more memory.  A SwappedRange entry only tracks
    * the first, and last sequnce ids of the range.  When the entry needs to be loaded from the range
-   * it replaces the FlushedRange entry with all the Flushed entries by querying the store of all the
+   * it replaces the swapped range entry with all the swapped entries by querying the store of all the
    * message keys for the entries in the range.
    */
-  class FlushedRange(
+  class SwappedRange(
     /** the last seq id in the range */
     var last:Long,
     /** the number of items in the range */
@@ -1196,24 +1221,24 @@ class QueueEntry(val queue:Queue, val se
     override def count = _count
     override def size = _size
 
-    var loading = false
+    var swapping_in = false
 
-    override def as_flushed_range = this
+    override def as_swapped_range = this
 
-    override def is_flushed_or_flushing = true
+    override def is_swapped_or_swapping_out = true
 
     def label = {
-      var rc = "flushed_range"
-      if( loading ) {
-        rc = "flushed_range|loading"
+      var rc = "swapped_range"
+      if( swapping_in ) {
+        rc = "swapped_range|swapping in"
       }
       rc
     }
-    override def toString = { "flushed_range:{ loading: "+loading+", count: "+count+", size: "+size+"}" }
+    override def toString = { "swapped_range:{ swapping_in: "+swapping_in+", count: "+count+", size: "+size+"}" }
 
-    override def load() = {
-      if( !loading ) {
-        loading = true
+    override def swap_in() = {
+      if( !swapping_in ) {
+        swapping_in = true
         queue.host.store.list_queue_entries(queue.id, seq, last) { records =>
           if( !records.isEmpty ) {
             queue.dispatch_queue {
@@ -1265,17 +1290,17 @@ class QueueEntry(val queue:Queue, val se
     def combineNext():Unit = {
       val value = getNext
       assert(value!=null)
-      assert(value.is_flushed || value.is_flushed_range)
-      if( value.is_flushed ) {
+      assert(value.is_swapped || value.is_swapped_range)
+      if( value.is_swapped ) {
         assert(last < value.seq )
         last = value.seq
         _count += 1
         _size += value.size
         value.remove
-      } else if( value.is_flushed_range ) {
+      } else if( value.is_swapped_range ) {
         assert(last < value.seq )
-        last = value.as_flushed_range.last
-        _count += value.as_flushed_range.count
+        last = value.as_swapped_range.last
+        _count += value.as_swapped_range.count
         _size += value.size
         value.remove
       }
@@ -1486,6 +1511,7 @@ class Subscription(val queue:Queue, val 
 
       queue.dequeue_item_counter += 1
       queue.dequeue_size_counter += entry.size
+      queue.dequeue_ts = queue.last_maintenance_ts
 
       // removes this entry from the acquired list.
       unlink()
@@ -1513,6 +1539,7 @@ class Subscription(val queue:Queue, val 
       // track for stats
       queue.nack_item_counter += 1
       queue.nack_size_counter += entry.size
+      queue.nack_ts = queue.last_maintenance_ts
 
       // The following does not need to get done for exclusive subs because
       // they end up rewinding all the sub of the head of the queue.

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java (from r1054867, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java&r1=1054867&r2=1054868&rev=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java Tue Jan  4 02:24:53 2011
@@ -16,11 +16,9 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonTypeInfo;
 
 import javax.xml.bind.annotation.*;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * <p>
@@ -28,27 +26,11 @@ import java.util.List;
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="virtual_host_status")
+@XmlRootElement(name="aggregate_queue_metrics")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class VirtualHostStatusDTO extends ServiceStatusDTO {
+public class AggregateQueueMetricsDTO extends QueueMetricsDTO {
 
-    /**
-     * The status of the store
-     */
-    @XmlElementRef
-    public StoreStatusDTO store;
+    @XmlAttribute(name="queues")
+    public int queues;
 
-    /**
-     * Ids of all the destinations running on the broker
-     */
-    @XmlElement(name="destination")
-    public List<LongIdLabeledDTO> destinations = new ArrayList<LongIdLabeledDTO>();
-
-
-    /**
-     * The current running configuration of the object
-     */
-    @XmlElement
-    public VirtualHostDTO config = null;
-
-}
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java Tue Jan  4 02:24:53 2011
@@ -60,4 +60,6 @@ public class BrokerStatusDTO extends Ser
     @XmlElement
     public BrokerDTO config = null;
 
+    @XmlElement
+    public AggregateQueueMetricsDTO aggregate_queue_metrics = new AggregateQueueMetricsDTO();
 }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Tue Jan  4 02:24:53 2011
@@ -87,13 +87,13 @@ public class QueueDTO {
     public Boolean swap;
 
     /**
-     * The number max number of flushed queue entries to load
-     * for the store at a time.  Not that Flushed entires are just
+     * The max number of swapped queue entries to load
+     * from the store at a time.  Note that swapped entries are just
      * reference pointers to the actual messages.  When not loaded,
      * the batch is referenced as sequence range to conserve memory.
      */
-    @XmlAttribute(name="flush_range_size")
-    public Integer flush_range_size;
+    @XmlAttribute(name="swap_range_size")
+    public Integer swap_range_size;
 
     @XmlElement(name="acl")
     public QueueAclDTO acl;

Added: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java?rev=1054868&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java Tue Jan  4 02:24:53 2011
@@ -0,0 +1,77 @@
+package org.apache.activemq.apollo.dto;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="queue_metrics")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class QueueMetricsDTO {
+
+    @XmlAttribute(name="enqueue_item_counter")
+    public long enqueue_item_counter;
+
+    @XmlAttribute(name="enqueue_size_counter")
+    public long enqueue_size_counter;
+
+    @XmlAttribute(name="enqueue_ts")
+    public long enqueue_ts;
+
+    @XmlAttribute(name="dequeue_item_counter")
+    public long dequeue_item_counter;
+
+    @XmlAttribute(name="dequeue_size_counter")
+    public long dequeue_size_counter;
+
+    @XmlAttribute(name="dequeue_ts")
+    public long dequeue_ts;
+
+    @XmlAttribute(name="nack_item_counter")
+    public long nack_item_counter;
+
+    @XmlAttribute(name="nack_size_counter")
+    public long nack_size_counter;
+
+    @XmlAttribute(name="nack_ts")
+    public long nack_ts;
+
+    @XmlAttribute(name="queue_size")
+    public long queue_size;
+
+    @XmlAttribute(name="queue_items")
+    public long queue_items;
+
+    @XmlAttribute(name="swapped_in_size")
+    public int swapped_in_size;
+
+    @XmlAttribute(name="swapped_in_items")
+    public int swapped_in_items;
+
+    @XmlAttribute(name="swapping_in_size")
+    public int swapping_in_size;
+
+    @XmlAttribute(name="swapping_out_size")
+    public int swapping_out_size;
+
+    @XmlAttribute(name="swapped_in_size_max")
+    public int swapped_in_size_max;
+
+    @XmlAttribute(name="swap_out_item_counter")
+    public long swap_out_item_counter;
+
+    @XmlAttribute(name="swap_out_size_counter")
+    public long swap_out_size_counter;
+
+    @XmlAttribute(name="swap_in_item_counter")
+    public long swap_in_item_counter;
+
+    @XmlAttribute(name="swap_in_size_counter")
+    public long swap_in_size_counter;
+}

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Tue Jan  4 02:24:53 2011
@@ -38,44 +38,8 @@ public class QueueStatusDTO extends Long
     @XmlElement
     public BindingDTO binding;
 
-    @XmlAttribute(name="enqueue_item_counter")
-    public long enqueue_item_counter;
-
-    @XmlAttribute(name="dequeue_item_counter")
-    public long dequeue_item_counter;
-
-    @XmlAttribute(name="enqueue_size_counter")
-    public long enqueue_size_counter;
-
-    @XmlAttribute(name="dequeue_size_counter")
-    public long dequeue_size_counter;
-
-    @XmlAttribute(name="nack_item_counter")
-    public long nack_item_counter;
-
-    @XmlAttribute(name="nack_size_counter")
-    public long nack_size_counter;
-
-    @XmlAttribute(name="queue_size")
-    public long queue_size;
-
-    @XmlAttribute(name="queue_items")
-    public long queue_items;
-
-    @XmlAttribute(name="loading_size")
-    public int loading_size;
-
-    @XmlAttribute(name="flushing_size")
-    public int flushing_size;
-
-    @XmlAttribute(name="flushed_items")
-    public int flushed_items;
-
-    @XmlAttribute(name="capacity_used")
-    public int capacity_used;
-
-    @XmlAttribute
-    public int capacity;
+    @XmlElement
+    public QueueMetricsDTO metrics = new QueueMetricsDTO();
 
     /**
      * Status of the entries in the queue

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java Tue Jan  4 02:24:53 2011
@@ -51,4 +51,7 @@ public class VirtualHostStatusDTO extend
     @XmlElement
     public VirtualHostDTO config = null;
 
+    @XmlElement
+    public AggregateQueueMetricsDTO aggregate_queue_metrics = new AggregateQueueMetricsDTO();
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Tue Jan  4 02:24:53 2011
@@ -16,17 +16,19 @@
  */
 package org.apache.activemq.apollo.web.resources;
 
+import javax.ws.rs.Path
 import javax.ws.rs._
 import core.Response
 import Response.Status._
 import java.util.List
 import org.apache.activemq.apollo.dto._
 import java.{lang => jl}
-import collection.JavaConversions
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.broker._
 import collection.mutable.ListBuffer
 import scala.util.continuations._
+import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
+import scala.collection.{Iterable, JavaConversions}
 
 /**
  * <p>
@@ -51,17 +53,22 @@ case class RuntimeResource(parent:Broker
       System.exit(0)
     }
   }
+
+  def concurrent_map[T,R](values:Iterable[T])(dqf:(T)=>DispatchQueue)(func:T=>R) = {
+    Future.all( values.map { t=>
+      dqf(t).future { func(t) }
+    })
+  }
+
   private def with_broker[T](func: (org.apache.activemq.apollo.broker.Broker, Option[T]=>Unit)=>Unit):T = {
     BrokerRegistry.list.headOption match {
       case None=> result(NOT_FOUND)
       case Some(broker)=>
-
-        Future[Option[T]] { cb=>
-          broker.dispatch_queue {
-            func(broker, cb)
-          }
-        }.getOrElse(result(NOT_FOUND))
-
+        val f = Future[Option[T]]()
+        broker.dispatch_queue {
+          func(broker, f)
+        }
+        f().getOrElse(result(NOT_FOUND))
     }
   }
 
@@ -79,8 +86,8 @@ case class RuntimeResource(parent:Broker
 
 
   @GET
-  def get() = {
-    with_broker[BrokerStatusDTO] { case (broker, cb) =>
+  def get_broker():BrokerStatusDTO = {
+    with_broker { case (broker, cb) =>
       val result = new BrokerStatusDTO
 
       result.id = broker.id
@@ -105,7 +112,10 @@ case class RuntimeResource(parent:Broker
         }
       }
 
-      cb(Some(result))
+      get_queue_metrics(broker).onComplete{ metrics=>
+        result.aggregate_queue_metrics = metrics
+        cb(Some(result))
+      }
     }
   }
 
@@ -113,10 +123,68 @@ case class RuntimeResource(parent:Broker
   @GET @Path("virtual-hosts")
   def virtualHosts = {
     val rc = new LongIdListDTO
-    rc.items.addAll(get.virtual_hosts)
+    rc.items.addAll(get_broker.virtual_hosts)
     rc
   }
 
+  def aggregate_queue_metrics(queue_metrics:Iterable[QueueMetricsDTO]):AggregateQueueMetricsDTO = {
+    queue_metrics.foldLeft(new AggregateQueueMetricsDTO){ (rc, q)=> // rc accumulates: counters/sizes are summed, *_ts keep the largest value seen
+      rc.enqueue_item_counter += q.enqueue_item_counter
+      rc.enqueue_size_counter += q.enqueue_size_counter
+      rc.enqueue_ts = rc.enqueue_ts max q.enqueue_ts
+
+      rc.dequeue_item_counter += q.dequeue_item_counter
+      rc.dequeue_size_counter += q.dequeue_size_counter
+      rc.dequeue_ts = rc.dequeue_ts max q.dequeue_ts // assign the max (like enqueue_ts/nack_ts); timestamps must not be summed
+
+      rc.nack_item_counter += q.nack_item_counter
+      rc.nack_size_counter += q.nack_size_counter
+      rc.nack_ts = rc.nack_ts max q.nack_ts
+
+      rc.queue_size += q.queue_size
+      rc.queue_items += q.queue_items
+
+      rc.swap_out_item_counter += q.swap_out_item_counter
+      rc.swap_out_size_counter += q.swap_out_size_counter
+      rc.swap_in_item_counter += q.swap_in_item_counter
+      rc.swap_in_size_counter += q.swap_in_size_counter
+
+      rc.swapping_in_size += q.swapping_in_size
+      rc.swapping_out_size += q.swapping_out_size
+
+      rc.swapped_in_items += q.swapped_in_items
+      rc.swapped_in_size += q.swapped_in_size
+
+      rc.swapped_in_size_max += q.swapped_in_size_max
+
+      if( q.isInstanceOf[AggregateQueueMetricsDTO] ) { // input is itself an aggregate: add the queue count it carries
+        rc.queues += q.asInstanceOf[AggregateQueueMetricsDTO].queues
+      } else { // plain per-queue metrics count as one queue
+        rc.queues += 1
+      }
+      rc
+    }
+  }
+
+  def get_queue_metrics(broker:Broker):Future[AggregateQueueMetricsDTO] = {
+    val metrics = Future.all {
+      broker.virtual_hosts.values.map { host=>
+        host.dispatch_queue.flatFuture{ get_queue_metrics(host) }
+      }
+    }
+    metrics.map( x=> aggregate_queue_metrics(x) )
+  }
+
+  def get_queue_metrics(virtualHost:VirtualHost):Future[AggregateQueueMetricsDTO] = {
+    val metrics = Future.all{
+      virtualHost.router.queues.values.map { queue=>
+        queue.dispatch_queue.future { get_queue_metrics(queue) }
+      }
+    }
+    metrics.map( x=> aggregate_queue_metrics(x) )
+  }
+
+
   @GET @Path("virtual-hosts/{id}")
   def virtualHost(@PathParam("id") id : Long):VirtualHostStatusDTO = {
     with_virtual_host(id) { case (virtualHost,cb) =>
@@ -130,14 +198,20 @@ case class RuntimeResource(parent:Broker
         result.destinations.add(new LongIdLabeledDTO(node.id, node.name.toString))
       }
 
-      if( virtualHost.store != null ) {
-        virtualHost.store.get_store_status { x=>
-          result.store = x
+      get_queue_metrics(virtualHost).onComplete { metrics=>
+
+        result.aggregate_queue_metrics = metrics
+
+        if( virtualHost.store != null ) {
+          virtualHost.store.get_store_status { x=>
+            result.store = x
+            cb(Some(result))
+          }
+        } else {
           cb(Some(result))
         }
-      } else {
-        cb(Some(result))
       }
+
     }
   }
 
@@ -215,6 +289,40 @@ case class RuntimeResource(parent:Broker
     }
   }
 
+  def get_queue_metrics(q:Queue):QueueMetricsDTO = {
+    val rc = new QueueMetricsDTO
+
+    rc.enqueue_item_counter = q.enqueue_item_counter
+    rc.enqueue_size_counter = q.enqueue_size_counter
+    rc.enqueue_ts = q.enqueue_ts
+
+    rc.dequeue_item_counter = q.dequeue_item_counter
+    rc.dequeue_size_counter = q.dequeue_size_counter
+    rc.dequeue_ts = q.dequeue_ts
+
+    rc.nack_item_counter = q.nack_item_counter
+    rc.nack_size_counter = q.nack_size_counter
+    rc.nack_ts = q.nack_ts
+
+    rc.queue_size = q.queue_size
+    rc.queue_items = q.queue_items
+
+    rc.swap_out_item_counter = q.swap_out_item_counter
+    rc.swap_out_size_counter = q.swap_out_size_counter
+    rc.swap_in_item_counter = q.swap_in_item_counter
+    rc.swap_in_size_counter = q.swap_in_size_counter
+
+    rc.swapping_in_size = q.swapping_in_size
+    rc.swapping_out_size = q.swapping_out_size
+
+    rc.swapped_in_items = q.swapped_in_items
+    rc.swapped_in_size = q.swapped_in_size
+
+    rc.swapped_in_size_max = q.swapped_in_size_max
+
+    rc
+  }
+
   def status(qo:Option[Queue], entries:Boolean=false, cb:Option[QueueStatusDTO]=>Unit):Unit = if(qo==None) {
     cb(None)
   } else {
@@ -223,23 +331,8 @@ case class RuntimeResource(parent:Broker
       val rc = new QueueStatusDTO
       rc.id = q.id
       rc.binding = q.binding.binding_dto
-      rc.capacity_used = q.capacity_used
-      rc.capacity = q.capacity
       rc.config = q.config
-
-      rc.enqueue_item_counter = q.enqueue_item_counter
-      rc.dequeue_item_counter = q.dequeue_item_counter
-      rc.enqueue_size_counter = q.enqueue_size_counter
-      rc.dequeue_size_counter = q.dequeue_size_counter
-      rc.nack_item_counter = q.nack_item_counter
-      rc.nack_size_counter = q.nack_size_counter
-
-      rc.queue_size = q.queue_size
-      rc.queue_items = q.queue_items
-
-      rc.loading_size = q.loading_size
-      rc.flushing_size = q.flushing_size
-      rc.flushed_items = q.flushed_items
+      rc.metrics = get_queue_metrics(q)
 
       if( entries ) {
         var cur = q.head_entry
@@ -296,7 +389,7 @@ case class RuntimeResource(parent:Broker
   @GET @Path("connectors")
   def connectors = {
     val rc = new LongIdListDTO
-    rc.items.addAll(get.connectors)
+    rc.items.addAll(get_broker.connectors)
     rc
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1054868&r1=1054867&r2=1054868&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Tue Jan  4 02:24:53 2011
@@ -16,6 +16,11 @@
 - import it._
 - val helper = new org.apache.activemq.apollo.web.resources.ViewHelper
 - import helper._
+- def percent(n:Long, d:Long) =
+  - if( d==0 )
+    - "0.00 %"
+  - else
+    - "%,.2f %%".format(n.toFloat*100.0/d)
 
 .breadcumbs
   a(href={strip_resolve("..")}) Back
@@ -38,26 +43,30 @@
 
 h2 Current Size
 
-p queue size: #{queue_items} messages
-p queue size: #{memory(queue_size)}
-- if( capacity > 0 )
-  p memory used: #{ "%,.2f".format(capacity_used.toFloat*100.0/capacity) }% (#{memory(capacity_used)}/#{memory(capacity)})
-- else
-  p memory used: #{ "%,.2f".format(0f) }% (#{memory(capacity_used)}/#{memory(capacity)})
+p queue size: #{metrics.queue_items} messages #{memory(metrics.queue_size)}
+p memory used: #{percent(metrics.swapped_in_size, metrics.swapped_in_size_max)} (#{memory(metrics.swapped_in_size)}/#{memory(metrics.swapped_in_size_max)})
+
+h3 Enqueue/Dequeue Counters
+
+p enqueued: #{metrics.enqueue_item_counter} messages (#{memory(metrics.enqueue_size_counter)}), #{uptime(metrics.enqueue_ts)} ago
 
-h2 Enqueue/Deqeueue Counters
+p dequeued: #{metrics.dequeue_item_counter} messages (#{memory(metrics.dequeue_size_counter)}), #{uptime(metrics.dequeue_ts)} ago
 
-p enqueued: #{enqueue_item_counter} messages (#{memory(enqueue_size_counter)})
+p nacked: #{metrics.nack_item_counter} messages (#{memory(metrics.nack_size_counter)}), #{uptime(metrics.nack_ts)} ago
 
-p dequeued: #{dequeue_item_counter} messages (#{memory(dequeue_size_counter)})
+h2 Swap Metrics
 
-p nacked: #{nack_item_counter} messages (#{memory(nack_size_counter)})
+p swapped in: #{metrics.swapped_in_items} messages #{memory(metrics.swapped_in_size)}
+- val swapped_out_items = metrics.queue_items - metrics.swapped_in_items
+- val swapped_out_size = metrics.queue_size - metrics.swapped_in_size
+p swapped out: #{swapped_out_items} messages #{memory(swapped_out_size)}
+p percent swapped out: #{percent(swapped_out_items, metrics.queue_items)} of the messages
 
-h2 Swap Status
+p swapping out: #{memory(metrics.swapping_out_size)}
+p swapping in: #{memory(metrics.swapping_in_size)}
 
-p loading from the store: #{memory(loading_size)}
-p flushing out of memory: #{memory(flushing_size)}
-p holding : #{flushed_items} flushed message references
+p total swap outs : #{metrics.swap_out_item_counter} messages (#{memory(metrics.swap_out_size_counter)})
+p total swap ins : #{metrics.swap_in_item_counter} messages (#{memory(metrics.swap_in_size_counter)})
 
 h3 Producers
 ul