You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/17 15:07:44 UTC

svn commit: r1036047 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Author: chirino
Date: Wed Nov 17 14:07:43 2010
New Revision: 1036047

URL: http://svn.apache.org/viewvc?rev=1036047&view=rev
Log:
Simplifying the Router interface. Instead of passing explicit continuation arguments, let's just use the Scala compiler's continuations facility.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1036047&r1=1036046&r2=1036047&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Nov 17 14:07:43 2010
@@ -148,7 +148,7 @@ class Router(val host:VirtualHost) exten
   /**
    * Returns the previously created queue if it already existed.
    */
-  def _create_queue(dto: BindingDTO): Some[Queue] = {
+  def _create_queue(dto: BindingDTO): Option[Queue] = {
     val binding = BindingFactory.create(dto)
     val queue = queues.get(binding) match {
       case Some(queue) => Some(queue)
@@ -157,16 +157,16 @@ class Router(val host:VirtualHost) exten
     queue
   }
 
-  def create_queue(dto:BindingDTO)(cb: (Option[Queue])=>Unit) = ^{
-    cb(_create_queue(dto))
-  } >>: dispatchQueue
+  def create_queue(dto:BindingDTO) = dispatchQueue ! {
+    _create_queue(dto)
+  }
 
   /**
    * Returns true if the queue no longer exists.
    */
-  def destroy_queue(dto:BindingDTO)(cb: (Boolean)=>Unit) = ^{
+  def destroy_queue(dto:BindingDTO) = dispatchQueue ! {
     val binding = BindingFactory.create(dto)
-    val queue = queues.get(binding) match {
+    queues.get(binding) match {
       case Some(queue) =>
         val name = binding.destination
         if( name!=null ) {
@@ -179,36 +179,35 @@ class Router(val host:VirtualHost) exten
       case None =>
         true
     }
-    cb(queue)
-  } >>: dispatchQueue
+  }
 
   /**
    * Gets an existing queue.
    */
-  def get_queue(dto:BindingDTO)(cb: (Option[Queue])=>Unit) = ^{
+  def get_queue(dto:BindingDTO) = dispatchQueue ! {
     val binding = BindingFactory.create(dto)
-    cb(queues.get(binding))
-  } >>: dispatchQueue
+    queues.get(binding)
+  }
 
-  def bind(destination:Destination, consumer:DeliveryConsumer, on_complete:Runnable = ^{} ) = retaining(consumer) {
+  def bind(destination:Destination, consumer:DeliveryConsumer) = {
+    consumer.retain
+    dispatchQueue ! {
 
-    assert( is_topic(destination) )
+      assert( is_topic(destination) )
 
-    val name = destination.getName
+      val name = destination.getName
 
-    // make sure the destination is created if this is not a wild card sub
-    if( !PathFilter.containsWildCards(name) ) {
-      val node = create_destination_or(name) { node=> }
-    }
-
-    get_destination_matches(name).foreach( node=>
-      node.add_broadcast_consumer(consumer)
-    )
-    broadcast_consumers.put(name, consumer)
+      // make sure the destination is created if this is not a wild card sub
+      if( !PathFilter.containsWildCards(name) ) {
+        val node = create_destination_or(name) { node=> }
+      }
 
-    on_complete.run
-    
-  } >>: dispatchQueue
+      get_destination_matches(name).foreach( node=>
+        node.add_broadcast_consumer(consumer)
+      )
+      broadcast_consumers.put(name, consumer)
+    }
+  }
 
   def unbind(destination:Destination, consumer:DeliveryConsumer) = releasing(consumer) {
     assert( is_topic(destination) )
@@ -352,9 +351,9 @@ case class DeliveryProducerRoute(val rou
 
   var targets = List[DeliverySession]()
 
-  def connected() = ^{
+  def connected() = dispatchQueue {
     on_connected
-  } >>: dispatchQueue
+  }
 
   def bind(targets:List[DeliveryConsumer]) = retaining(targets) {
     internal_bind(targets)
@@ -386,13 +385,13 @@ case class DeliveryProducerRoute(val rou
     }
   } >>: dispatchQueue
 
-  def disconnected() = ^ {
+  def disconnected() = dispatchQueue {
     this.targets.foreach { x=>
       debug("producer route detaching from conusmer.")
       x.close
       x.consumer.release
     }    
-  } >>: dispatchQueue
+  }
 
   protected def on_connected = {}
   protected def on_disconnected = {}

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1036047&r1=1036046&r2=1036047&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Nov 17 14:07:43 2010
@@ -378,7 +378,8 @@ class StompProtocolHandler extends Proto
           if( consumer.binding==null ) {
             host.router.unbind(consumer.destination, consumer)
           } else {
-            host.router.get_queue(consumer.binding) { queue=>
+            reset {
+              val queue = host.router.get_queue(consumer.binding)
               queue.foreach( _.unbind(consumer::Nil) )
             }
           }
@@ -761,21 +762,22 @@ class StompProtocolHandler extends Proto
     if( binding==null ) {
 
       // consumer is bind bound as a topic
-      host.router.bind(destination, consumer, ^{
+      reset {
+        host.router.bind(destination, consumer)
         send_receipt(headers)
-      })
-      consumer.release
+        consumer.release
+      }
 
     } else {
-
-      // create a queue and bind the consumer to it.
-      host.router.create_queue(binding) { x=>
+      reset {
+        // create a queue and bind the consumer to it.
+        val x= host.router.create_queue(binding)
         x match {
           case Some(queue:Queue) =>
             queue.bind(consumer::Nil)
             send_receipt(headers)
             consumer.release
-          case None => throw new RuntimeException("case not yet implemented.")
+          case None => async_die("case not yet implemented.")
         }
       }
     }
@@ -808,12 +810,15 @@ class StompProtocolHandler extends Proto
           host.router.unbind(consumer.destination, consumer)
           send_receipt(headers)
         } else {
-          host.router.get_queue(consumer.binding) { queue=>
+
+          reset {
+            val queue = host.router.get_queue(consumer.binding)
             queue.foreach( _.unbind(consumer::Nil) )
           }
 
           if( persistent && consumer.binding!=null ) {
-            host.router.destroy_queue(consumer.binding){sucess=>
+            reset {
+              val sucess = host.router.destroy_queue(consumer.binding)
               send_receipt(headers)
             }
           } else {
@@ -883,7 +888,7 @@ class StompProtocolHandler extends Proto
   }
 
 
-  def send_receipt(headers:HeaderMap) = {
+  def send_receipt(headers:HeaderMap):Unit = {
     get(headers, RECEIPT_REQUESTED) match {
       case Some(receipt)=>
         dispatchQueue <<| ^{