You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/29 22:17:25 UTC
svn commit: r1162978 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/
apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo...
Author: chirino
Date: Mon Aug 29 20:17:25 2011
New Revision: 1162978
URL: http://svn.apache.org/viewvc?rev=1162978&view=rev
Log:
Topic metrics did not make sense when the slow consumer strategy = queue was used.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1162978&r1=1162977&r2=1162978&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Aug 29 20:17:25 2011
@@ -211,6 +211,7 @@ class Queue(val router: LocalRouter, val
var idled_at = 0L
def get_queue_metrics:DestMetricsDTO = {
+ dispatch_queue.assertExecuting()
val rc = new DestMetricsDTO
rc.enqueue_item_counter = this.enqueue_item_counter
@@ -580,7 +581,7 @@ class Queue(val router: LocalRouter, val
}
def swap_messages = {
-
+ dispatch_queue.assertExecuting()
now = System.currentTimeMillis()
var cur = entries.getHead
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1162978&r1=1162977&r2=1162978&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Mon Aug 29 20:17:25 2011
@@ -25,6 +25,7 @@ import org.fusesource.hawtdispatch._
import collection.mutable.{HashSet, HashMap, ListBuffer}
import java.lang.Long
import security.SecuredResource
+
/**
* <p>
* A logical messaging topic
@@ -34,33 +35,77 @@ import security.SecuredResource
*/
class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String, path:Path) extends DomainDestination with SecuredResource {
- var enqueue_item_counter = 0L
- var enqueue_size_counter = 0L
- var enqueue_ts = now
-
- var dequeue_item_counter = 0L
- var dequeue_size_counter = 0L
- var dequeue_ts = now
+ val topic_metrics = new DestMetricsDTO
val resource_kind =SecuredResource.TopicKind
-
- var proxy_sessions = new HashSet[TopicDeliverySession]()
+ var proxy_sessions = new HashSet[DeliverySession]()
implicit def from_link(from:LinkDTO):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter, from.enqueue_ts)
- implicit def from_session(from:TopicDeliverySession):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter, from.enqueue_ts)
+ implicit def from_session(from:DeliverySession):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter, from.enqueue_ts)
- def add_counters(to:LinkDTO, from:(Long,Long,Long)):Unit = {
+ def add_link_counters(to:LinkDTO, from:(Long,Long,Long)):Unit = {
to.enqueue_item_counter += from._1
to.enqueue_size_counter += from._2
to.enqueue_ts = to.enqueue_ts max from._3
}
- def add_counters(to:Topic, from:(Long,Long,Long)):Unit = {
+ def add_enqueue_counters(to:DestMetricsDTO, from:(Long,Long,Long)):Unit = {
to.enqueue_item_counter += from._1
to.enqueue_size_counter += from._2
to.enqueue_ts = to.enqueue_ts max from._3
}
+ def add_dequeue_counters(to:DestMetricsDTO, from:(Long,Long,Long)):Unit = { // fold an (items, size, timestamp) triple into the dequeue counters of `to`
+ to.dequeue_item_counter += from._1
+ to.dequeue_size_counter += from._2
+ to.dequeue_ts = to.dequeue_ts max from._3 // keep the latest dequeue timestamp; must compare against dequeue_ts, not enqueue_ts
+ }
+
+ val producer_tracker = new DeliveryConsumer { // consumer whose connect() wraps each producer in a ProxyProducerSession
+ def retained() = 0
+ def retain() {} // no-op
+ def release() {} // no-op
+
+ def matches(message: Delivery) = true // accepts every delivery
+ def is_persistent = false
+ def dispatch_queue = null
+ def connect(producer: DeliveryProducer) = ProxyProducerSession(producer)
+
+ }
+
+
+ case class ProxyProducerSession(val producer:DeliveryProducer) extends DeliverySession { // session that only counts what `producer` offers
+
+ dispatch_queue { // register this session in proxy_sessions when it is created
+ proxy_sessions.add(this)
+ }
+
+ def remaining_capacity = 1
+ var enqueue_ts = 0L
+ var enqueue_size_counter = 0L
+ var enqueue_item_counter = 0L
+ var refiller:Runnable = null
+
+ def offer(value: Delivery) = { // bump the item/size counters, stamp enqueue_ts with `now`, and always accept
+ enqueue_item_counter += 1
+ enqueue_size_counter += value.size
+ enqueue_ts = now
+ true
+ }
+
+ def close = { // unregister, then fold this session's counters into the producer's link or, if it has none, into topic_metrics
+ dispatch_queue {
+ proxy_sessions.remove(this)
+ producers.get(producer.asInstanceOf[BindableDeliveryProducer]) match {
+ case Some(link) => add_link_counters(link, this)
+ case _ => add_enqueue_counters(topic_metrics, this)
+ }
+ }
+ }
+
+ def full = false
+ def consumer = producer_tracker
+ }
- case class TopicDeliverySession(session:DeliverySession) extends DeliverySession with SessionSinkFilter[Delivery] {
+ case class ProxyConsumerSession(proxy:ProxyDeliveryConsumer, session:DeliverySession) extends DeliverySession with SessionSinkFilter[Delivery] {
def downstream = session
dispatch_queue {
@@ -71,13 +116,14 @@ class Topic(val router:LocalRouter, val
session.close
dispatch_queue {
proxy_sessions.remove(this)
- consumers.get(session.consumer) match {
- case Some(proxy) => add_counters(proxy.link, this)
- case _ => add_counters(Topic.this, this)
- }
- producers.get(session.producer.asInstanceOf[BindableDeliveryProducer]) match {
- case Some(link) => add_counters(link, this)
- case _ => add_counters(Topic.this, this)
+ consumers.get(proxy.registered) match {
+ case Some(proxy) => add_link_counters(proxy.link, this)
+ case _ =>
+ proxy.consumer match {
+ case queue:Queue =>
+ case _ =>
+ add_dequeue_counters(topic_metrics, this)
+ }
}
}
}
@@ -88,7 +134,8 @@ class Topic(val router:LocalRouter, val
def offer(value: Delivery) = downstream.offer(value)
}
- case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO) extends DeliveryConsumer {
+ case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO, registered:DeliveryConsumer) extends DeliveryConsumer {
+
def retained() = consumer.retained()
def retain() = consumer.retain()
def release() = consumer.release()
@@ -96,7 +143,7 @@ class Topic(val router:LocalRouter, val
def is_persistent = consumer.is_persistent
def dispatch_queue = consumer.dispatch_queue
def connect(producer: DeliveryProducer) = {
- new TopicDeliverySession(consumer.connect(producer))
+ new ProxyConsumerSession(this, consumer.connect(producer))
}
}
@@ -107,8 +154,6 @@ class Topic(val router:LocalRouter, val
var idled_at = 0L
val created_at = now
var auto_delete_after = 0
- var producer_counter = 0L
- var consumer_counter = 0L
var config:TopicDTO = _
@@ -124,7 +169,7 @@ class Topic(val router:LocalRouter, val
def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
- def status:TopicStatusDTO = {
+ def status(on_complete:(TopicStatusDTO)=>Unit) = {
dispatch_queue.assertExecuting()
val rc = new TopicStatusDTO
@@ -133,8 +178,6 @@ class Topic(val router:LocalRouter, val
rc.state_since = this.created_at
rc.config = this.config
- rc.metrics.producer_counter = producer_counter
- rc.metrics.consumer_counter = consumer_counter
rc.metrics.producer_count = producers.size
rc.metrics.consumer_count = consumers.size
@@ -148,7 +191,7 @@ class Topic(val router:LocalRouter, val
rc.kind = o.kind
rc.label = o.label
rc.enqueue_ts = o.enqueue_ts
- add_counters(rc, o);
+ add_link_counters(rc, o);
rc
}
@@ -162,49 +205,63 @@ class Topic(val router:LocalRouter, val
}
this.consumers.foreach { case (consumer, proxy) =>
val o = copy(proxy.link)
- consumers_links.put(consumer, o)
+ consumers_links.put(proxy.consumer, o)
rc.consumers.add(o)
}
// Add in the counters from the live sessions..
proxy_sessions.foreach{ session =>
val stats = from_session(session)
- for( link <- producer_links.get(session.producer.asInstanceOf[BindableDeliveryProducer]) ) {
- add_counters(link, stats)
- }
- for( link <- consumers_links.get(session.consumer) ) {
- add_counters(link, stats)
+ session match {
+ case session:ProxyProducerSession =>
+ for( link <- producer_links.get(session.producer.asInstanceOf[BindableDeliveryProducer]) ) {
+ add_link_counters(link, stats)
+ }
+ case session:ProxyConsumerSession =>
+ for( link <- consumers_links.get(session.consumer) ) {
+ add_link_counters(link, stats)
+ }
}
}
// Now update the topic counters..
- rc.metrics.enqueue_item_counter = enqueue_item_counter
- rc.metrics.enqueue_size_counter = enqueue_size_counter
- rc.metrics.enqueue_ts = enqueue_ts
-
- rc.metrics.dequeue_item_counter = dequeue_item_counter
- rc.metrics.dequeue_size_counter = dequeue_size_counter
- rc.metrics.dequeue_ts = dequeue_ts
- consumers_links.values.foreach { link =>
- rc.metrics.enqueue_item_counter += link.enqueue_item_counter
- rc.metrics.enqueue_size_counter += link.enqueue_size_counter
- rc.metrics.enqueue_ts = rc.metrics.enqueue_ts max link.enqueue_ts
- }
+ rc.metrics.current_time = now
+ DestinationMetricsSupport.add_destination_metrics(rc.metrics, topic_metrics)
producer_links.values.foreach { link =>
- rc.metrics.dequeue_item_counter += link.enqueue_item_counter
- rc.metrics.dequeue_size_counter += link.enqueue_size_counter
- rc.metrics.dequeue_ts = rc.metrics.dequeue_ts max link.enqueue_ts
+ add_enqueue_counters(rc.metrics, link)
}
- // Add in any queue metrics that the topic may own.
- for(queue <- consumer_queues.values) {
- val metrics = queue.get_queue_metrics
- metrics.enqueue_item_counter = 0
- metrics.enqueue_size_counter = 0
- metrics.enqueue_ts = 0
- DestinationMetricsSupport.add_destination_metrics(rc.metrics, metrics)
+ var futures = List[Future[(TopicStatusDTO)=>Unit]]()
+
+ consumers_links.foreach { case (consumer, link) =>
+ consumer match {
+ case queue:Queue =>
+ // aggregate the queue stats instead of the link stats.
+ val future = Future[(TopicStatusDTO)=>Unit]()
+ futures ::= future
+ queue.dispatch_queue {
+ val metrics = queue.get_queue_metrics
+ metrics.enqueue_item_counter = 0
+ metrics.enqueue_size_counter = 0
+ metrics.enqueue_ts = 0
+ metrics.producer_counter = 0
+ metrics.producer_count = 0
+ metrics.consumer_counter = 0
+ metrics.consumer_count = 0
+ future.set((rc)=>{
+ DestinationMetricsSupport.add_destination_metrics(rc.metrics, metrics)
+ })
+ }
+ case _ =>
+ // plain link, add its stats.
+ add_dequeue_counters(rc.metrics, link)
+ }
+ }
+
+ Future.all(futures).onComplete{ data=>
+ data.foreach(_(rc))
+ on_complete(rc)
}
- rc
}
@@ -299,9 +356,9 @@ class Topic(val router:LocalRouter, val
}
}
- val proxy = ProxyDeliveryConsumer(target, link)
+ val proxy = ProxyDeliveryConsumer(target, link, consumer)
consumers.put(consumer, proxy)
- consumer_counter += 1
+ topic_metrics.consumer_counter += 1
val list = proxy :: Nil
producers.keys.foreach({ r=>
r.bind(list)
@@ -312,13 +369,27 @@ class Topic(val router:LocalRouter, val
def unbind (consumer:DeliveryConsumer, persistent:Boolean) = {
for(proxy <- consumers.remove(consumer)) {
- add_counters(Topic.this, proxy.link)
+ add_dequeue_counters(topic_metrics, proxy.link)
val list = consumer_queues.remove(consumer) match {
case Some(queue) =>
queue.unbind(List(consumer))
queue.binding match {
case x:TempQueueBinding =>
- router._destroy_queue(queue)
+ queue.dispatch_queue {
+ val metrics = queue.get_queue_metrics
+ router._destroy_queue(queue)
+ dispatch_queue {
+ topic_metrics.dequeue_item_counter += metrics.dequeue_item_counter
+ topic_metrics.dequeue_size_counter += metrics.dequeue_size_counter
+ topic_metrics.dequeue_ts = topic_metrics.dequeue_ts max metrics.dequeue_ts
+ topic_metrics.nack_item_counter += metrics.nack_item_counter
+ topic_metrics.nack_size_counter += metrics.nack_size_counter
+ topic_metrics.nack_ts = topic_metrics.nack_ts max metrics.nack_ts
+ topic_metrics.expired_item_counter += metrics.expired_item_counter
+ topic_metrics.expired_size_counter += metrics.expired_size_counter
+ topic_metrics.expired_ts = topic_metrics.expired_ts max metrics.expired_ts
+ }
+ }
}
List(queue)
case None =>
@@ -375,21 +446,21 @@ class Topic(val router:LocalRouter, val
link.label = "unknown"
}
producers.put(producer, link)
- producer_counter += 1
- producer.bind(consumers.values.toList ::: durable_subscriptions.toList)
+ topic_metrics.producer_counter += 1
+ producer.bind(producer_tracker::consumers.values.toList ::: durable_subscriptions.toList)
check_idle
}
def disconnect (producer:BindableDeliveryProducer) = {
for(link <- producers.remove(producer) ) {
- add_counters(this, link)
+ add_enqueue_counters(topic_metrics, link)
}
check_idle
}
def disconnect_producers:Unit ={
for( (_, link) <- producers ) {
- add_counters(this, link)
+ add_enqueue_counters(topic_metrics, link)
}
producers.clear
check_idle
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1162978&r1=1162977&r2=1162978&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Mon Aug 29 20:17:25 2011
@@ -303,8 +303,13 @@ case class BrokerResource() extends Reso
def get_topic_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = {
val router:LocalRouter = host
val topics: Iterable[Topic] = router.topic_domain.destinations
- val metrics = topics.map(_.status.metrics)
- FutureResult(Success(aggregate_dest_metrics(metrics)))
+
+ val metrics = Future.all {
+ topics.map { topics =>
+ topic_status(topics).map(_.map_success(_.metrics))
+ }
+ }
+ metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option))) )
}
def get_dsub_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] = {
@@ -498,7 +503,7 @@ case class BrokerResource() extends Reso
val router: LocalRouter = host
val records = Future.all {
router.topic_domain.destination_by_id.values.map { value =>
- status(value)
+ topic_status(value)
}
}
val rc:FutureResult[DataPageDTO] = records.map(narrow(classOf[TopicStatusDTO], _, f, q, p, ps, o))
@@ -511,7 +516,7 @@ case class BrokerResource() extends Reso
with_virtual_host(id) { host =>
val router:LocalRouter = host
val node = router.topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
- status(node)
+ topic_status(node)
}
}
@@ -522,7 +527,9 @@ case class BrokerResource() extends Reso
val node = router.topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
val queue =router.queues_by_store_id.get(qid).getOrElse(result(NOT_FOUND))
monitoring(node) {
- queue.status(entries)
+ sync(queue) {
+ queue.status(entries)
+ }
}
}
}
@@ -630,8 +637,10 @@ case class BrokerResource() extends Reso
}
}
- def status(node: Topic): FutureResult[TopicStatusDTO] = monitoring(node) {
- node.status
+ def topic_status(node: Topic): FutureResult[TopicStatusDTO] = monitoring(node) {
+ val rc = FutureResult[TopicStatusDTO]()
+ node.status(x=>rc.set(Success(x)))
+ rc
}
def status(q:Queue, entries:Boolean=false) = monitoring(q) {
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1162978&r1=1162977&r2=1162978&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Mon Aug 29 20:17:25 2011
@@ -23,11 +23,11 @@
- else
- "%,.2f %%".format(n.toFloat*100.0/d)
-
+- var vhost_path = ".."
- binding match
- case x:QueueDestinationDTO =>
.breadcumbs
- a(href={strip_resolve("..")+".html"}) Back
+ a(href={strip_resolve(vhost_path)+".html"}) Back
h1 Queue #{id}
p state: #{state} #{ uptime(state_since) } ago
@@ -36,7 +36,7 @@
input(type="submit" value="delete")
- case x:DurableSubscriptionDestinationDTO =>
.breadcumbs
- a(href={strip_resolve("..")+".html"}) Back
+ a(href={strip_resolve(vhost_path)+".html"}) Back
h1 Durable Subscription on #{id}
- if( x.selector != null )
@@ -47,8 +47,9 @@
input(type="submit" value="delete")
- case _ =>
+ - vhost_path = "../.."
.breadcumbs
- a(href={strip_resolve("../..")+".html"}) Back
+ a(href={strip_resolve(vhost_path)+".html"}) Back
h1 Temporary Queue
p state: #{state} #{ uptime(state_since) } ago
@@ -89,9 +90,9 @@ ul
li.producer
- x.kind match
- case "queue" =>
- a(href={ path("../../queues/"+x.id+".html") }) #{x.label}
+ a(href={ path(vhost_path+"/../queues/"+x.id+".html") }) #{x.label}
- case "connection" =>
- a(href={ path("../../../../connections/"+x.id+".html") }) #{x.label}
+ a(href={ path(vhost_path+"/../../../connections/"+x.id+".html") }) #{x.label}
- case _ =>
p dispatched: #{x.enqueue_item_counter} messages (#{memory(x.enqueue_size_counter)}), #{uptime(x.enqueue_ts)} ago
@@ -103,9 +104,9 @@ ul
li.consumer
- x.kind match
- case "queue" =>
- a(href={ path("../../queues/"+x.id+".html") }) #{x.label}
+ a(href={ path(vhost_path+"/../queues/"+x.id+".html") }) #{x.label}
- case "connection" =>
- a(href={ path("../../../../connections/"+x.id+".html") }) #{x.label}
+ a(href={ path(vhost_path+"/../../../connections/"+x.id+".html") }) #{x.label}
- case _ =>
p dispatched: #{x.enqueue_item_counter} messages (#{memory(x.enqueue_size_counter)}), #{uptime(x.enqueue_ts)} ago
p next message seq: #{x.position}