You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/17 15:07:44 UTC
svn commit: r1036047 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Author: chirino
Date: Wed Nov 17 14:07:43 2010
New Revision: 1036047
URL: http://svn.apache.org/viewvc?rev=1036047&view=rev
Log:
Simplifying the Router interface. Instead of passing explicit continuation arguments, let's just use the Scala compiler's continuations facility.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1036047&r1=1036046&r2=1036047&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Nov 17 14:07:43 2010
@@ -148,7 +148,7 @@ class Router(val host:VirtualHost) exten
/**
* Returns the previously created queue if it already existed.
*/
- def _create_queue(dto: BindingDTO): Some[Queue] = {
+ def _create_queue(dto: BindingDTO): Option[Queue] = {
val binding = BindingFactory.create(dto)
val queue = queues.get(binding) match {
case Some(queue) => Some(queue)
@@ -157,16 +157,16 @@ class Router(val host:VirtualHost) exten
queue
}
- def create_queue(dto:BindingDTO)(cb: (Option[Queue])=>Unit) = ^{
- cb(_create_queue(dto))
- } >>: dispatchQueue
+ def create_queue(dto:BindingDTO) = dispatchQueue ! {
+ _create_queue(dto)
+ }
/**
* Returns true if the queue no longer exists.
*/
- def destroy_queue(dto:BindingDTO)(cb: (Boolean)=>Unit) = ^{
+ def destroy_queue(dto:BindingDTO) = dispatchQueue ! {
val binding = BindingFactory.create(dto)
- val queue = queues.get(binding) match {
+ queues.get(binding) match {
case Some(queue) =>
val name = binding.destination
if( name!=null ) {
@@ -179,36 +179,35 @@ class Router(val host:VirtualHost) exten
case None =>
true
}
- cb(queue)
- } >>: dispatchQueue
+ }
/**
* Gets an existing queue.
*/
- def get_queue(dto:BindingDTO)(cb: (Option[Queue])=>Unit) = ^{
+ def get_queue(dto:BindingDTO) = dispatchQueue ! {
val binding = BindingFactory.create(dto)
- cb(queues.get(binding))
- } >>: dispatchQueue
+ queues.get(binding)
+ }
- def bind(destination:Destination, consumer:DeliveryConsumer, on_complete:Runnable = ^{} ) = retaining(consumer) {
+ def bind(destination:Destination, consumer:DeliveryConsumer) = {
+ consumer.retain
+ dispatchQueue ! {
- assert( is_topic(destination) )
+ assert( is_topic(destination) )
- val name = destination.getName
+ val name = destination.getName
- // make sure the destination is created if this is not a wild card sub
- if( !PathFilter.containsWildCards(name) ) {
- val node = create_destination_or(name) { node=> }
- }
-
- get_destination_matches(name).foreach( node=>
- node.add_broadcast_consumer(consumer)
- )
- broadcast_consumers.put(name, consumer)
+ // make sure the destination is created if this is not a wild card sub
+ if( !PathFilter.containsWildCards(name) ) {
+ val node = create_destination_or(name) { node=> }
+ }
- on_complete.run
-
- } >>: dispatchQueue
+ get_destination_matches(name).foreach( node=>
+ node.add_broadcast_consumer(consumer)
+ )
+ broadcast_consumers.put(name, consumer)
+ }
+ }
def unbind(destination:Destination, consumer:DeliveryConsumer) = releasing(consumer) {
assert( is_topic(destination) )
@@ -352,9 +351,9 @@ case class DeliveryProducerRoute(val rou
var targets = List[DeliverySession]()
- def connected() = ^{
+ def connected() = dispatchQueue {
on_connected
- } >>: dispatchQueue
+ }
def bind(targets:List[DeliveryConsumer]) = retaining(targets) {
internal_bind(targets)
@@ -386,13 +385,13 @@ case class DeliveryProducerRoute(val rou
}
} >>: dispatchQueue
- def disconnected() = ^ {
+ def disconnected() = dispatchQueue {
this.targets.foreach { x=>
debug("producer route detaching from conusmer.")
x.close
x.consumer.release
}
- } >>: dispatchQueue
+ }
protected def on_connected = {}
protected def on_disconnected = {}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1036047&r1=1036046&r2=1036047&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Nov 17 14:07:43 2010
@@ -378,7 +378,8 @@ class StompProtocolHandler extends Proto
if( consumer.binding==null ) {
host.router.unbind(consumer.destination, consumer)
} else {
- host.router.get_queue(consumer.binding) { queue=>
+ reset {
+ val queue = host.router.get_queue(consumer.binding)
queue.foreach( _.unbind(consumer::Nil) )
}
}
@@ -761,21 +762,22 @@ class StompProtocolHandler extends Proto
if( binding==null ) {
// consumer is bind bound as a topic
- host.router.bind(destination, consumer, ^{
+ reset {
+ host.router.bind(destination, consumer)
send_receipt(headers)
- })
- consumer.release
+ consumer.release
+ }
} else {
-
- // create a queue and bind the consumer to it.
- host.router.create_queue(binding) { x=>
+ reset {
+ // create a queue and bind the consumer to it.
+ val x= host.router.create_queue(binding)
x match {
case Some(queue:Queue) =>
queue.bind(consumer::Nil)
send_receipt(headers)
consumer.release
- case None => throw new RuntimeException("case not yet implemented.")
+ case None => async_die("case not yet implemented.")
}
}
}
@@ -808,12 +810,15 @@ class StompProtocolHandler extends Proto
host.router.unbind(consumer.destination, consumer)
send_receipt(headers)
} else {
- host.router.get_queue(consumer.binding) { queue=>
+
+ reset {
+ val queue = host.router.get_queue(consumer.binding)
queue.foreach( _.unbind(consumer::Nil) )
}
if( persistent && consumer.binding!=null ) {
- host.router.destroy_queue(consumer.binding){sucess=>
+ reset {
+ val sucess = host.router.destroy_queue(consumer.binding)
send_receipt(headers)
}
} else {
@@ -883,7 +888,7 @@ class StompProtocolHandler extends Proto
}
- def send_receipt(headers:HeaderMap) = {
+ def send_receipt(headers:HeaderMap):Unit = {
get(headers, RECEIPT_REQUESTED) match {
case Some(receipt)=>
dispatchQueue <<| ^{