You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/12 17:15:51 UTC

svn commit: r1058218 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala Router.scala

Author: chirino
Date: Wed Jan 12 16:15:50 2011
New Revision: 1058218

URL: http://svn.apache.org/viewvc?rev=1058218&view=rev
Log:
release and retain directly so that it's easier to track in debug.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1058218&r1=1058217&r2=1058218&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jan 12 16:15:50 2011
@@ -540,12 +540,15 @@ class Queue(val host: VirtualHost, var i
     Success(Zilch)
   }
 
-  def bind(values: List[DeliveryConsumer]) = retaining(values) {
-    for (consumer <- values) {
-      val sub = new Subscription(this, consumer)
-      sub.open
+  def bind(values: List[DeliveryConsumer]) = {
+    values.foreach(_.retain)
+    dispatch_queue {
+      for (consumer <- values) {
+        val sub = new Subscription(this, consumer)
+        sub.open
+      }
     }
-  } >>: dispatch_queue
+  }
 
   def unbind(values: List[DeliveryConsumer]) = dispatch_queue {
     for (consumer <- values) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1058218&r1=1058217&r2=1058218&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jan 12 16:15:50 2011
@@ -314,14 +314,15 @@ class Router(val host:VirtualHost) exten
     }
   }
 
-  def unbind(destination:Destination, consumer:DeliveryConsumer) = releasing(consumer) {
+  def unbind(destination:Destination, consumer:DeliveryConsumer) = dispatchQueue {
     assert( is_topic(destination) )
     val name = destination.name
     broadcast_consumers.remove(name, consumer)
     get_destination_matches(name).foreach{ node=>
       node.remove_broadcast_consumer(consumer)
     }
-  } >>: dispatchQueue
+    consumer.release
+  }
 
 
   def connect(destination:Destination, producer:DeliveryProducer, security:SecurityContext)(completed: (Result[DeliveryProducerRoute,String])=>Unit) = {
@@ -386,7 +387,7 @@ class Router(val host:VirtualHost) exten
 
   }
 
-  def disconnect(route:DeliveryProducerRoute) = releasing(route) {
+  def disconnect(route:DeliveryProducerRoute) = dispatchQueue {
     _get_destination(route.destination.name).foreach { node=>
       val topic = is_topic(route.destination)
       if( node.unified || topic ) {
@@ -394,8 +395,8 @@ class Router(val host:VirtualHost) exten
       }
     }
     route.disconnected()
-
-  } >>: dispatchQueue
+    route.release
+  }
 
 }
 
@@ -522,9 +523,12 @@ case class DeliveryProducerRoute(val rou
     on_connected
   }
 
-  def bind(targets:List[DeliveryConsumer]) = retaining(targets) {
-    internal_bind(targets)
-  } >>: dispatch_queue
+  def bind(targets:List[DeliveryConsumer]) = {
+    targets.foreach(_.retain)
+    dispatch_queue {
+      internal_bind(targets)
+    }
+  }
 
   private def internal_bind(values:List[DeliveryConsumer]) = {
     values.foreach{ x=>
@@ -535,7 +539,7 @@ case class DeliveryProducerRoute(val rou
     }
   }
 
-  def unbind(targets:List[DeliveryConsumer]) = releasing(targets) {
+  def unbind(targets:List[DeliveryConsumer]) = dispatch_queue {
     this.targets = this.targets.filterNot { x=>
       val rc = targets.contains(x.consumer)
       if( rc ) {
@@ -550,7 +554,8 @@ case class DeliveryProducerRoute(val rou
       }
       rc
     }
-  } >>: dispatch_queue
+    targets.foreach(_.release)
+  }
 
   def disconnected() = dispatch_queue {
     this.targets.foreach { x=>