You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/06 06:24:38 UTC

svn commit: r1210785 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/

Author: chirino
Date: Tue Dec  6 05:24:38 2011
New Revision: 1210785

URL: http://svn.apache.org/viewvc?rev=1210785&view=rev
Log:
Fixes APLO-94 : Store and aggregate queue or topic metrics when the destination gets deleted

Patch provided by Stan Lewis.  Thanks!

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1210785&r1=1210784&r2=1210785&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Dec  6 05:24:38 2011
@@ -447,6 +447,11 @@ class Queue(val router: LocalRouter, val
     swapped_in_size_max -= tune_queue_buffer;
     trigger_swap
 
+    this.destination_dto match {
+      case t:TopicDestinationDTO =>
+      case _ =>
+        DestinationMetricsSupport.add_destination_metrics(virtual_host.dead_queue_metrics, get_queue_metrics)
+    }
     on_completed.run
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1210785&r1=1210784&r2=1210785&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Tue Dec  6 05:24:38 2011
@@ -169,6 +169,12 @@ class Topic(val router:LocalRouter, val 
 
   def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
 
+  def status: FutureResult[TopicStatusDTO] = {
+    val rc = FutureResult[TopicStatusDTO]()
+    status(x => rc.set(Success(x)))
+    rc
+  }
+
   def status(on_complete:(TopicStatusDTO)=>Unit) = {
     dispatch_queue.assertExecuting()
 
@@ -293,6 +299,7 @@ class Topic(val router:LocalRouter, val 
           dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
             if( previously_idle_at == idled_at ) {
               router.topic_domain.remove_destination(path, this)
+              DestinationMetricsSupport.add_destination_metrics(router.virtual_host.dead_topic_metrics, topic_metrics)
             }
           }
         }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1210785&r1=1210784&r2=1210785&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Tue Dec  6 05:24:38 2011
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.apollo.broker;
 
-import _root_.java.lang.String
 import _root_.scala.collection.JavaConversions._
 import org.fusesource.hawtdispatch._
 
@@ -26,6 +25,7 @@ import org.apache.activemq.apollo.dto._
 import security._
 import security.SecuredResource.VirtualHostKind
 import store._
+import java.lang.{Throwable, String}
 
 trait VirtualHostFactory {
   def create(broker:Broker, dto:VirtualHostDTO):VirtualHost
@@ -96,6 +96,10 @@ class VirtualHost(val broker: Broker, va
 
   val session_counter = new PersistentLongCounter("session_counter")
 
+  var dead_topic_metrics = new DestMetricsDTO
+  var dead_queue_metrics = new DestMetricsDTO
+  var dead_dsub_metrics = new DestMetricsDTO
+
   var authenticator:Authenticator = _
   var authorizer = Authorizer()
 
@@ -247,5 +251,60 @@ class VirtualHost(val broker: Broker, va
     })
   }
 
+  def local_router = router.asInstanceOf[LocalRouter]
+
+  def reset_metrics = {
+    dead_queue_metrics = new DestMetricsDTO
+    dead_topic_metrics = new DestMetricsDTO
+  }
+  
+  def aggregate_dest_metrics(metrics:Iterable[DestMetricsDTO]):AggregateDestMetricsDTO = {
+    metrics.foldLeft(new AggregateDestMetricsDTO) { (to, from) =>
+      DestinationMetricsSupport.add_destination_metrics(to, from)
+      from match {
+        case from:AggregateDestMetricsDTO =>
+          to.objects += from.objects
+        case _ =>
+          to.objects += 1
+      }
+      to
+    }
+  }
+
+  def get_topic_metrics:FutureResult[AggregateDestMetricsDTO] = {
+    val topics:Iterable[Topic] = local_router.topic_domain.destinations
+    val metrics: Future[Iterable[Result[DestMetricsDTO, Throwable]]] = Future.all {
+      topics.map(_.status.map(_.map_success(_.metrics)))
+    }
+    metrics.map( x => Success {
+      val rc = aggregate_dest_metrics(x.flatMap(_.success_option))
+      DestinationMetricsSupport.add_destination_metrics(rc, dead_topic_metrics)
+      rc
+    })
+  }
+  
+  def get_queue_metrics:FutureResult[AggregateDestMetricsDTO] = {
+    val queues:Iterable[Queue] = local_router.queue_domain.destinations
+    val metrics = sync_all (queues) { queue =>
+      queue.get_queue_metrics
+    }
+    metrics.map( x => Success {
+      val rc = aggregate_dest_metrics(x.flatMap(_.success_option))
+      DestinationMetricsSupport.add_destination_metrics(rc, dead_queue_metrics)
+      rc
+    })
+  }
+  
+  def get_dsub_metrics:FutureResult[AggregateDestMetricsDTO] = sync(this) {
+    val dsubs:Iterable[Queue] = local_router.topic_domain.durable_subscriptions_by_id.values
+    val metrics = sync_all (dsubs) { dsub =>
+      dsub.get_queue_metrics
+    }
+    metrics.map( x => Success {
+      val rc = aggregate_dest_metrics(x.flatMap(_.success_option))
+      DestinationMetricsSupport.add_destination_metrics(rc, dead_dsub_metrics)
+      rc
+    })
+  }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1210785&r1=1210784&r2=1210785&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Tue Dec  6 05:24:38 2011
@@ -283,15 +283,7 @@ case class BrokerResource() extends Reso
     metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option)) ))
   }
 
-  def get_queue_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = {
-    val router:LocalRouter = host
-    val queues: Iterable[Queue] = router.queue_domain.destinations
-    val metrics = sync_all(queues) { queue =>
-      queue.get_queue_metrics
-    }
-    metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option))) )
-  }
-
+  def get_queue_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = host.get_queue_metrics
 
   def get_topic_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] = {
     val metrics = sync_all(broker.virtual_hosts.values) { host =>
@@ -300,17 +292,7 @@ case class BrokerResource() extends Reso
     metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option)) ))
   }
 
-  def get_topic_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = {
-    val router:LocalRouter = host
-    val topics: Iterable[Topic] = router.topic_domain.destinations
-
-    val metrics = Future.all {
-      topics.map { topics =>
-        topic_status(topics).map(_.map_success(_.metrics))
-      }
-    }
-    metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option))) )
-  }
+  def get_topic_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = host.get_topic_metrics
 
   def get_dsub_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] = {
     val metrics = sync_all(broker.virtual_hosts.values) { host =>
@@ -319,14 +301,7 @@ case class BrokerResource() extends Reso
     metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option)) ))
   }
 
-  def get_dsub_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = {
-    val router:LocalRouter = host
-    val dsubs: Iterable[Queue] = router.topic_domain.durable_subscriptions_by_id.values
-    val metrics = sync_all(dsubs) { dsub =>
-      dsub.get_queue_metrics
-    }
-    metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option))) )
-  }
+  def get_dsub_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = host.get_dsub_metrics
 
 
   @GET @Path("virtual-hosts")
@@ -503,7 +478,9 @@ case class BrokerResource() extends Reso
       val router: LocalRouter = host
       val records = Future.all {
         router.topic_domain.destination_by_id.values.map { value  =>
-          topic_status(value)
+          monitoring(value) {
+            value.status
+          }
         }
       }
       val rc:FutureResult[DataPageDTO] = records.map(narrow(classOf[TopicStatusDTO], _, f, q, p, ps, o))
@@ -516,7 +493,9 @@ case class BrokerResource() extends Reso
     with_virtual_host(id) { host =>
       val router:LocalRouter = host
       val node = router.topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
-      topic_status(node)
+      monitoring(node) {
+        node.status
+      }
     }
   }
 
@@ -637,12 +616,6 @@ case class BrokerResource() extends Reso
     }
   }
 
-  def topic_status(node: Topic): FutureResult[TopicStatusDTO] = monitoring(node) {
-    val rc = FutureResult[TopicStatusDTO]()
-    node.status(x=>rc.set(Success(x)))
-    rc
-  }
-
   def status(q:Queue, entries:Boolean=false) = monitoring(q) {
     q.status(entries)
   }