You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/29 22:17:25 UTC

svn commit: r1162978 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo...

Author: chirino
Date: Mon Aug 29 20:17:25 2011
New Revision: 1162978

URL: http://svn.apache.org/viewvc?rev=1162978&view=rev
Log:
Topic metrics did not make sense when used slow consumer strategy = queue.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1162978&r1=1162977&r2=1162978&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Aug 29 20:17:25 2011
@@ -211,6 +211,7 @@ class Queue(val router: LocalRouter, val
   var idled_at = 0L
 
   def get_queue_metrics:DestMetricsDTO = {
+    dispatch_queue.assertExecuting()
     val rc = new DestMetricsDTO
 
     rc.enqueue_item_counter = this.enqueue_item_counter
@@ -580,7 +581,7 @@ class Queue(val router: LocalRouter, val
   }
 
   def swap_messages = {
-
+    dispatch_queue.assertExecuting()
     now = System.currentTimeMillis()
 
     var cur = entries.getHead

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1162978&r1=1162977&r2=1162978&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Mon Aug 29 20:17:25 2011
@@ -25,6 +25,7 @@ import org.fusesource.hawtdispatch._
 import collection.mutable.{HashSet, HashMap, ListBuffer}
 import java.lang.Long
 import security.SecuredResource
+
 /**
  * <p>
  * A logical messaging topic
@@ -34,33 +35,77 @@ import security.SecuredResource
  */
 class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String, path:Path) extends DomainDestination with SecuredResource {
 
-  var enqueue_item_counter = 0L
-  var enqueue_size_counter = 0L
-  var enqueue_ts = now
-
-  var dequeue_item_counter = 0L
-  var dequeue_size_counter = 0L
-  var dequeue_ts = now
+  val topic_metrics = new DestMetricsDTO
 
   val resource_kind =SecuredResource.TopicKind
-
-  var proxy_sessions = new HashSet[TopicDeliverySession]()
+  var proxy_sessions = new HashSet[DeliverySession]()
 
   implicit def from_link(from:LinkDTO):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter, from.enqueue_ts)
-  implicit def from_session(from:TopicDeliverySession):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter, from.enqueue_ts)
+  implicit def from_session(from:DeliverySession):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter, from.enqueue_ts)
 
-  def add_counters(to:LinkDTO, from:(Long,Long,Long)):Unit = {
+  def add_link_counters(to:LinkDTO, from:(Long,Long,Long)):Unit = {
     to.enqueue_item_counter += from._1
     to.enqueue_size_counter += from._2
     to.enqueue_ts = to.enqueue_ts max from._3
   }
-  def add_counters(to:Topic, from:(Long,Long,Long)):Unit = {
+  def add_enqueue_counters(to:DestMetricsDTO, from:(Long,Long,Long)):Unit = {
     to.enqueue_item_counter += from._1
     to.enqueue_size_counter += from._2
     to.enqueue_ts = to.enqueue_ts max from._3
   }
+  def add_dequeue_counters(to:DestMetricsDTO, from:(Long,Long,Long)):Unit = {
+    to.dequeue_item_counter += from._1
+    to.dequeue_size_counter += from._2
+    to.dequeue_ts = to.enqueue_ts max from._3
+  }
+
+  val producer_tracker = new DeliveryConsumer {
+    def retained() = 0
+    def retain() {}
+    def release() {}
+
+    def matches(message: Delivery) = true
+    def is_persistent = false
+    def dispatch_queue = null
+    def connect(producer: DeliveryProducer) = ProxyProducerSession(producer)
+
+  }
+
+
+  case class ProxyProducerSession(val producer:DeliveryProducer) extends DeliverySession {
+
+    dispatch_queue {
+      proxy_sessions.add(this)
+    }
+
+    def remaining_capacity = 1
+    var enqueue_ts = 0L
+    var enqueue_size_counter = 0L
+    var enqueue_item_counter = 0L
+    var refiller:Runnable = null
+
+    def offer(value: Delivery) = {
+      enqueue_item_counter += 1
+      enqueue_size_counter += value.size
+      enqueue_ts = now
+      true
+    }
+
+    def close = {
+      dispatch_queue {
+        proxy_sessions.remove(this)
+        producers.get(producer.asInstanceOf[BindableDeliveryProducer]) match {
+          case Some(link) => add_link_counters(link, this)
+          case _          => add_enqueue_counters(topic_metrics, this)
+        }
+      }
+    }
+
+    def full = false
+    def consumer = producer_tracker
+  }
 
-  case class TopicDeliverySession(session:DeliverySession) extends DeliverySession with SessionSinkFilter[Delivery] {
+  case class ProxyConsumerSession(proxy:ProxyDeliveryConsumer, session:DeliverySession) extends DeliverySession with SessionSinkFilter[Delivery] {
     def downstream = session
 
     dispatch_queue {
@@ -71,13 +116,14 @@ class Topic(val router:LocalRouter, val 
       session.close
       dispatch_queue {
         proxy_sessions.remove(this)
-        consumers.get(session.consumer) match {
-          case Some(proxy) => add_counters(proxy.link, this)
-          case _          => add_counters(Topic.this, this)
-        }
-        producers.get(session.producer.asInstanceOf[BindableDeliveryProducer]) match {
-          case Some(link) => add_counters(link, this)
-          case _          => add_counters(Topic.this, this)
+        consumers.get(proxy.registered) match {
+          case Some(proxy) => add_link_counters(proxy.link, this)
+          case _ =>
+            proxy.consumer match {
+              case queue:Queue =>
+              case _ =>
+                add_dequeue_counters(topic_metrics, this)
+            }
         }
       }
     }
@@ -88,7 +134,8 @@ class Topic(val router:LocalRouter, val 
     def offer(value: Delivery) = downstream.offer(value)
   }
 
-  case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO) extends DeliveryConsumer {
+  case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO, registered:DeliveryConsumer) extends DeliveryConsumer {
+
     def retained() = consumer.retained()
     def retain() = consumer.retain()
     def release() = consumer.release()
@@ -96,7 +143,7 @@ class Topic(val router:LocalRouter, val 
     def is_persistent = consumer.is_persistent
     def dispatch_queue = consumer.dispatch_queue
     def connect(producer: DeliveryProducer) = {
-      new TopicDeliverySession(consumer.connect(producer))
+      new ProxyConsumerSession(this, consumer.connect(producer))
     }
   }
 
@@ -107,8 +154,6 @@ class Topic(val router:LocalRouter, val 
   var idled_at = 0L
   val created_at = now
   var auto_delete_after = 0
-  var producer_counter = 0L
-  var consumer_counter = 0L
 
   var config:TopicDTO = _
 
@@ -124,7 +169,7 @@ class Topic(val router:LocalRouter, val 
 
   def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
 
-  def status:TopicStatusDTO = {
+  def status(on_complete:(TopicStatusDTO)=>Unit) = {
     dispatch_queue.assertExecuting()
 
     val rc = new TopicStatusDTO
@@ -133,8 +178,6 @@ class Topic(val router:LocalRouter, val 
     rc.state_since = this.created_at
     rc.config = this.config
 
-    rc.metrics.producer_counter = producer_counter
-    rc.metrics.consumer_counter = consumer_counter
     rc.metrics.producer_count = producers.size
     rc.metrics.consumer_count = consumers.size
 
@@ -148,7 +191,7 @@ class Topic(val router:LocalRouter, val 
       rc.kind = o.kind
       rc.label = o.label
       rc.enqueue_ts = o.enqueue_ts
-      add_counters(rc, o);
+      add_link_counters(rc, o);
       rc
     }
 
@@ -162,49 +205,63 @@ class Topic(val router:LocalRouter, val 
     }
     this.consumers.foreach { case (consumer, proxy) =>
       val o = copy(proxy.link)
-      consumers_links.put(consumer, o)
+      consumers_links.put(proxy.consumer, o)
       rc.consumers.add(o)
     }
 
     // Add in the counters from the live sessions..
     proxy_sessions.foreach{ session =>
       val stats = from_session(session)
-      for( link <- producer_links.get(session.producer.asInstanceOf[BindableDeliveryProducer]) ) {
-        add_counters(link, stats)
-      }
-      for( link <- consumers_links.get(session.consumer) ) {
-        add_counters(link, stats)
+      session match {
+        case session:ProxyProducerSession =>
+          for( link <- producer_links.get(session.producer.asInstanceOf[BindableDeliveryProducer]) ) {
+            add_link_counters(link, stats)
+          }
+        case session:ProxyConsumerSession =>
+          for( link <- consumers_links.get(session.consumer) ) {
+            add_link_counters(link, stats)
+          }
       }
     }
 
     // Now update the topic counters..
-    rc.metrics.enqueue_item_counter = enqueue_item_counter
-    rc.metrics.enqueue_size_counter = enqueue_size_counter
-    rc.metrics.enqueue_ts = enqueue_ts
-
-    rc.metrics.dequeue_item_counter = dequeue_item_counter
-    rc.metrics.dequeue_size_counter = dequeue_size_counter
-    rc.metrics.dequeue_ts = dequeue_ts
-    consumers_links.values.foreach { link =>
-      rc.metrics.enqueue_item_counter += link.enqueue_item_counter
-      rc.metrics.enqueue_size_counter += link.enqueue_size_counter
-      rc.metrics.enqueue_ts = rc.metrics.enqueue_ts max link.enqueue_ts
-    }
+    rc.metrics.current_time = now
+    DestinationMetricsSupport.add_destination_metrics(rc.metrics, topic_metrics)
     producer_links.values.foreach { link =>
-      rc.metrics.dequeue_item_counter += link.enqueue_item_counter
-      rc.metrics.dequeue_size_counter += link.enqueue_size_counter
-      rc.metrics.dequeue_ts = rc.metrics.dequeue_ts max link.enqueue_ts
+      add_enqueue_counters(rc.metrics, link)
     }
 
-    // Add in any queue metrics that the topic may own.
-    for(queue <- consumer_queues.values) {
-      val metrics = queue.get_queue_metrics
-      metrics.enqueue_item_counter = 0
-      metrics.enqueue_size_counter = 0
-      metrics.enqueue_ts = 0
-      DestinationMetricsSupport.add_destination_metrics(rc.metrics, metrics)
+    var futures = List[Future[(TopicStatusDTO)=>Unit]]()
+
+    consumers_links.foreach { case (consumer, link) =>
+      consumer match {
+        case queue:Queue =>
+          // aggregate the queue stats instead of the link stats.
+          val future = Future[(TopicStatusDTO)=>Unit]()
+          futures ::= future
+          queue.dispatch_queue {
+            val metrics = queue.get_queue_metrics
+            metrics.enqueue_item_counter = 0
+            metrics.enqueue_size_counter = 0
+            metrics.enqueue_ts = 0
+            metrics.producer_counter = 0
+            metrics.producer_count = 0
+            metrics.consumer_counter = 0
+            metrics.consumer_count = 0
+            future.set((rc)=>{
+              DestinationMetricsSupport.add_destination_metrics(rc.metrics, metrics)
+            })
+          }
+        case _ =>
+          // plain link, add it's ats.
+          add_dequeue_counters(rc.metrics, link)
+      }
+    }
+
+    Future.all(futures).onComplete{ data=>
+      data.foreach(_(rc))
+      on_complete(rc)
     }
-    rc
   }
 
 
@@ -299,9 +356,9 @@ class Topic(val router:LocalRouter, val 
         }
     }
 
-    val proxy = ProxyDeliveryConsumer(target, link)
+    val proxy = ProxyDeliveryConsumer(target, link, consumer)
     consumers.put(consumer, proxy)
-    consumer_counter += 1
+    topic_metrics.consumer_counter += 1
     val list = proxy :: Nil
     producers.keys.foreach({ r=>
       r.bind(list)
@@ -312,13 +369,27 @@ class Topic(val router:LocalRouter, val 
   def unbind (consumer:DeliveryConsumer, persistent:Boolean) = {
 
     for(proxy <- consumers.remove(consumer)) {
-      add_counters(Topic.this, proxy.link)
+      add_dequeue_counters(topic_metrics, proxy.link)
       val list = consumer_queues.remove(consumer) match {
         case Some(queue) =>
           queue.unbind(List(consumer))
           queue.binding match {
             case x:TempQueueBinding =>
-              router._destroy_queue(queue)
+              queue.dispatch_queue {
+                val metrics = queue.get_queue_metrics
+                router._destroy_queue(queue)
+                dispatch_queue {
+                  topic_metrics.dequeue_item_counter += metrics.dequeue_item_counter
+                  topic_metrics.dequeue_size_counter += metrics.dequeue_size_counter
+                  topic_metrics.dequeue_ts = topic_metrics.dequeue_ts max metrics.dequeue_ts
+                  topic_metrics.nack_item_counter += metrics.nack_item_counter
+                  topic_metrics.nack_size_counter += metrics.nack_size_counter
+                  topic_metrics.nack_ts  = topic_metrics.nack_ts max metrics.nack_ts
+                  topic_metrics.expired_item_counter += metrics.expired_item_counter
+                  topic_metrics.expired_size_counter += metrics.expired_size_counter
+                  topic_metrics.expired_ts  = topic_metrics.expired_ts max metrics.expired_ts
+                }
+              }
           }
           List(queue)
         case None =>
@@ -375,21 +446,21 @@ class Topic(val router:LocalRouter, val 
         link.label = "unknown"
     }
     producers.put(producer, link)
-    producer_counter += 1
-    producer.bind(consumers.values.toList ::: durable_subscriptions.toList)
+    topic_metrics.producer_counter += 1
+    producer.bind(producer_tracker::consumers.values.toList ::: durable_subscriptions.toList)
     check_idle
   }
 
   def disconnect (producer:BindableDeliveryProducer) = {
     for(link <- producers.remove(producer) ) {
-      add_counters(this, link)
+      add_enqueue_counters(topic_metrics, link)
     }
     check_idle
   }
 
   def disconnect_producers:Unit ={
     for( (_, link) <- producers ) {
-      add_counters(this, link)
+      add_enqueue_counters(topic_metrics, link)
     }
     producers.clear
     check_idle

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1162978&r1=1162977&r2=1162978&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Mon Aug 29 20:17:25 2011
@@ -303,8 +303,13 @@ case class BrokerResource() extends Reso
   def get_topic_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = {
     val router:LocalRouter = host
     val topics: Iterable[Topic] = router.topic_domain.destinations
-    val metrics = topics.map(_.status.metrics)
-    FutureResult(Success(aggregate_dest_metrics(metrics)))
+
+    val metrics = Future.all {
+      topics.map { topics =>
+        topic_status(topics).map(_.map_success(_.metrics))
+      }
+    }
+    metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option))) )
   }
 
   def get_dsub_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] = {
@@ -498,7 +503,7 @@ case class BrokerResource() extends Reso
       val router: LocalRouter = host
       val records = Future.all {
         router.topic_domain.destination_by_id.values.map { value  =>
-          status(value)
+          topic_status(value)
         }
       }
       val rc:FutureResult[DataPageDTO] = records.map(narrow(classOf[TopicStatusDTO], _, f, q, p, ps, o))
@@ -511,7 +516,7 @@ case class BrokerResource() extends Reso
     with_virtual_host(id) { host =>
       val router:LocalRouter = host
       val node = router.topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
-      status(node)
+      topic_status(node)
     }
   }
 
@@ -522,7 +527,9 @@ case class BrokerResource() extends Reso
       val node = router.topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
       val queue =router.queues_by_store_id.get(qid).getOrElse(result(NOT_FOUND))
       monitoring(node) {
-        queue.status(entries)
+        sync(queue) {
+          queue.status(entries)
+        }
       }
     }
   }
@@ -630,8 +637,10 @@ case class BrokerResource() extends Reso
     }
   }
 
-  def status(node: Topic): FutureResult[TopicStatusDTO] = monitoring(node) {
-    node.status
+  def topic_status(node: Topic): FutureResult[TopicStatusDTO] = monitoring(node) {
+    val rc = FutureResult[TopicStatusDTO]()
+    node.status(x=>rc.set(Success(x)))
+    rc
   }
 
   def status(q:Queue, entries:Boolean=false) = monitoring(q) {

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1162978&r1=1162977&r2=1162978&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Mon Aug 29 20:17:25 2011
@@ -23,11 +23,11 @@
   - else
     - "%,.2f %%".format(n.toFloat*100.0/d)
 
-
+- var vhost_path = ".."
 - binding match
   - case x:QueueDestinationDTO =>
     .breadcumbs
-      a(href={strip_resolve("..")+".html"}) Back
+      a(href={strip_resolve(vhost_path)+".html"}) Back
 
     h1 Queue #{id}
     p state: #{state} #{ uptime(state_since) } ago
@@ -36,7 +36,7 @@
         input(type="submit" value="delete")
   - case x:DurableSubscriptionDestinationDTO =>
     .breadcumbs
-      a(href={strip_resolve("..")+".html"}) Back
+      a(href={strip_resolve(vhost_path)+".html"}) Back
 
     h1 Durable Subscription on #{id}
     - if( x.selector != null )
@@ -47,8 +47,9 @@
         input(type="submit" value="delete")
 
   - case _ =>
+    - vhost_path = "../.."
     .breadcumbs
-      a(href={strip_resolve("../..")+".html"}) Back
+      a(href={strip_resolve(vhost_path)+".html"}) Back
     h1 Temporary Queue
     p state: #{state} #{ uptime(state_since) } ago
 
@@ -89,9 +90,9 @@ ul
     li.producer
       - x.kind match
         - case "queue" =>
-          a(href={ path("../../queues/"+x.id+".html") }) #{x.label}
+          a(href={ path(vhost_path+"/../queues/"+x.id+".html") }) #{x.label}
         - case "connection" =>
-          a(href={ path("../../../../connections/"+x.id+".html") }) #{x.label}
+          a(href={ path(vhost_path+"/../../../connections/"+x.id+".html") }) #{x.label}
         - case _ =>
       p dispatched: #{x.enqueue_item_counter} messages (#{memory(x.enqueue_size_counter)}), #{uptime(x.enqueue_ts)} ago
 
@@ -103,9 +104,9 @@ ul
     li.consumer
       - x.kind match
         - case "queue" =>
-          a(href={ path("../../queues/"+x.id+".html") }) #{x.label}
+          a(href={ path(vhost_path+"/../queues/"+x.id+".html") }) #{x.label}
         - case "connection" =>
-          a(href={ path("../../../../connections/"+x.id+".html") }) #{x.label}
+          a(href={ path(vhost_path+"/../../../connections/"+x.id+".html") }) #{x.label}
         - case _ =>
       p dispatched: #{x.enqueue_item_counter} messages (#{memory(x.enqueue_size_counter)}), #{uptime(x.enqueue_ts)} ago
       p next message seq: #{x.position}