You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/04/12 18:54:52 UTC

svn commit: r1325376 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: LocalRouter.scala Topic.scala

Author: chirino
Date: Thu Apr 12 16:54:51 2012
New Revision: 1325376

URL: http://svn.apache.org/viewvc?rev=1325376&view=rev
Log:
Fixes an NPE on shutdown.  Might be related to APLO-189: UDP connector cannot be stopped, but I'm not 100% sure yet.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1325376&r1=1325375&r2=1325376&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Thu Apr 12 16:54:51 2012
@@ -936,13 +936,13 @@ class LocalRouter(val virtual_host:Virtu
   }
 
   protected def _stop(on_completed: Task) = {
-//    val tracker = new LoggingTracker("router shutdown", virtual_host.console_log, dispatch_queue)
     queues_by_store_id.valuesIterator.foreach { queue=>
       queue.stop(NOOP)
-//      tracker.stop(queue)
     }
-//    tracker.callback(on_completed)
-    on_completed.run
+    on_queues_destroyed_actions ::= ^{
+      on_completed.run
+    }
+    check_on_queues_destroyed_actions
   }
 
 
@@ -1288,8 +1288,33 @@ class LocalRouter(val virtual_host:Virtu
     None
   }
 
+  var pending_queue_destroys = 0
+  var on_queues_destroyed_actions = List[Runnable]()
+
+  def on_queue_destroy_start = {
+    dispatch_queue.assertExecuting()
+    pending_queue_destroys += 1
+  }
+
+  def on_queue_destroy_end = {
+    dispatch_queue.assertExecuting()
+    assert(pending_queue_destroys > 0)
+    pending_queue_destroys -= 1
+    check_on_queues_destroyed_actions
+  }
+
+  def check_on_queues_destroyed_actions = {
+    if( pending_queue_destroys==0 && !on_queues_destroyed_actions.isEmpty) {
+      val actions = on_queues_destroyed_actions
+      on_queues_destroyed_actions = Nil
+      for( action <- actions ) {
+        action.run()
+      }
+    }
+  }
 
   def _destroy_queue(queue: Queue) {
+    on_queue_destroy_start
     queue.stop(^{
       var metrics = queue.get_queue_metrics
       dispatch_queue {
@@ -1310,10 +1335,14 @@ class LocalRouter(val virtual_host:Virtu
         queues_by_store_id.remove(queue.store_id)
         if (queue.tune_persistent) {
           queue.dispatch_queue {
-            virtual_host.store.remove_queue(queue.store_id) {
-              x => Unit
+            virtual_host.store.remove_queue(queue.store_id) { x =>
+              dispatch_queue {
+                on_queue_destroy_end
+              }
             }
           }
+        } else {
+          on_queue_destroy_end
         }
       }
     })

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1325376&r1=1325375&r2=1325376&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Thu Apr 12 16:54:51 2012
@@ -429,7 +429,9 @@ class Topic(val router:LocalRouter, val 
             case x:TempQueueBinding =>
               queue.dispatch_queue {
                 val metrics = queue.get_queue_metrics
-                router._destroy_queue(queue)
+                router.dispatch_queue {
+                  router._destroy_queue(queue)
+                }
                 dispatch_queue {
                   topic_metrics.dequeue_item_counter += metrics.dequeue_item_counter
                   topic_metrics.dequeue_size_counter += metrics.dequeue_size_counter