You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/02/22 18:34:40 UTC

svn commit: r1073428 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: LocalRouter.scala QueueBinding.scala Topic.scala

Author: chirino
Date: Tue Feb 22 17:34:40 2011
New Revision: 1073428

URL: http://svn.apache.org/viewvc?rev=1073428&view=rev
Log:
Fixes NPE that occurred when restoring a stored durable subscription on broker startup.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1073428&r1=1073427&r2=1073428&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Tue Feb 22 17:34:40 2011
@@ -275,7 +275,7 @@ class LocalRouter(val host:VirtualHost) 
         destination match {
           case destination:DurableSubscriptionDestinationDTO=>
             // So the user can subscribe to the topic.. but can he create durable sub??
-            val qc = ds_config(destination)
+            val qc = DurableSubscriptionQueueBinding.create(destination).config(host).asInstanceOf[DurableSubscriptionDTO]
             if( !can_create_ds(qc, security) ) {
                return Failure("Not authorized to create the durable subscription.")
             }
@@ -286,10 +286,11 @@ class LocalRouter(val host:VirtualHost) 
     }
 
     def get_or_create_durable_subscription(destination:DurableSubscriptionDestinationDTO):Queue = {
-      durable_subscriptions_by_id.get( (destination.client_id, destination.subscription_id) ).getOrElse {
-        val binding = QueueBinding.create(destination)
-        val qc = ds_config(destination)
-        _create_queue(-1, binding, qc)
+      val key = (destination.client_id, destination.subscription_id)
+      durable_subscriptions_by_id.get( key ).getOrElse {
+        val queue = _create_queue(QueueBinding.create(destination))
+        durable_subscriptions_by_id.put(key, queue)
+        queue
       }
     }
 
@@ -319,27 +320,6 @@ class LocalRouter(val host:VirtualHost) 
       }
     }
 
-    def ds_config(destination:DurableSubscriptionDestinationDTO):DurableSubscriptionDTO = {
-      import collection.JavaConversions._
-      import DestinationParser.default._
-      import AsciiBuffer._
-
-      val name = DestinationParser.decode_path(destination.name)
-      def matches(x:DurableSubscriptionDTO):Boolean = {
-        if( x.name != null && !parseFilter(ascii(x.name)).matches(name)) {
-          return false
-        }
-        if( x.client_id != null && x.client_id!=x.client_id ) {
-          return false
-        }
-        if( x.subscription_id != null && x.subscription_id!=x.subscription_id ) {
-          return false
-        }
-        true
-      }
-      host.config.durable_subscriptions.find(matches _).getOrElse(new DurableSubscriptionDTO)
-    }
-
     def bind(queue:Queue) = {
 
       val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
@@ -362,6 +342,7 @@ class LocalRouter(val host:VirtualHost) 
     def unbind(queue:Queue) = {
       val path = queue.binding.destination
       durable_subscriptions_by_path.remove(path, queue)
+
     }
 
     def create_destination(path:Path, security:SecurityContext):Result[Topic,String] = {
@@ -386,19 +367,6 @@ class LocalRouter(val host:VirtualHost) 
 
   object queue_domain extends Domain[Queue] {
 
-    def config(binding:QueueBinding):QueueDTO = {
-      import collection.JavaConversions._
-      import DestinationParser.default._
-
-      def matches(x:QueueDTO):Boolean = {
-        if( x.name != null && !parseFilter(ascii(x.name)).matches(binding.destination)) {
-          return false
-        }
-        true
-      }
-      host.config.queues.find(matches _).getOrElse(new QueueDTO)
-    }
-
     def can_create_queue(config:QueueDTO, security:SecurityContext) = {
       if( host.authorizer==null || security==null) {
         true
@@ -411,11 +379,25 @@ class LocalRouter(val host:VirtualHost) 
       val path = queue.binding.destination
       assert( !PathParser.containsWildCards(path) )
       add_destination(path, queue)
+
+      import OptionSupport._
+      if( queue.config.unified.getOrElse(false) ) {
+        // hook up the queue to be a subscriber of the topic.
+        val topic = topic_domain.get_or_create_destination(path, null).success
+        topic.bind(null, queue)
+      }
     }
 
     def unbind(queue:Queue) = {
       val path = queue.binding.destination
       remove_destination(path, queue)
+
+      import OptionSupport._
+      if( queue.config.unified.getOrElse(false) ) {
+        // unhook the queue from the topic
+        val topic = topic_domain.get_or_create_destination(path, null).success
+        topic.unbind(queue, false)
+      }
     }
 
     def create_destination(path: Path, security: SecurityContext) = {
@@ -423,16 +405,9 @@ class LocalRouter(val host:VirtualHost) 
       dto.name = DestinationParser.encode_path(path)
 
       val binding = QueueDomainQueueBinding.create(dto)
-      val qc = config(binding)
-      if( can_create_queue(qc, security) ) {
-        val queue = _create_queue(-1, binding, qc)
-        import OptionSupport._
-        if( qc.unified.getOrElse(false) ) {
-          // hook up the queue to be a subscriber of the topic.
-          val topic = topic_domain.get_or_create_destination(path, null).success
-          topic.bind(null, queue)
-        }
-        Success(queue)
+      val config = binding.config(host)
+      if( can_create_queue(config, security) ) {
+        Success(_create_queue(binding))
       } else {
         Failure("Not authorized to create the queue")
       }
@@ -459,9 +434,14 @@ class LocalRouter(val host:VirtualHost) 
             host.store.get_queue(queue_key) { x =>
               x match {
                 case Some(record)=>
-                  dispatch_queue {
-                    _create_queue(record.key, QueueBinding.create(record.binding_kind, record.binding_data), null)
-                    task.run
+                  if( record.binding_kind == TempQueueBinding.TEMP_KIND ) {
+                    // Drop temp queues on restart..
+                    host.store.remove_queue(queue_key){x=> task.run}
+                  } else {
+                    dispatch_queue {
+                      _create_queue(QueueBinding.create(record.binding_kind, record.binding_data), queue_key)
+                      task.run
+                    }
                   }
                 case _ => task.run
               }
@@ -645,13 +625,16 @@ class LocalRouter(val host:VirtualHost) 
     queues_by_id.get(id)
   }
 
-  def _create_queue(id:Long, binding:QueueBinding, config:QueueDTO):Queue = {
+
+  def _create_queue(binding:QueueBinding, id:Long= -1):Queue = {
 
     var qid = id
     if( qid == -1 ) {
       qid = host.queue_id_counter.incrementAndGet
     }
 
+    val config = binding.config(host)
+
     val queue = new Queue(this, qid, binding, config)
     if( queue.tune_persistent && id == -1 ) {
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?rev=1073428&r1=1073427&r2=1073428&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Tue Feb 22 17:34:40 2011
@@ -90,6 +90,8 @@ trait QueueBinding {
   def message_filter:BooleanExpression = ConstantExpression.TRUE
 
   def destination:Path
+
+  def config(host:VirtualHost):QueueDTO
 }
 
 object QueueDomainQueueBinding extends QueueBinding.Provider {
@@ -149,6 +151,20 @@ class QueueDomainQueueBinding(val bindin
     case _ => false
   }
 
+
+  def config(host:VirtualHost):QueueDTO = {
+    import collection.JavaConversions._
+    import DestinationParser.default._
+
+    def matches(x:QueueDTO):Boolean = {
+      if( x.name != null && !parseFilter(ascii(x.name)).matches(destination)) {
+        return false
+      }
+      true
+    }
+    host.config.queues.find(matches _).getOrElse(new QueueDTO)
+  }
+
 }
 
 
@@ -170,8 +186,8 @@ object DurableSubscriptionQueueBinding e
       null
     }
   }
-}
 
+}
 
 /**
  * <p>
@@ -220,6 +236,27 @@ class DurableSubscriptionQueueBinding(va
       SelectorParser.parse(binding_dto.filter)
     }
   }
+
+  def config(host:VirtualHost):DurableSubscriptionDTO = {
+      import collection.JavaConversions._
+      import DestinationParser.default._
+      import AsciiBuffer._
+      // Picks the first configured durable-subscription rule matching this binding, else a default DTO; a null rule field is not checked.
+      def matches(x:DurableSubscriptionDTO):Boolean = {
+        if( x.name != null && !parseFilter(ascii(x.name)).matches(destination)) {
+          return false
+        }
+        if( x.client_id != null && x.client_id!=binding_dto.client_id ) { // compare rule against this binding, not against itself
+          return false
+        }
+        if( x.subscription_id != null && x.subscription_id!=binding_dto.subscription_id ) { // same: must equal this binding's id
+          return false
+        }
+        true
+      }
+      host.config.durable_subscriptions.find(matches _).getOrElse(new DurableSubscriptionDTO)
+  }
+
 }
 
 
@@ -268,4 +305,5 @@ class TempQueueBinding(val key:AnyRef, v
     case _ => false
   }
 
+  def config(host: VirtualHost) = new QueueDTO
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1073428&r1=1073427&r2=1073428&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Tue Feb 22 17:34:40 2011
@@ -70,7 +70,7 @@ class Topic(val router:LocalRouter, val 
           case "queue" =>
 
             // create a temp queue so that it can spool
-            val queue = router._create_queue(-1, new TempQueueBinding(consumer), new QueueDTO)
+            val queue = router._create_queue(new TempQueueBinding(consumer))
             queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
             queue.bind(List(consumer))