You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/02/22 18:34:40 UTC
svn commit: r1073428 - in
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker:
LocalRouter.scala QueueBinding.scala Topic.scala
Author: chirino
Date: Tue Feb 22 17:34:40 2011
New Revision: 1073428
URL: http://svn.apache.org/viewvc?rev=1073428&view=rev
Log:
Fixes NPE that occurred when restoring a stored durable subscription on broker startup.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1073428&r1=1073427&r2=1073428&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Tue Feb 22 17:34:40 2011
@@ -275,7 +275,7 @@ class LocalRouter(val host:VirtualHost)
destination match {
case destination:DurableSubscriptionDestinationDTO=>
// So the user can subscribe to the topic.. but can he create durable sub??
- val qc = ds_config(destination)
+ val qc = DurableSubscriptionQueueBinding.create(destination).config(host).asInstanceOf[DurableSubscriptionDTO]
if( !can_create_ds(qc, security) ) {
return Failure("Not authorized to create the durable subscription.")
}
@@ -286,10 +286,11 @@ class LocalRouter(val host:VirtualHost)
}
def get_or_create_durable_subscription(destination:DurableSubscriptionDestinationDTO):Queue = {
- durable_subscriptions_by_id.get( (destination.client_id, destination.subscription_id) ).getOrElse {
- val binding = QueueBinding.create(destination)
- val qc = ds_config(destination)
- _create_queue(-1, binding, qc)
+ val key = (destination.client_id, destination.subscription_id)
+ durable_subscriptions_by_id.get( key ).getOrElse {
+ val queue = _create_queue(QueueBinding.create(destination))
+ durable_subscriptions_by_id.put(key, queue)
+ queue
}
}
@@ -319,27 +320,6 @@ class LocalRouter(val host:VirtualHost)
}
}
- def ds_config(destination:DurableSubscriptionDestinationDTO):DurableSubscriptionDTO = {
- import collection.JavaConversions._
- import DestinationParser.default._
- import AsciiBuffer._
-
- val name = DestinationParser.decode_path(destination.name)
- def matches(x:DurableSubscriptionDTO):Boolean = {
- if( x.name != null && !parseFilter(ascii(x.name)).matches(name)) {
- return false
- }
- if( x.client_id != null && x.client_id!=x.client_id ) {
- return false
- }
- if( x.subscription_id != null && x.subscription_id!=x.subscription_id ) {
- return false
- }
- true
- }
- host.config.durable_subscriptions.find(matches _).getOrElse(new DurableSubscriptionDTO)
- }
-
def bind(queue:Queue) = {
val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
@@ -362,6 +342,7 @@ class LocalRouter(val host:VirtualHost)
def unbind(queue:Queue) = {
val path = queue.binding.destination
durable_subscriptions_by_path.remove(path, queue)
+
}
def create_destination(path:Path, security:SecurityContext):Result[Topic,String] = {
@@ -386,19 +367,6 @@ class LocalRouter(val host:VirtualHost)
object queue_domain extends Domain[Queue] {
- def config(binding:QueueBinding):QueueDTO = {
- import collection.JavaConversions._
- import DestinationParser.default._
-
- def matches(x:QueueDTO):Boolean = {
- if( x.name != null && !parseFilter(ascii(x.name)).matches(binding.destination)) {
- return false
- }
- true
- }
- host.config.queues.find(matches _).getOrElse(new QueueDTO)
- }
-
def can_create_queue(config:QueueDTO, security:SecurityContext) = {
if( host.authorizer==null || security==null) {
true
@@ -411,11 +379,25 @@ class LocalRouter(val host:VirtualHost)
val path = queue.binding.destination
assert( !PathParser.containsWildCards(path) )
add_destination(path, queue)
+
+ import OptionSupport._
+ if( queue.config.unified.getOrElse(false) ) {
+ // hook up the queue to be a subscriber of the topic.
+ val topic = topic_domain.get_or_create_destination(path, null).success
+ topic.bind(null, queue)
+ }
}
def unbind(queue:Queue) = {
val path = queue.binding.destination
remove_destination(path, queue)
+
+ import OptionSupport._
+ if( queue.config.unified.getOrElse(false) ) {
+ // unhook the queue from the topic
+ val topic = topic_domain.get_or_create_destination(path, null).success
+ topic.unbind(queue, false)
+ }
}
def create_destination(path: Path, security: SecurityContext) = {
@@ -423,16 +405,9 @@ class LocalRouter(val host:VirtualHost)
dto.name = DestinationParser.encode_path(path)
val binding = QueueDomainQueueBinding.create(dto)
- val qc = config(binding)
- if( can_create_queue(qc, security) ) {
- val queue = _create_queue(-1, binding, qc)
- import OptionSupport._
- if( qc.unified.getOrElse(false) ) {
- // hook up the queue to be a subscriber of the topic.
- val topic = topic_domain.get_or_create_destination(path, null).success
- topic.bind(null, queue)
- }
- Success(queue)
+ val config = binding.config(host)
+ if( can_create_queue(config, security) ) {
+ Success(_create_queue(binding))
} else {
Failure("Not authorized to create the queue")
}
@@ -459,9 +434,14 @@ class LocalRouter(val host:VirtualHost)
host.store.get_queue(queue_key) { x =>
x match {
case Some(record)=>
- dispatch_queue {
- _create_queue(record.key, QueueBinding.create(record.binding_kind, record.binding_data), null)
- task.run
+ if( record.binding_kind == TempQueueBinding.TEMP_KIND ) {
+ // Drop temp queues on restart..
+ host.store.remove_queue(queue_key){x=> task.run}
+ } else {
+ dispatch_queue {
+ _create_queue(QueueBinding.create(record.binding_kind, record.binding_data), queue_key)
+ task.run
+ }
}
case _ => task.run
}
@@ -645,13 +625,16 @@ class LocalRouter(val host:VirtualHost)
queues_by_id.get(id)
}
- def _create_queue(id:Long, binding:QueueBinding, config:QueueDTO):Queue = {
+
+ def _create_queue(binding:QueueBinding, id:Long= -1):Queue = {
var qid = id
if( qid == -1 ) {
qid = host.queue_id_counter.incrementAndGet
}
+ val config = binding.config(host)
+
val queue = new Queue(this, qid, binding, config)
if( queue.tune_persistent && id == -1 ) {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?rev=1073428&r1=1073427&r2=1073428&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Tue Feb 22 17:34:40 2011
@@ -90,6 +90,8 @@ trait QueueBinding {
def message_filter:BooleanExpression = ConstantExpression.TRUE
def destination:Path
+
+ def config(host:VirtualHost):QueueDTO // configuration DTO this binding should use on the given virtual host; each binding kind supplies its own lookup
}
object QueueDomainQueueBinding extends QueueBinding.Provider {
@@ -149,6 +151,20 @@ class QueueDomainQueueBinding(val bindin
case _ => false
}
+
+ /**
+  * Picks the queue configuration that applies to this binding on the given host.
+  *
+  * The host's configured queues are searched in order and the first entry whose
+  * name filter matches this binding's destination wins.  An entry without a name
+  * matches every destination.  When nothing matches, a default QueueDTO is returned.
+  */
+ def config(host:VirtualHost):QueueDTO = {
+   import collection.JavaConversions._
+   import DestinationParser.default._
+
+   // An entry applies when it has no name filter, or its filter accepts our destination.
+   def matches(x:QueueDTO):Boolean =
+     x.name == null || parseFilter(ascii(x.name)).matches(destination)
+
+   val configured = host.config.queues.find(matches _)
+   configured.getOrElse(new QueueDTO)
+ }
+
}
@@ -170,8 +186,8 @@ object DurableSubscriptionQueueBinding e
null
}
}
-}
+}
/**
* <p>
@@ -220,6 +236,27 @@ class DurableSubscriptionQueueBinding(va
SelectorParser.parse(binding_dto.filter)
}
}
+
+ /**
+  * Picks the durable subscription configuration that applies to this binding
+  * on the given host.
+  *
+  * The host's configured durable subscriptions are searched in order; the first
+  * entry whose constraints all hold is returned.  An entry constrains a field
+  * only when that field is non-null:
+  *  - name: treated as a destination filter that must match this binding's destination
+  *  - client_id: must equal the binding's client id
+  *  - subscription_id: must equal the binding's subscription id
+  *
+  * When no entry matches, a default DurableSubscriptionDTO is returned.
+  */
+ def config(host:VirtualHost):DurableSubscriptionDTO = {
+   import collection.JavaConversions._
+   import DestinationParser.default._
+   import AsciiBuffer._
+
+   def matches(x:DurableSubscriptionDTO):Boolean = {
+     if( x.name != null && !parseFilter(ascii(x.name)).matches(destination)) {
+       return false
+     }
+     // Compare against the binding's ids.  The previous code compared each
+     // configured field with itself, so these checks could never reject an entry.
+     if( x.client_id != null && x.client_id != binding_dto.client_id ) {
+       return false
+     }
+     if( x.subscription_id != null && x.subscription_id != binding_dto.subscription_id ) {
+       return false
+     }
+     true
+   }
+   host.config.durable_subscriptions.find(matches _).getOrElse(new DurableSubscriptionDTO)
+ }
+
}
@@ -268,4 +305,5 @@ class TempQueueBinding(val key:AnyRef, v
case _ => false
}
+ /** Temp queue bindings do not consult the host configuration; every call yields a fresh default QueueDTO. */
+ def config(host: VirtualHost): QueueDTO = {
+   new QueueDTO
+ }
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1073428&r1=1073427&r2=1073428&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Tue Feb 22 17:34:40 2011
@@ -70,7 +70,7 @@ class Topic(val router:LocalRouter, val
case "queue" =>
// create a temp queue so that it can spool
- val queue = router._create_queue(-1, new TempQueueBinding(consumer), new QueueDTO)
+ val queue = router._create_queue(new TempQueueBinding(consumer))
queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
queue.bind(List(consumer))