You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/26 04:49:46 UTC

svn commit: r1161969 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/ apollo-ope...

Author: chirino
Date: Fri Aug 26 02:49:45 2011
New Revision: 1161969

URL: http://svn.apache.org/viewvc?rev=1161969&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-78, https://issues.apache.org/jira/browse/APLO-80, and https://issues.apache.org/jira/browse/APLO-81 : Improvements and new features for the REST API

Topics now provide messaging metrics similar to what was provided by queues.  Aggregate statistics are now also accessible via the following routes:
/queue-metrics
/topic-metrics
/dsub-metrics
/dest-metrics

dest-metrics aggregates the queue, topic, and dsub metrics.  queue-metrics no longer includes metrics from dsubs since they can be accessed via /dsub-metrics.  The JSON structure of the queue and topic JSON objects has changed slightly to make them more consistent with each other and to also provide messaging metrics per producer/consumer.

Added:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateTopicMetricsDTO.java
      - copied, changed from r1161956, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationMetricsDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
      - copied, changed from r1161956, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicMetricsDTO.java
      - copied, changed from r1161956, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java
Removed:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Aug 26 02:49:45 2011
@@ -34,6 +34,8 @@ import management.ManagementFactory
 import org.apache.activemq.apollo.dto._
 import javax.management.ObjectName
 import org.fusesource.hawtdispatch.TaskTracker._
+import java.util.concurrent.TimeUnit
+import collection.mutable.ListBuffer._
 
 /**
  * <p>
@@ -250,6 +252,9 @@ class Broker() extends BaseService {
 
   var web_server:WebServer = _
 
+  @volatile
+  var now = System.currentTimeMillis()
+
   var config_log:Log = Log(new MemoryLogger(Broker.log))
   var audit_log:Log = Broker
   var security_log:Log  = Broker
@@ -292,7 +297,9 @@ class Broker() extends BaseService {
     log_versions
     check_file_limit
     init_dispatch_queue(dispatch_queue)
+
     BrokerRegistry.add(this)
+    schedule_periodic_maintenance
 
     val tracker = new LoggingTracker("broker startup", console_log, SERVICE_TIMEOUT)
     apply_update(tracker)
@@ -333,6 +340,14 @@ class Broker() extends BaseService {
 
     BrokerRegistry.remove(this)
     tracker.callback(on_completed)
+
+  }
+
+  def schedule_periodic_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
+    if( service_state.is_started ) {
+      now = System.currentTimeMillis
+      schedule_periodic_maintenance
+    }
   }
 
   protected def init_logs = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Fri Aug 26 02:49:45 2011
@@ -76,6 +76,14 @@ trait DeliveryConsumer extends Retained 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait DeliverySession extends Sink[Delivery] {
+  /**
+   * The number of deliveries accepted by this session.
+   */
+  def enqueue_item_counter:Long
+  /**
+   * The total size of the deliveries accepted by this session.
+   */
+  def enqueue_size_counter:Long
   def producer:DeliveryProducer
   def consumer:DeliveryConsumer
   def remaining_capacity:Int

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Fri Aug 26 02:49:45 2011
@@ -957,8 +957,8 @@ class LocalRouter(val virtual_host:Virtu
     // For the topics, just collocate the producers onto the first consumer's thread.
     topic_domain.destinations.foreach { node =>
 
-      node.consumers.headOption.foreach{ consumer =>
-        node.producers.foreach { r=>
+      node.consumers.keys.headOption.foreach{ consumer =>
+        node.producers.keys.foreach { r=>
           r.collocate(consumer.dispatch_queue)
         }
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Aug 26 02:49:45 2011
@@ -27,10 +27,10 @@ import org.apache.activemq.apollo.util.l
 import org.fusesource.hawtdispatch.{ListEventAggregator, DispatchQueue, BaseRetained}
 import OptionSupport._
 import security.SecurityContext
-import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO}
 import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicInteger}
 import org.fusesource.hawtbuf.Buffer
 import java.lang.UnsupportedOperationException
+import org.apache.activemq.apollo.dto._
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -206,6 +206,129 @@ class Queue(val router: LocalRouter, val
   var auto_delete_after = 0
   var idled_at = 0L
 
+  def get_queue_metrics:QueueMetricsDTO = {
+    val rc = new QueueMetricsDTO
+
+    rc.enqueue_item_counter = this.enqueue_item_counter
+    rc.enqueue_size_counter = this.enqueue_size_counter
+    rc.enqueue_ts = this.enqueue_ts
+
+    rc.dequeue_item_counter = this.dequeue_item_counter
+    rc.dequeue_size_counter = this.dequeue_size_counter
+    rc.dequeue_ts = this.dequeue_ts
+
+    rc.nack_item_counter = this.nack_item_counter
+    rc.nack_size_counter = this.nack_size_counter
+    rc.nack_ts = this.nack_ts
+
+    rc.expired_item_counter = this.expired_item_counter
+    rc.expired_size_counter = this.expired_size_counter
+    rc.expired_ts = this.expired_ts
+
+    rc.queue_size = this.queue_size
+    rc.queue_items = this.queue_items
+
+    rc.swap_out_item_counter = this.swap_out_item_counter
+    rc.swap_out_size_counter = this.swap_out_size_counter
+    rc.swap_in_item_counter = this.swap_in_item_counter
+    rc.swap_in_size_counter = this.swap_in_size_counter
+
+    rc.swapping_in_size = this.swapping_in_size
+    rc.swapping_out_size = this.swapping_out_size
+
+    rc.swapped_in_items = this.swapped_in_items
+    rc.swapped_in_size = this.swapped_in_size
+
+    rc.swapped_in_size_max = this.swapped_in_size_max
+
+    rc.producer_counter = this.producer_counter
+    rc.consumer_counter = this.consumer_counter
+
+    rc.producer_count = this.producers.size
+    rc.consumer_count = this.all_subscriptions.size
+    rc
+  }
+
+  def status(entries:Boolean=false) = {
+    val rc = new QueueStatusDTO
+    rc.id = this.id
+    rc.state = this.service_state.toString
+    rc.state_since = this.service_state.since
+    rc.binding = this.binding.binding_dto
+    rc.config = this.config
+    rc.metrics = this.get_queue_metrics
+    rc.metrics.current_time = System.currentTimeMillis()
+
+    if( entries ) {
+      var cur = this.head_entry
+      while( cur!=null ) {
+
+        val e = new EntryStatusDTO
+        e.seq = cur.seq
+        e.count = cur.count
+        e.size = cur.size
+        e.consumer_count = cur.parked.size
+        e.is_prefetched = cur.is_prefetched
+        e.state = cur.label
+
+        rc.entries.add(e)
+
+        cur = if( cur == this.tail_entry ) {
+          null
+        } else {
+          cur.nextOrTail
+        }
+      }
+    }
+
+    this.inbound_sessions.foreach { session:DeliverySession =>
+      val link = new LinkDTO()
+      session.producer.connection match {
+        case Some(connection) =>
+          link.kind = "connection"
+          link.id = connection.id.toString
+          link.label = connection.transport.getRemoteAddress.toString
+        case _ =>
+          link.kind = "unknown"
+          link.label = "unknown"
+      }
+      link.enqueue_item_counter = session.enqueue_item_counter
+      link.enqueue_size_counter = session.enqueue_size_counter
+      rc.producers.add(link)
+    }
+
+    this.all_subscriptions.valuesIterator.toSeq.foreach{ sub =>
+      val link = new QueueConsumerLinkDTO
+      sub.consumer.connection match {
+        case Some(connection) =>
+          link.kind = "connection"
+          link.id = connection.id.toString
+          link.label = connection.transport.getRemoteAddress.toString
+        case _ =>
+          link.kind = "unknown"
+          link.label = "unknown"
+      }
+      link.position = sub.pos.seq
+      link.enqueue_item_counter = sub.total_dispatched_count
+      link.enqueue_size_counter = sub.total_dispatched_size
+      link.total_ack_count = sub.total_ack_count
+      link.total_nack_count = sub.total_nack_count
+      link.acquired_size = sub.acquired_size
+      link.acquired_count = sub.acquired_count
+      link.waiting_on = if( sub.full ) {
+        "ack"
+      } else if( sub.pos.is_tail ) {
+        "producer"
+      } else if( !sub.pos.is_loaded ) {
+        "load"
+      } else {
+        "dispatch"
+      }
+      rc.consumers.add(link)
+    }
+    rc
+  }
+
   def update(on_completed:Runnable) = dispatch_queue {
 
     val prev_persistent = tune_persistent
@@ -630,6 +753,8 @@ class Queue(val router: LocalRouter, val
     }
 
     def remaining_capacity = session.remaining_capacity
+    def enqueue_item_counter = session.accepted_count
+    def enqueue_size_counter = session.accepted_size
 
     def close = {
       session_manager.close(session)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Fri Aug 26 02:49:45 2011
@@ -91,13 +91,15 @@ abstract class DeliveryProducerRoute(rou
     dispatch_queue {
       consumers.foreach{ x=>
         debug("producer route attaching to conusmer.")
-        val target = x.connect(this);
+        val target = connect(x);
         target.refiller = drainer
         targets ::= target
       }
     }
   }
 
+  def connect(x:DeliveryConsumer) = x.connect(this)
+
   def unbind(targets:List[DeliveryConsumer]) = dispatch_queue {
     this.targets = this.targets.filterNot { x=>
       val rc = targets.contains(x.consumer)
@@ -181,6 +183,7 @@ abstract class DeliveryProducerRoute(rou
     }
   }
 
+
   private def delivered(delivery: Delivery): Unit = {
     if (pendingAck != null) {
       if (delivery.uow != null) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Fri Aug 26 02:49:45 2011
@@ -21,6 +21,7 @@ import org.fusesource.hawtdispatch._
 import java.util.LinkedList
 import org.apache.activemq.apollo.transport.Transport
 import collection.mutable.HashSet
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * <p>
@@ -229,6 +230,8 @@ class CreditWindowFilter[T](val downstre
 }
 
 trait SessionSink[T] extends Sink[T] {
+  def accepted_count:Long
+  def accepted_size:Long
   def remaining_capacity:Int
 }
 
@@ -322,6 +325,11 @@ class Session[T](val producer_queue:Disp
   private def sizer = mux.sizer
   private def downstream = mux.source
 
+  @volatile
+  var accepted_count = 0L
+  @volatile
+  var accepted_size = 0L
+
   // create a source to coalesce credit events back to the producer side...
   val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
   credit_adder.onEvent{
@@ -361,7 +369,12 @@ class Session[T](val producer_queue:Disp
     if( _full || closed ) {
       false
     } else {
-      add_credits(-sizer.size(value))
+      val size = sizer.size(value)
+
+      accepted_count += 1
+      accepted_size += size
+
+      add_credits(-size)
       downstream.merge((this, value))
       true
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Fri Aug 26 02:49:45 2011
@@ -20,9 +20,10 @@ import org.apache.activemq.apollo.util._
 import path.Path
 import scala.collection.immutable.List
 import org.apache.activemq.apollo.dto._
-import collection.mutable.{HashMap, ListBuffer}
 import java.util.concurrent.TimeUnit
 import org.fusesource.hawtdispatch._
+import collection.mutable.{HashSet, HashMap, ListBuffer}
+import java.lang.Long
 
 /**
  * <p>
@@ -33,8 +34,82 @@ import org.fusesource.hawtdispatch._
  */
 class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String, path:Path) extends DomainDestination {
 
-  var producers = ListBuffer[BindableDeliveryProducer]()
-  var consumers = ListBuffer[DeliveryConsumer]()
+  var enqueue_item_counter = 0L
+  var enqueue_size_counter = 0L
+  var enqueue_ts = System.currentTimeMillis()
+
+  var dequeue_item_counter = 0L
+  var dequeue_size_counter = 0L
+  var dequeue_ts = System.currentTimeMillis()
+
+  var proxy_sessions = new HashSet[ProxyDeliverySession]()
+
+  implicit def from_link(from:LinkDTO):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter, from.enqueue_ts)
+  implicit def from_session(from:ProxyDeliverySession):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter, from.enqueue_ts)
+
+  def add_counters(to:LinkDTO, from:(Long,Long,Long)):Unit = {
+    to.enqueue_item_counter += from._1
+    to.enqueue_size_counter += from._2
+    to.enqueue_ts = to.enqueue_ts max from._2
+  }
+  def add_counters(to:Topic, from:(Long,Long,Long)):Unit = {
+    to.enqueue_item_counter += from._1
+    to.enqueue_size_counter += from._2
+    to.enqueue_ts = to.enqueue_ts max from._2
+  }
+
+  case class ProxyDeliverySession(session:DeliverySession) extends DeliverySession with SinkFilter[Delivery] {
+    dispatch_queue {
+      proxy_sessions.add(this)
+    }
+
+    def close = {
+      session.close
+      dispatch_queue {
+        proxy_sessions.remove(this)
+        consumers.get(session.consumer) match {
+          case Some(proxy) => add_counters(proxy.link, this)
+          case _          => add_counters(Topic.this, this)
+        }
+        producers.get(session.producer.asInstanceOf[BindableDeliveryProducer]) match {
+          case Some(link) => add_counters(link, this)
+          case _          => add_counters(Topic.this, this)
+        }
+      }
+    }
+
+    var enqueue_ts = now
+    def offer(value: Delivery) = {
+      if( session.offer(value) ) {
+        enqueue_ts = now
+        true
+      } else {
+        false
+      }
+    }
+
+    def downstream = session
+    def remaining_capacity = session.remaining_capacity
+    def producer = session.producer
+    def enqueue_size_counter = session.enqueue_size_counter
+    def enqueue_item_counter = session.enqueue_item_counter
+    def consumer = session.consumer
+  }
+
+  case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO) extends DeliveryConsumer {
+    def retained() = consumer.retained()
+    def retain() = consumer.retain()
+    def release() = consumer.release()
+    def matches(message: Delivery) = consumer.matches(message)
+    def is_persistent = consumer.is_persistent
+    def dispatch_queue = consumer.dispatch_queue
+    def connect(producer: DeliveryProducer) = {
+      new ProxyDeliverySession(consumer.connect(producer))
+    }
+  }
+
+  val producers = HashMap[BindableDeliveryProducer, LinkDTO]()
+  val consumers = HashMap[DeliveryConsumer, ProxyDeliveryConsumer]()
   var durable_subscriptions = ListBuffer[Queue]()
   var consumer_queues = HashMap[DeliveryConsumer, Queue]()
   var idled_at = 0L
@@ -52,9 +127,87 @@ class Topic(val router:LocalRouter, val 
   override def toString = destination_dto.toString
 
   def virtual_host: VirtualHost = router.virtual_host
+  def now = virtual_host.broker.now
+  def dispatch_queue = virtual_host.dispatch_queue
 
   def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
 
+  def status:TopicStatusDTO = {
+    dispatch_queue.assertExecuting()
+
+    val rc = new TopicStatusDTO
+    rc.id = this.id
+    rc.state = "STARTED"
+    rc.state_since = this.created_at
+    rc.config = this.config
+
+    rc.metrics.producer_counter = producer_counter
+    rc.metrics.consumer_counter = consumer_counter
+    rc.metrics.producer_count = producers.size
+    rc.metrics.consumer_counter = consumers.size
+
+    this.durable_subscriptions.foreach { q =>
+      rc.dsubs.add(q.id)
+    }
+
+    def copy(o:LinkDTO) = {
+      val rc = new LinkDTO()
+      rc.id = o.id
+      rc.kind = o.kind
+      rc.label = o.label
+      rc.enqueue_ts = o.enqueue_ts
+      add_counters(rc, o);
+      rc
+    }
+
+    // build the list of producer and consumer links..
+    val producer_links = HashMap[BindableDeliveryProducer, LinkDTO]()
+    val consumers_links = HashMap[DeliveryConsumer, LinkDTO]()
+    this.producers.foreach { case (producer, link) =>
+      val o = copy(link)
+      producer_links.put(producer, o)
+      rc.producers.add(o)
+    }
+    this.consumers.foreach { case (consumer, proxy) =>
+      val o = copy(proxy.link)
+      consumers_links.put(consumer, o)
+      rc.consumers.add(o)
+    }
+
+    // Add in the counters from the live sessions..
+    proxy_sessions.foreach{ session =>
+      val stats = from_session(session)
+      for( link <- producer_links.get(session.producer.asInstanceOf[BindableDeliveryProducer]) ) {
+        add_counters(link, stats)
+      }
+      for( link <- consumers_links.get(session.consumer) ) {
+        add_counters(link, stats)
+      }
+    }
+
+    // Now update the topic counters..
+    rc.metrics.enqueue_item_counter = enqueue_item_counter
+    rc.metrics.enqueue_size_counter = enqueue_size_counter
+    rc.metrics.enqueue_ts = enqueue_ts
+
+    rc.metrics.dequeue_item_counter = dequeue_item_counter
+    rc.metrics.dequeue_size_counter = dequeue_size_counter
+    rc.metrics.dequeue_ts = dequeue_ts
+    consumers_links.values.foreach { link =>
+      rc.metrics.enqueue_item_counter += link.enqueue_item_counter
+      rc.metrics.enqueue_size_counter += link.enqueue_size_counter
+      rc.metrics.enqueue_ts = rc.metrics.enqueue_ts max link.enqueue_ts
+    }
+    producer_links.values.foreach { link =>
+      rc.metrics.dequeue_item_counter += link.enqueue_item_counter
+      rc.metrics.dequeue_size_counter += link.enqueue_size_counter
+      rc.metrics.dequeue_ts = rc.metrics.dequeue_ts max link.enqueue_ts
+    }
+
+    rc
+  }
+
+
   def update(on_completed:Runnable) = {
     refresh_config
     on_completed.run
@@ -80,7 +233,7 @@ class Topic(val router:LocalRouter, val 
         val now = System.currentTimeMillis()
         idled_at = now
         if( auto_delete_after!=0 ) {
-          virtual_host.dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
+          dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
             if( now == idled_at ) {
               router.topic_domain.remove_destination(path, this)
             }
@@ -93,15 +246,11 @@ class Topic(val router:LocalRouter, val 
   }
 
   def bind (destination: DestinationDTO, consumer:DeliveryConsumer) = {
-    destination match {
-      case null=> // unified queue case
-
-        consumers += consumer
-        val list = List(consumer)
-        producers.foreach({ r=>
-          r.bind(list)
-        })
 
+    val target = destination match {
+      case null=>
+        // this is the unified queue case..
+        consumer
       case destination:TopicDestinationDTO=>
         var target = consumer
         slow_consumer_policy match {
@@ -111,64 +260,70 @@ class Topic(val router:LocalRouter, val 
             val queue = router._create_queue(new TempQueueBinding(consumer))
             queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
             queue.bind(List(consumer))
-
             consumer_queues += consumer->queue
-            target = queue
+            queue
 
           case "block" =>
             // just have dispatcher dispatch directly to them..
+            consumer
         }
+    }
 
-        consumers += target
-        consumer_counter += 1
-        val list = target :: Nil
-        producers.foreach({ r=>
-          r.bind(list)
-        })
-
+    val link = new LinkDTO()
+    target match {
+      case queue:Queue =>
+        link.kind = "queue"
+        link.id = queue.id
+        link.label = queue.id
+      case _ =>
+        target.connection match {
+          case Some(connection) =>
+            link.kind = "connection"
+            link.id = connection.id.toString
+            link.label = connection.transport.getRemoteAddress.toString
+          case _ =>
+            link.kind = "unknown"
+            link.label = "unknown"
+        }
     }
+
+    val proxy = ProxyDeliveryConsumer(target, link)
+    consumers.put(target, proxy)
+    consumer_counter += 1
+    val list = proxy :: Nil
+    producers.keys.foreach({ r=>
+      r.bind(list)
+    })
     check_idle
   }
 
   def unbind (consumer:DeliveryConsumer, persistent:Boolean) = {
 
-    consumer_queues.remove(consumer) match {
-      case Some(queue)=>
-
-        queue.unbind(List(consumer))
-
-        queue.binding match {
-          case x:TempQueueBinding =>
-
-            val list = List(queue)
-            producers.foreach({ r=>
-              r.unbind(list)
-            })
-            router._destroy_queue(queue.id, null)
-
-        }
-
-      case None=>
-
-        // producers are directly delivering to the consumer..
-        val original = consumers.size
-        consumers -= consumer
-        if( original!= consumers.size ) {
-          val list = List(consumer)
-          producers.foreach({ r=>
-            r.unbind(list)
-          })
-        }
+    for(proxy <- consumers.remove(consumer)) {
+      add_counters(Topic.this, proxy.link)
+      val list = consumer_queues.remove(consumer) match {
+        case Some(queue) =>
+          queue.unbind(List(consumer))
+          queue.binding match {
+            case x:TempQueueBinding =>
+              router._destroy_queue(queue.id, null)
+          }
+          List(queue)
+        case None =>
+          List(consumer)
+      }
+      producers.keys.foreach({ r=>
+        r.unbind(list)
+      })
     }
     check_idle
-
   }
 
   def bind_durable_subscription(destination: DurableSubscriptionDestinationDTO, queue:Queue)  = {
     if( !durable_subscriptions.contains(queue) ) {
       durable_subscriptions += queue
       val list = List(queue)
-      producers.foreach({ r=>
+      producers.keys.foreach({ r=>
         r.bind(list)
       })
       consumer_queues.foreach{case (consumer, q)=>
@@ -184,7 +339,7 @@ class Topic(val router:LocalRouter, val 
     if( durable_subscriptions.contains(queue) ) {
       durable_subscriptions -= queue
       val list = List(queue)
-      producers.foreach({ r=>
+      producers.keys.foreach({ r=>
         r.unbind(list)
       })
       consumer_queues.foreach{case (consumer, q)=>
@@ -197,21 +352,32 @@ class Topic(val router:LocalRouter, val 
   }
 
   def connect (destination:DestinationDTO, producer:BindableDeliveryProducer) = {
-    producers += producer
+    val link = new LinkDTO()
+    producer.connection match {
+      case Some(connection) =>
+        link.kind = "connection"
+        link.id = connection.id.toString
+        link.label = connection.transport.getRemoteAddress.toString
+      case _ =>
+        link.kind = "unknown"
+        link.label = "unknown"
+    }
+    producers.put(producer, link)
     producer_counter += 1
-    producer.bind(consumers.toList ::: durable_subscriptions.toList)
+    producer.bind(consumers.values.toList ::: durable_subscriptions.toList)
     check_idle
   }
 
   def disconnect (producer:BindableDeliveryProducer) = {
-    producers = producers.filterNot( _ == producer )
-    producer.unbind(consumers.toList ::: durable_subscriptions.toList)
+    for(link <- producers.remove(producer) ) {
+      add_counters(this, link)
+    }
     check_idle
   }
 
   def disconnect_producers:Unit ={
-    for( producer <- producers ) {
-      producer.unbind(consumers.toList ::: durable_subscriptions.toList)
+    for( (_, link) <- producers ) {
+      add_counters(this, link)
     }
     producers.clear
     check_idle

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java Fri Aug 26 02:49:45 2011
@@ -31,9 +31,9 @@ import javax.xml.bind.annotation.*;
 public class AggregateQueueMetricsDTO extends QueueMetricsDTO {
 
     /**
-     * The number of queues which where aggregated.
+     * The number of objects which where aggregated.
      */
-    @XmlAttribute(name="queues")
-    public int queues;
+    @XmlAttribute(name="objects")
+    public int objects;
 
 }
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateTopicMetricsDTO.java (from r1161956, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateTopicMetricsDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateTopicMetricsDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java&r1=1161956&r2=1161969&rev=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateTopicMetricsDTO.java Fri Aug 26 02:49:45 2011
@@ -16,9 +16,10 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
-import javax.xml.bind.annotation.*;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
 
 /**
  * <p>
@@ -26,14 +27,14 @@ import javax.xml.bind.annotation.*;
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="aggregate_queue_metrics")
+@XmlRootElement(name="aggregate_topic_metrics")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class AggregateQueueMetricsDTO extends QueueMetricsDTO {
+public class AggregateTopicMetricsDTO extends TopicMetricsDTO {
 
     /**
-     * The number of queues which where aggregated.
+     * The number of topics which were aggregated.
      */
-    @XmlAttribute(name="queues")
-    public int queues;
+    @XmlAttribute(name="objects")
+    public int objects;
 
 }
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationMetricsDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationMetricsDTO.java?rev=1161969&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationMetricsDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationMetricsDTO.java Fri Aug 26 02:49:45 2011
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.dto;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+
+/**
+ * <p>
+ *     Collects metrics about the status of a destination since the
+ *     time a broker gets started.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+public class DestinationMetricsDTO {
+
+    /**
+     * The current time on the broker machine.  In milliseconds since the epoch.
+     */
+	@XmlAttribute(name="current_time")
+	public long current_time;
+
+    /**
+     * The number of messages that have been sent to the destination.
+     */
+    @XmlAttribute(name="enqueue_item_counter")
+    public long enqueue_item_counter;
+
+    /**
+     * The total size in bytes of messages that have been sent
+     * to the destination
+     */
+    @XmlAttribute(name="enqueue_size_counter")
+    public long enqueue_size_counter;
+
+    /**
+     * The time stamp of when the last message was sent to the destination.
+     */
+    @XmlAttribute(name="enqueue_ts")
+    public long enqueue_ts;
+
+    /**
+     * The number of messages that have been sent to consumers on
+     * the destination.
+     */
+    @XmlAttribute(name="dequeue_item_counter")
+    public long dequeue_item_counter;
+
+    /**
+     * The total size in bytes of messages that have been sent to consumers on
+     * the destination.
+     */
+    @XmlAttribute(name="dequeue_size_counter")
+    public long dequeue_size_counter;
+
+    /**
+     * The time stamp of when the last dequeue to a consumer occurred.
+     */
+    @XmlAttribute(name="dequeue_ts")
+    public long dequeue_ts;
+
+    /**
+     * The total number of producers that have ever sent to
+     * the destination.
+     */
+    @XmlAttribute(name="producer_counter")
+    public long producer_counter;
+
+    /**
+     * The total number of consumers that have ever subscribed to
+     * the destination.
+     */
+    @XmlAttribute(name="consumer_counter")
+    public long consumer_counter;
+
+
+    /**
+     * The current number of producers sending to
+     * the destination.
+     */
+    @XmlAttribute(name="producer_count")
+    public long producer_count;
+
+    /**
+     * The current number of consumers consuming from the destination.
+     */
+    @XmlAttribute(name="consumer_count")
+    public long consumer_count;
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java Fri Aug 26 02:49:45 2011
@@ -36,5 +36,23 @@ public class LinkDTO {
     @XmlAttribute
     public String label;
 
+    /**
+     * The number of messages that have been dispatched over the link
+     */
+    @XmlAttribute(name="enqueue_item_counter")
+    public long enqueue_item_counter;
+
+    /**
+     * The total size in bytes of messages that have been dispatched
+     * over the link
+     */
+    @XmlAttribute(name="enqueue_size_counter")
+    public long enqueue_size_counter;
+
+    /**
+     * Timestamp of when a message last went over the link.
+     */
+    @XmlAttribute(name="enqueue_tsr")
+    public long enqueue_ts;
 
 }
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java (from r1161956, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java&r1=1161956&r2=1161969&rev=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java Fri Aug 26 02:49:45 2011
@@ -21,23 +21,15 @@ import javax.xml.bind.annotation.*;
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="queue_consumer_status")
+@XmlRootElement(name="queue_consumer_link")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class QueueConsumerStatusDTO {
-
-    /**
-     * link to who is consuming from the queue.
-     */
-    public LinkDTO link;
+public class QueueConsumerLinkDTO extends LinkDTO {
 
     public long position = 0;
 
     public int acquired_count;
     public long acquired_size;
 
-    public long total_dispatched_count;
-    public long total_dispatched_size;
-
     public long total_ack_count;
     public long total_nack_count;
 

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java Fri Aug 26 02:49:45 2011
@@ -39,56 +39,7 @@ import javax.xml.bind.annotation.XmlRoot
  */
 @XmlRootElement(name="queue_metrics")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class QueueMetricsDTO {
-
-    /**
-     * The current time on the broker machine.  In milliseconds since the epoch.
-     */
-	@XmlAttribute(name="current_time")
-	public long current_time;
-
-    /**
-     * The number of messages that have been added to the queue.
-     * This includes any messages which were recovered from
-     * persistent storage when the broker was first started.
-     */
-    @XmlAttribute(name="enqueue_item_counter")
-    public long enqueue_item_counter;
-
-    /**
-     * The total size in bytes of messages that have been added
-     * to the queue. This includes any messages
-     * which were recovered from persistent storage when the broker
-     * was first started.
-     */
-    @XmlAttribute(name="enqueue_size_counter")
-    public long enqueue_size_counter;
-
-    /**
-     * The time stamp of when the last enqueue occurred.
-     */
-    @XmlAttribute(name="enqueue_ts")
-    public long enqueue_ts;
-
-    /**
-     * The number of messages that have been removed from the queue.
-     */
-    @XmlAttribute(name="dequeue_item_counter")
-    public long dequeue_item_counter;
-
-    /**
-     * The total size in bytes of messages that have been
-     * removed from the queue.
-     */
-    @XmlAttribute(name="dequeue_size_counter")
-    public long dequeue_size_counter;
-
-    /**
-     * The time stamp of when the last dequeue occurred.
-     */
-    @XmlAttribute(name="dequeue_ts")
-    public long dequeue_ts;
-
+public class QueueMetricsDTO extends DestinationMetricsDTO {
     /**
      * The number of messages which expired before they could be processed.
      */
@@ -204,33 +155,4 @@ public class QueueMetricsDTO {
     @XmlAttribute(name="swap_in_size_counter")
     public long swap_in_size_counter;
 
-    /**
-     * The total number of producers that have sent to
-     * the queue.
-     */
-    @XmlAttribute(name="producer_counter")
-    public long producer_counter;
-
-    /**
-     * The total number of consumers that have ever subscribed to
-     * the queue.
-     */
-    @XmlAttribute(name="consumer_counter")
-    public long consumer_counter;
-
-
-    /**
-     * The current number of producers attached to
-     * the queue.
-     */
-    @XmlAttribute(name="producer_count")
-    public long producer_count;
-
-    /**
-     * The current number of consumers attached to
-     * the queue.
-     */
-    @XmlAttribute(name="consumer_count")
-    public long consumer_count;
-
 }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Fri Aug 26 02:49:45 2011
@@ -45,7 +45,6 @@ public class QueueStatusDTO extends Serv
     @XmlElement(name="entry")
     public List<EntryStatusDTO> entries = new ArrayList<EntryStatusDTO>();
 
-
     /**
      * Ids of all connections that are producing to the destination
      */
@@ -56,6 +55,6 @@ public class QueueStatusDTO extends Serv
      * Ids of all connections that are consuming from the destination
      */
     @XmlElement(name="consumer")
-    public List<QueueConsumerStatusDTO> consumers = new ArrayList<QueueConsumerStatusDTO>();
+    public List<QueueConsumerLinkDTO> consumers = new ArrayList<QueueConsumerLinkDTO>();
 
 }
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicMetricsDTO.java (from r1161956, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicMetricsDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicMetricsDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java&r1=1161956&r2=1161969&rev=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicMetricsDTO.java Fri Aug 26 02:49:45 2011
@@ -14,26 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonTypeInfo;
+package org.apache.activemq.apollo.dto;
 
-import javax.xml.bind.annotation.*;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
 
 /**
  * <p>
+ *     Collects metrics about the status of a topic since the
+ *     time a broker gets started.
  * </p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="aggregate_queue_metrics")
+@XmlRootElement(name="topic_metrics")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class AggregateQueueMetricsDTO extends QueueMetricsDTO {
-
-    /**
-     * The number of queues which where aggregated.
-     */
-    @XmlAttribute(name="queues")
-    public int queues;
+public class TopicMetricsDTO extends DestinationMetricsDTO {
 
-}
\ No newline at end of file
+}

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java Fri Aug 26 02:49:45 2011
@@ -51,17 +51,7 @@ public class TopicStatusDTO extends Serv
     @XmlElement(name="dsub")
     public List<String> dsubs = new ArrayList<String>();
 
-    /**
-     * The total number of producers that have sent to
-     * the topic.
-     */
-    @XmlAttribute(name="producer_counter")
-    public long producer_counter;
+    @XmlElement
+    public TopicMetricsDTO metrics = new TopicMetricsDTO();
 
-    /**
-     * The total number of consumers that have ever subscribed to
-     * the topic.
-     */
-    @XmlAttribute(name="consumer_counter")
-    public long consumer_counter;
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java Fri Aug 26 02:49:45 2011
@@ -39,36 +39,18 @@ public class VirtualHostStatusDTO extend
     public List<String> topics = new ArrayList<String>();
 
     /**
-     * The number of topics that exist on the virtual host
-     */
-    @XmlAttribute(name="topic_count")
-    public int topic_count;
-
-    /**
      * Ids of all the queues that exist on the broker
      */
     @XmlElement(name="queue")
     public List<String> queues = new ArrayList<String>();
 
     /**
-     * The number of queues that exist on the virtual host
-     */
-    @XmlAttribute(name="queue_count")
-    public int queue_count;
-
-    /**
      * Ids of all the durable subscriptions that exist on the broker
      */
     @XmlElement(name="dsub")
     public List<String> dsubs = new ArrayList<String>();
 
     /**
-     * The number of durable subscriptions that exist on the virtual host
-     */
-    @XmlAttribute(name="dsub_count")
-    public int dsub_count;
-
-    /**
      * Is the virtual host using a store.
      */
     @XmlAttribute(name="store")

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index Fri Aug 26 02:49:45 2011
@@ -42,7 +42,10 @@ NullStoreDTO
 QueueDTO
 TopicDTO
 LinkDTO
-QueueConsumerStatusDTO
+QueueConsumerLinkDTO
 ValueDTO
 StringListDTO
-DataPageDTO
\ No newline at end of file
+DataPageDTO
+AggregateTopicMetricsDTO
+AggregateQueueMetricsDTO
+DestinationMetricsDTO
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri Aug 26 02:49:45 2011
@@ -869,6 +869,8 @@ class OpenwireProtocolHandler extends Pr
 
       val downstream = session_manager.open(producer.dispatch_queue, receive_buffer_size)
       def remaining_capacity = downstream.remaining_capacity
+      def enqueue_item_counter = downstream.accepted_count
+      def enqueue_size_counter = downstream.accepted_size
 
       def close = {
         assert(producer.dispatch_queue.isExecuting)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Aug 26 02:49:45 2011
@@ -389,6 +389,9 @@ class StompProtocolHandler extends Proto
       val downstream = session_manager.open(producer.dispatch_queue, receive_buffer_size)
 
       def remaining_capacity = downstream.remaining_capacity
+      def enqueue_item_counter = downstream.accepted_count
+      def enqueue_size_counter = downstream.accepted_size
+
 
       def close = {
         assert(producer.dispatch_queue.isExecuting)

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Fri Aug 26 02:49:45 2011
@@ -215,50 +215,110 @@ case class BrokerResource() extends Reso
     rc
   }
 
-  def aggregate_queue_metrics(queue_metrics:Iterable[QueueMetricsDTO]):AggregateQueueMetricsDTO = {
-    queue_metrics.foldLeft(new AggregateQueueMetricsDTO){ (rc, q)=>
-      rc.enqueue_item_counter += q.enqueue_item_counter
-      rc.enqueue_size_counter += q.enqueue_size_counter
-      rc.enqueue_ts = rc.enqueue_ts max q.enqueue_ts
-
-      rc.dequeue_item_counter += q.dequeue_item_counter
-      rc.dequeue_size_counter += q.dequeue_size_counter
-      rc.dequeue_ts = rc.dequeue_ts max q.dequeue_ts
-
-      rc.nack_item_counter += q.nack_item_counter
-      rc.nack_size_counter += q.nack_size_counter
-      rc.nack_ts = rc.nack_ts max q.nack_ts
-
-      rc.expired_item_counter += q.expired_item_counter
-      rc.expired_size_counter += q.expired_size_counter
-      rc.expired_ts = rc.expired_ts max q.expired_ts
-
-      rc.queue_size += q.queue_size
-      rc.queue_items += q.queue_items
-
-      rc.swap_out_item_counter += q.swap_out_item_counter
-      rc.swap_out_size_counter += q.swap_out_size_counter
-      rc.swap_in_item_counter += q.swap_in_item_counter
-      rc.swap_in_size_counter += q.swap_in_size_counter
-
-      rc.swapping_in_size += q.swapping_in_size
-      rc.swapping_out_size += q.swapping_out_size
-
-      rc.swapped_in_items += q.swapped_in_items
-      rc.swapped_in_size += q.swapped_in_size
-
-      rc.swapped_in_size_max += q.swapped_in_size_max
-      rc.producer_counter += q.producer_counter
-      rc.consumer_counter += q.consumer_counter
-      rc.producer_count += q.producer_count
-      rc.consumer_count += q.consumer_count
+  @GET
+  @Path("topic-metrics")
+  def get_topic_metrics(): AggregateTopicMetricsDTO = {
+    val rc:AggregateTopicMetricsDTO = with_broker { broker =>
+      monitoring(broker) {
+        get_topic_metrics(broker)
+      }
+    }
+    rc.current_time = System.currentTimeMillis()
+    rc
+  }
+
+  @GET
+  @Path("dsub-metrics")
+  def get_dsub_metrics(): AggregateQueueMetricsDTO = {
+    val rc:AggregateQueueMetricsDTO = with_broker { broker =>
+      monitoring(broker) {
+        get_dsub_metrics(broker)
+      }
+    }
+    rc.current_time = System.currentTimeMillis()
+    rc
+  }
+
+  def aggregate(queue:AggregateQueueMetricsDTO, topic:AggregateTopicMetricsDTO, dsub:AggregateQueueMetricsDTO):AggregateQueueMetricsDTO = {
+    // zero out the enqueue stats on the dsubs since they will already be accounted for in the topic
+    // stats.
+    dsub.enqueue_item_counter = 0
+    dsub.enqueue_size_counter = 0
+    dsub.enqueue_ts = 0
+    val rc = aggregate_queue_metrics(List(queue, dsub))
+    add_destination_metrics(rc, topic)
+    rc.objects += topic.objects
+    rc.current_time = System.currentTimeMillis()
+    rc
+  }
+
+  @GET
+  @Path("dest-metrics")
+  def get_dest_metrics(): AggregateQueueMetricsDTO = {
+    aggregate(get_queue_metrics(), get_topic_metrics(), get_dsub_metrics())
+  }
+
+  def add_destination_metrics(to:DestinationMetricsDTO, from:DestinationMetricsDTO) = {
+    to.enqueue_item_counter += from.enqueue_item_counter
+    to.enqueue_size_counter += from.enqueue_size_counter
+    to.enqueue_ts = to.enqueue_ts max from.enqueue_ts
+
+    to.dequeue_item_counter += from.dequeue_item_counter
+    to.dequeue_size_counter += from.dequeue_size_counter
+    to.dequeue_ts = to.dequeue_ts max from.dequeue_ts
+
+    to.producer_counter += from.producer_counter
+    to.consumer_counter += from.consumer_counter
+    to.producer_count += from.producer_count
+    to.consumer_count += from.consumer_count
+  }
+
+  def aggregate_queue_metrics(metrics:Iterable[QueueMetricsDTO]):AggregateQueueMetricsDTO = {
+    metrics.foldLeft(new AggregateQueueMetricsDTO){ (memo, metric)=>
+      add_destination_metrics(memo, metric)
+
+      memo.nack_item_counter += metric.nack_item_counter
+      memo.nack_size_counter += metric.nack_size_counter
+      memo.nack_ts = memo.nack_ts max metric.nack_ts
+
+      memo.expired_item_counter += metric.expired_item_counter
+      memo.expired_size_counter += metric.expired_size_counter
+      memo.expired_ts = memo.expired_ts max metric.expired_ts
+
+      memo.queue_size += metric.queue_size
+      memo.queue_items += metric.queue_items
+
+      memo.swap_out_item_counter += metric.swap_out_item_counter
+      memo.swap_out_size_counter += metric.swap_out_size_counter
+      memo.swap_in_item_counter += metric.swap_in_item_counter
+      memo.swap_in_size_counter += metric.swap_in_size_counter
+
+      memo.swapping_in_size += metric.swapping_in_size
+      memo.swapping_out_size += metric.swapping_out_size
 
-      if( q.isInstanceOf[AggregateQueueMetricsDTO] ) {
-        rc.queues += q.asInstanceOf[AggregateQueueMetricsDTO].queues
+      memo.swapped_in_items += metric.swapped_in_items
+      memo.swapped_in_size += metric.swapped_in_size
+
+      memo.swapped_in_size_max += metric.swapped_in_size_max
+
+      if( metric.isInstanceOf[AggregateQueueMetricsDTO] ) {
+        memo.objects += metric.asInstanceOf[AggregateQueueMetricsDTO].objects
       } else {
-        rc.queues += 1
+        memo.objects += 1
       }
-      rc
+      memo
+    }
+  }
+
+  def aggregate_topic_metrics(metrics:Iterable[TopicMetricsDTO]):AggregateTopicMetricsDTO = {
+    metrics.foldLeft(new AggregateTopicMetricsDTO){ (memo, metric)=>
+      add_destination_metrics(memo, metric)
+      if( metric.isInstanceOf[AggregateTopicMetricsDTO] ) {
+        memo.objects += metric.asInstanceOf[AggregateTopicMetricsDTO].objects
+      } else {
+        memo.objects += 1
+      }
+      memo
     }
   }
 
@@ -271,9 +331,40 @@ case class BrokerResource() extends Reso
 
   def get_queue_metrics(host:VirtualHost):FutureResult[AggregateQueueMetricsDTO] = {
     val router:LocalRouter = host
-    val queues: Iterable[Queue] = router.queues_by_id.values
+    val queues: Iterable[Queue] = router.queue_domain.destinations
     val metrics = sync_all(queues) { queue =>
-      get_queue_metrics(queue)
+      queue.get_queue_metrics
+    }
+    metrics.map( x=> Success(aggregate_queue_metrics(x.flatMap(_.success_option))) )
+  }
+
+
+  def get_topic_metrics(broker:Broker):FutureResult[AggregateTopicMetricsDTO] = {
+    val metrics = sync_all(broker.virtual_hosts.values) { host =>
+      get_topic_metrics(host)
+    }
+    metrics.map( x=> Success(aggregate_topic_metrics(x.flatMap(_.success_option)) ))
+  }
+
+  def get_topic_metrics(host:VirtualHost):FutureResult[AggregateTopicMetricsDTO] = {
+    val router:LocalRouter = host
+    val topics: Iterable[Topic] = router.topic_domain.destinations
+    val metrics = topics.map(_.status.metrics)
+    FutureResult(Success(aggregate_topic_metrics(metrics)))
+  }
+
+  def get_dsub_metrics(broker:Broker):FutureResult[AggregateQueueMetricsDTO] = {
+    val metrics = sync_all(broker.virtual_hosts.values) { host =>
+      get_dsub_metrics(host)
+    }
+    metrics.map( x=> Success(aggregate_queue_metrics(x.flatMap(_.success_option)) ))
+  }
+
+  def get_dsub_metrics(host:VirtualHost):FutureResult[AggregateQueueMetricsDTO] = {
+    val router:LocalRouter = host
+    val dsubs: Iterable[Queue] = router.topic_domain.durable_subscriptions_by_id.values
+    val metrics = sync_all(dsubs) { dsub =>
+      dsub.get_queue_metrics
     }
     metrics.map( x=> Success(aggregate_queue_metrics(x.flatMap(_.success_option))) )
   }
@@ -316,15 +407,12 @@ case class BrokerResource() extends Reso
     router.queue_domain.destinations.foreach { node =>
       result.queues.add(node.id)
     }
-    result.queue_count = result.queues.size()
     router.topic_domain.destinations.foreach { node =>
       result.topics.add(node.id)
     }
-    result.topic_count = result.topics.size()
     router.topic_domain.durable_subscriptions_by_id.keys.foreach { id =>
       result.dsubs.add(id)
     }
-    result.dsub_count = result.dsubs.size()
 
     result
   }
@@ -340,6 +428,34 @@ case class BrokerResource() extends Reso
     rc
   }
 
+  @GET @Path("virtual-hosts/{id}/topic-metrics")
+  def virtual_host_topic_metrics(@PathParam("id") id : String): AggregateTopicMetricsDTO = {
+    val rc:AggregateTopicMetricsDTO = with_virtual_host(id) { host =>
+      monitoring(host) {
+        get_topic_metrics(host)
+      }
+    }
+    rc.current_time = System.currentTimeMillis()
+    rc
+  }
+
+  @GET @Path("virtual-hosts/{id}/dsub-metrics")
+  def virtual_host_dsub_metrics(@PathParam("id") id : String): AggregateQueueMetricsDTO = {
+    val rc:AggregateQueueMetricsDTO = with_virtual_host(id) { host =>
+      monitoring(host) {
+        get_dsub_metrics(host)
+      }
+    }
+    rc.current_time = System.currentTimeMillis()
+    rc
+  }
+
+  @GET @Path("virtual-hosts/{id}/dest-metrics")
+  def virtual_host_dest_metrics(@PathParam("id") id : String): AggregateQueueMetricsDTO = {
+    aggregate(virtual_host_queue_metrics(id), virtual_host_topic_metrics(id), virtual_host_dsub_metrics(id))
+  }
+
+
   @GET @Path("virtual-hosts/{id}/store")
   def store(@PathParam("id") id : String):StoreStatusDTO = {
     with_virtual_host(id) { host =>
@@ -357,22 +473,6 @@ case class BrokerResource() extends Reso
     }
   }
 
-  def link(connection:BrokerConnection) = {
-    val link = new LinkDTO()
-    link.kind = "connection"
-    link.id = connection.id.toString
-    link.label = connection.transport.getRemoteAddress.toString
-    link
-  }
-
-  def link(queue:Queue) = {
-    val link = new LinkDTO()
-    link.kind = "queue"
-    link.id = queue.id
-    link.label = queue.id
-    link
-  }
-
   class JosqlHelper {
 
     def get(o:AnyRef, name:String):AnyRef = {
@@ -564,101 +664,14 @@ case class BrokerResource() extends Reso
     }
   }
 
-  def status(node: Topic): FutureResult[TopicStatusDTO] = {
-    monitoring(node) {
-      val rc = new TopicStatusDTO
-      rc.id = node.id
-      rc.state = "STARTED"
-      rc.state_since = node.created_at
-      rc.config = node.config
-      rc.producer_counter = node.producer_counter
-      rc.consumer_counter = node.consumer_counter
-
-      node.durable_subscriptions.foreach {
-        q =>
-          rc.dsubs.add(q.id)
-      }
-      node.consumers.foreach {
-        consumer =>
-          consumer match {
-            case queue: Queue =>
-              rc.consumers.add(link(queue))
-            case _ =>
-              consumer.connection.foreach {
-                c =>
-                  rc.consumers.add(link(c))
-              }
-          }
-      }
-      node.producers.flatMap(_.connection).foreach {
-        connection =>
-          rc.producers.add(link(connection))
-      }
-
-      rc
-    }
+  def status(node: Topic): FutureResult[TopicStatusDTO] = monitoring(node) {
+    node.status
   }
 
   def status(q:Queue, entries:Boolean=false) = monitoring(q) {
-    val rc = new QueueStatusDTO
-    rc.id = q.id
-    rc.state = q.service_state.toString
-    rc.state_since = q.service_state.since
-    rc.binding = q.binding.binding_dto
-    rc.config = q.config
-    rc.metrics = get_queue_metrics(q)
-    rc.metrics.current_time = System.currentTimeMillis()
-
-    if( entries ) {
-      var cur = q.head_entry
-      while( cur!=null ) {
-
-        val e = new EntryStatusDTO
-        e.seq = cur.seq
-        e.count = cur.count
-        e.size = cur.size
-        e.consumer_count = cur.parked.size
-        e.is_prefetched = cur.is_prefetched
-        e.state = cur.label
-
-        rc.entries.add(e)
-
-        cur = if( cur == q.tail_entry ) {
-          null
-        } else {
-          cur.nextOrTail
-        }
-      }
-    }
-
-    q.inbound_sessions.flatMap( _.producer.connection ).foreach { connection=>
-      rc.producers.add(link(connection))
-    }
-    q.all_subscriptions.valuesIterator.toSeq.foreach{ sub =>
-      val status = new QueueConsumerStatusDTO
-      sub.consumer.connection.foreach(x=> status.link = link(x))
-      status.position = sub.pos.seq
-      status.total_dispatched_count = sub.total_dispatched_count
-      status.total_dispatched_size = sub.total_dispatched_size
-      status.total_ack_count = sub.total_ack_count
-      status.total_nack_count = sub.total_nack_count
-      status.acquired_size = sub.acquired_size
-      status.acquired_count = sub.acquired_count
-      status.waiting_on = if( sub.full ) {
-        "ack"
-      } else if( sub.pos.is_tail ) {
-        "producer"
-      } else if( !sub.pos.is_loaded ) {
-        "load"
-      } else {
-        "dispatch"
-      }
-      rc.consumers.add(status)
-    }
-    rc
+    q.status(entries)
   }
 
-
   @GET @Path("connectors")
   @Produces(Array("application/json"))
   def connectors(@QueryParam("f") f:java.util.List[String], @QueryParam("q") q:String,
@@ -778,50 +791,4 @@ case class BrokerResource() extends Reso
     }
   }
 
-  def get_queue_metrics(q:Queue):QueueMetricsDTO = {
-    val rc = new QueueMetricsDTO
-
-    rc.enqueue_item_counter = q.enqueue_item_counter
-    rc.enqueue_size_counter = q.enqueue_size_counter
-    rc.enqueue_ts = q.enqueue_ts
-
-    rc.dequeue_item_counter = q.dequeue_item_counter
-    rc.dequeue_size_counter = q.dequeue_size_counter
-    rc.dequeue_ts = q.dequeue_ts
-
-    rc.nack_item_counter = q.nack_item_counter
-    rc.nack_size_counter = q.nack_size_counter
-    rc.nack_ts = q.nack_ts
-
-    rc.expired_item_counter = q.expired_item_counter
-    rc.expired_size_counter = q.expired_size_counter
-    rc.expired_ts = q.expired_ts
-
-    rc.queue_size = q.queue_size
-    rc.queue_items = q.queue_items
-
-    rc.swap_out_item_counter = q.swap_out_item_counter
-    rc.swap_out_size_counter = q.swap_out_size_counter
-    rc.swap_in_item_counter = q.swap_in_item_counter
-    rc.swap_in_size_counter = q.swap_in_size_counter
-
-    rc.swapping_in_size = q.swapping_in_size
-    rc.swapping_out_size = q.swapping_out_size
-
-    rc.swapped_in_items = q.swapped_in_items
-    rc.swapped_in_size = q.swapped_in_size
-
-    rc.swapped_in_size_max = q.swapped_in_size_max
-
-    rc.producer_counter = q.producer_counter
-    rc.consumer_counter = q.consumer_counter
-
-    rc.producer_count = q.producers.size
-    rc.consumer_count = q.all_subscriptions.size
-
-    rc
-  }
-
-
-
 }

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Fri Aug 26 02:49:45 2011
@@ -74,28 +74,33 @@ h3 Producers
 p total producers ever : #{metrics.producer_counter}
 ul
   - for( x <- producers )
-    - x.kind match
-      - case "connection" =>
-        li.producer
+    li.producer
+      - x.kind match
+        - case "queue" =>
+          a(href={ path("../../queues/"+x.id+".html") }) #{x.label}
+        - case "connection" =>
           a(href={ path("../../../../connections/"+x.id+".html") }) #{x.label}
-      - case _ =>
+        - case _ =>
+      p dispatched: #{x.enqueue_item_counter} messages (#{memory(x.enqueue_size_counter)}), #{uptime(x.enqueue_ts)} ago
 
 
 h3 Consumers
 p total consumers ever : #{metrics.consumer_counter}
 ul
-  - for( consumer <- consumers )
-    - import consumer._
+  - for( x <- consumers )
     li.consumer
-      - if( link !=null )
-        a(href={ path("../../../../connections/"+link.id+".html") }) #{link.label}
-
-      p next message seq: #{position}
-      p acquired: #{acquired_count} messages (#{memory(acquired_size)})
-      p dispatched: #{total_dispatched_count} messages (#{memory(total_dispatched_size)})
-      p acks: #{total_ack_count} messages
-      p naks: #{total_nack_count} messages
-      p waiting on: #{waiting_on}
+      - x.kind match
+        - case "queue" =>
+          a(href={ path("../../queues/"+x.id+".html") }) #{x.label}
+        - case "connection" =>
+          a(href={ path("../../../../connections/"+x.id+".html") }) #{x.label}
+        - case _ =>
+      p dispatched: #{x.enqueue_item_counter} messages (#{memory(x.enqueue_size_counter)}), #{uptime(x.enqueue_ts)} ago
+      p next message seq: #{x.position}
+      p acquired: #{x.acquired_count} messages (#{memory(x.acquired_size)})
+      p acks: #{x.total_ack_count} messages
+      p naks: #{x.total_nack_count} messages
+      p waiting on: #{x.waiting_on}
 
 - if ( entries.isEmpty )
   h2

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade?rev=1161969&r1=1161968&r2=1161969&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade Fri Aug 26 02:49:45 2011
@@ -26,27 +26,30 @@ p state: #{state} #{ uptime(state_since)
 h3 Topic Domain
 
 h4 Producers
-p total producers ever : #{producer_counter}
+p total producers ever : #{metrics.producer_counter}
 ul
   - for( x <- producers )
-    - x.kind match
-      - case "connection" =>
-        li
+    li.producer
+      - x.kind match
+        - case "queue" =>
+          a(href={ path("../../queues/"+x.id+".html") }) #{x.label}
+        - case "connection" =>
           a(href={ path("../../../../connections/"+x.id+".html") }) #{x.label}
-      - case _ =>
+        - case _ =>
+      p dispatched: #{x.enqueue_item_counter} messages (#{memory(x.enqueue_size_counter)}), #{uptime(x.enqueue_ts)} ago
 
 h4 Consumers
-p total consumers ever : #{consumer_counter}
+p total consumers ever : #{metrics.consumer_counter}
 ul
   - for( x <- consumers )
-    - x.kind match
-      - case "queue" =>
-        li
+    li.consumer
+      - x.kind match
+        - case "queue" =>
           a(href={ path("../../queues/"+x.id+".html") }) #{x.label}
-      - case "connection" =>
-        li
+        - case "connection" =>
           a(href={ path("../../../../connections/"+x.id+".html") }) #{x.label}
-      - case _ =>
+        - case _ =>
+      p dispatched: #{x.enqueue_item_counter} messages (#{memory(x.enqueue_size_counter)}), #{uptime(x.enqueue_ts)} ago
 
 h4 Durable Subscribers
 ul