You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/04/12 18:54:52 UTC
svn commit: r1325376 - in
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker:
LocalRouter.scala Topic.scala
Author: chirino
Date: Thu Apr 12 16:54:51 2012
New Revision: 1325376
URL: http://svn.apache.org/viewvc?rev=1325376&view=rev
Log:
Fixes an NPE on shutdown. Might be related to APLO-189: UDP connector cannot be stopped, but I'm not 100% sure yet.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1325376&r1=1325375&r2=1325376&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Thu Apr 12 16:54:51 2012
@@ -936,13 +936,13 @@ class LocalRouter(val virtual_host:Virtu
}
protected def _stop(on_completed: Task) = {
-// val tracker = new LoggingTracker("router shutdown", virtual_host.console_log, dispatch_queue)
queues_by_store_id.valuesIterator.foreach { queue=>
queue.stop(NOOP)
-// tracker.stop(queue)
}
-// tracker.callback(on_completed)
- on_completed.run
+ on_queues_destroyed_actions ::= ^{
+ on_completed.run
+ }
+ check_on_queues_destroyed_actions
}
@@ -1288,8 +1288,33 @@ class LocalRouter(val virtual_host:Virtu
None
}
+ var pending_queue_destroys = 0
+ var on_queues_destroyed_actions = List[Runnable]()
+
+ def on_queue_destroy_start = {
+ dispatch_queue.assertExecuting()
+ pending_queue_destroys += 1
+ }
+
+ def on_queue_destroy_end = {
+ dispatch_queue.assertExecuting()
+ assert(pending_queue_destroys > 0)
+ pending_queue_destroys -= 1
+ check_on_queues_destroyed_actions
+ }
+
+ def check_on_queues_destroyed_actions = {
+ if( pending_queue_destroys==0 && !on_queues_destroyed_actions.isEmpty) {
+ val actions = on_queues_destroyed_actions
+ on_queues_destroyed_actions = Nil
+ for( action <- actions ) {
+ action.run()
+ }
+ }
+ }
def _destroy_queue(queue: Queue) {
+ on_queue_destroy_start
queue.stop(^{
var metrics = queue.get_queue_metrics
dispatch_queue {
@@ -1310,10 +1335,14 @@ class LocalRouter(val virtual_host:Virtu
queues_by_store_id.remove(queue.store_id)
if (queue.tune_persistent) {
queue.dispatch_queue {
- virtual_host.store.remove_queue(queue.store_id) {
- x => Unit
+ virtual_host.store.remove_queue(queue.store_id) { x =>
+ dispatch_queue {
+ on_queue_destroy_end
+ }
}
}
+ } else {
+ on_queue_destroy_end
}
}
})
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1325376&r1=1325375&r2=1325376&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Thu Apr 12 16:54:51 2012
@@ -429,7 +429,9 @@ class Topic(val router:LocalRouter, val
case x:TempQueueBinding =>
queue.dispatch_queue {
val metrics = queue.get_queue_metrics
- router._destroy_queue(queue)
+ router.dispatch_queue {
+ router._destroy_queue(queue)
+ }
dispatch_queue {
topic_metrics.dequeue_item_counter += metrics.dequeue_item_counter
topic_metrics.dequeue_size_counter += metrics.dequeue_size_counter