You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/12 17:15:51 UTC
svn commit: r1058218 - in
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker:
Queue.scala Router.scala
Author: chirino
Date: Wed Jan 12 16:15:50 2011
New Revision: 1058218
URL: http://svn.apache.org/viewvc?rev=1058218&view=rev
Log:
release and retain directly so that it's easier to track in debug.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1058218&r1=1058217&r2=1058218&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jan 12 16:15:50 2011
@@ -540,12 +540,15 @@ class Queue(val host: VirtualHost, var i
Success(Zilch)
}
- def bind(values: List[DeliveryConsumer]) = retaining(values) {
- for (consumer <- values) {
- val sub = new Subscription(this, consumer)
- sub.open
+ def bind(values: List[DeliveryConsumer]) = {
+ values.foreach(_.retain)
+ dispatch_queue {
+ for (consumer <- values) {
+ val sub = new Subscription(this, consumer)
+ sub.open
+ }
}
- } >>: dispatch_queue
+ }
def unbind(values: List[DeliveryConsumer]) = dispatch_queue {
for (consumer <- values) {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1058218&r1=1058217&r2=1058218&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jan 12 16:15:50 2011
@@ -314,14 +314,15 @@ class Router(val host:VirtualHost) exten
}
}
- def unbind(destination:Destination, consumer:DeliveryConsumer) = releasing(consumer) {
+ def unbind(destination:Destination, consumer:DeliveryConsumer) = dispatchQueue {
assert( is_topic(destination) )
val name = destination.name
broadcast_consumers.remove(name, consumer)
get_destination_matches(name).foreach{ node=>
node.remove_broadcast_consumer(consumer)
}
- } >>: dispatchQueue
+ consumer.release
+ }
def connect(destination:Destination, producer:DeliveryProducer, security:SecurityContext)(completed: (Result[DeliveryProducerRoute,String])=>Unit) = {
@@ -386,7 +387,7 @@ class Router(val host:VirtualHost) exten
}
- def disconnect(route:DeliveryProducerRoute) = releasing(route) {
+ def disconnect(route:DeliveryProducerRoute) = dispatchQueue {
_get_destination(route.destination.name).foreach { node=>
val topic = is_topic(route.destination)
if( node.unified || topic ) {
@@ -394,8 +395,8 @@ class Router(val host:VirtualHost) exten
}
}
route.disconnected()
-
- } >>: dispatchQueue
+ route.release
+ }
}
@@ -522,9 +523,12 @@ case class DeliveryProducerRoute(val rou
on_connected
}
- def bind(targets:List[DeliveryConsumer]) = retaining(targets) {
- internal_bind(targets)
- } >>: dispatch_queue
+ def bind(targets:List[DeliveryConsumer]) = {
+ targets.foreach(_.retain)
+ dispatch_queue {
+ internal_bind(targets)
+ }
+ }
private def internal_bind(values:List[DeliveryConsumer]) = {
values.foreach{ x=>
@@ -535,7 +539,7 @@ case class DeliveryProducerRoute(val rou
}
}
- def unbind(targets:List[DeliveryConsumer]) = releasing(targets) {
+ def unbind(targets:List[DeliveryConsumer]) = dispatch_queue {
this.targets = this.targets.filterNot { x=>
val rc = targets.contains(x.consumer)
if( rc ) {
@@ -550,7 +554,8 @@ case class DeliveryProducerRoute(val rou
}
rc
}
- } >>: dispatch_queue
+ targets.foreach(_.release)
+ }
def disconnected() = dispatch_queue {
this.targets.foreach { x=>