You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/02 18:25:14 UTC
svn commit: r1030133 - in /activemq/activemq-apollo/trunk/apollo-stomp/src:
main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Author: chirino
Date: Tue Nov 2 17:25:08 2010
New Revision: 1030133
URL: http://svn.apache.org/viewvc?rev=1030133&view=rev
Log:
Added an initial pass at the UNSUBSCRIBE implementation; simplified how durable subscriptions are denoted.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1030133&r1=1030132&r2=1030133&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Tue Nov 2 17:25:08 2010
@@ -441,6 +441,8 @@ class StompProtocolHandler extends Proto
on_stomp_abort(frame.headers)
case SUBSCRIBE =>
on_stomp_subscribe(frame.headers)
+ case UNSUBSCRIBE =>
+ on_stomp_unsubscribe(frame.headers)
case DISCONNECT =>
connection.stop
@@ -687,121 +689,168 @@ class StompProtocolHandler extends Proto
def on_stomp_subscribe(headers:HeaderMap):Unit = {
val receipt = get(headers, RECEIPT_REQUESTED)
- get(headers, DESTINATION) match {
- case Some(dest)=>
- val destination:Destination = dest
- val subscription_id = get(headers, ID)
- var id:AsciiBuffer = subscription_id match {
- case None =>
- if( protocol_version eq V1_0 ) {
- // in 1.0 it's ok if the client does not send us the
- // the id header
- dest
- } else {
- die("The id header is missing from the SUBSCRIBE frame");
- return
- }
+ val dest = get(headers, DESTINATION) match {
+ case Some(dest)=> dest
+ case None=>
+ die("destination not set.")
+ return
+ }
+ val destination:Destination = dest
- case Some(x:AsciiBuffer)=> x
+ val subscription_id = get(headers, ID)
+ var id:AsciiBuffer = subscription_id match {
+ case None =>
+ if( protocol_version eq V1_0 ) {
+ // in 1.0 it's ok if the client does not send us
+ // the id header
+ dest
+ } else {
+ die("The id header is missing from the SUBSCRIBE frame");
+ return
}
+ case Some(x:AsciiBuffer)=> x
+ }
- val topic = destination.getDomain == Router.TOPIC_DOMAIN
+ val topic = destination.getDomain == Router.TOPIC_DOMAIN
+ var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
- var durable_name = if( topic && id.startsWith(DURABLE_PREFIX) ) {
- id
- } else {
- null
+ val ack = get(headers, ACK_MODE) match {
+ case None=> new AutoAckHandler
+ case Some(x)=> x match {
+ case ACK_MODE_AUTO=>new AutoAckHandler
+ case ACK_MODE_NONE=>new AutoAckHandler
+ case ACK_MODE_CLIENT=> new SessionAckHandler
+ case ACK_MODE_SESSION=> new SessionAckHandler
+ case ACK_MODE_MESSAGE=> new MessageAckHandler
+ case ack:AsciiBuffer =>
+ die("Unsuported ack mode: "+ack);
+ return;
+ }
+ }
+
+ val selector = get(headers, SELECTOR) match {
+ case None=> null
+ case Some(x)=> x
+ try {
+ (x, SelectorParser.parse(x.utf8.toString))
+ } catch {
+ case e:FilterException =>
+ die("Invalid selector expression: "+e.getMessage)
+ return;
}
+ }
- val ack = get(headers, ACK_MODE) match {
- case None=> new AutoAckHandler
- case Some(x)=> x match {
- case ACK_MODE_AUTO=>new AutoAckHandler
- case ACK_MODE_NONE=>new AutoAckHandler
- case ACK_MODE_CLIENT=> new SessionAckHandler
- case ACK_MODE_SESSION=> new SessionAckHandler
- case ACK_MODE_MESSAGE=> new MessageAckHandler
- case ack:AsciiBuffer => die("Unsuported ack mode: "+ack); null
- }
+ if ( consumers.contains(id) ) {
+ die("A subscription with identified with '"+id+"' allready exists")
+ return;
+ }
+
+ info("subscribing to: %s", destination)
+ val binding: BindingDTO = if( topic && !persistent ) {
+ null
+ } else {
+ // Controls how the created queue gets bound
+ // to the destination name space (this is used to
+ // recover the queue on restart and rebind it the
+ // same way again)
+ if (topic) {
+ val rc = new DurableSubscriptionBindingDTO
+ rc.destination = destination.getName.toString
+ // TODO:
+ // rc.client_id =
+ rc.subscription_id = if( persistent ) id else null
+ rc.filter = if (selector == null) null else selector._1
+ rc
+ } else {
+ val rc = new PointToPointBindingDTO
+ rc.destination = destination.getName.toString
+ rc
+ }
+ }
+
+ val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding);
+ consumers += (id -> consumer)
+
+ if( binding==null ) {
+
+ // consumer is bound as a topic
+ host.router.bind(destination, consumer, ^{
+ receipt.foreach{ receipt =>
+ connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
}
+ })
+ consumer.release
+
+ } else {
- val selector = get(headers, SELECTOR) match {
- case None=> null
- case Some(x)=> x
- try {
- (x, SelectorParser.parse(x.utf8.toString))
- } catch {
- case e:FilterException =>
- die("Invalid selector expression: "+e.getMessage)
- null
+ // create a queue and bind the consumer to it.
+ host.router.create_queue(binding) { x=>
+ x match {
+ case Some(queue:Queue) =>
+ queue.bind(consumer::Nil)
+ receipt.foreach{ receipt =>
+ connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
}
+ consumer.release
+ case None => throw new RuntimeException("case not yet implemented.")
}
+ }
+ }
+ }
- consumers.get(id) match {
- case None=>
- info("subscribing to: %s", destination)
+ def on_stomp_unsubscribe(headers:HeaderMap):Unit = {
- val binding: BindingDTO = if( topic && durable_name==null ) {
- null
- } else {
- // Controls how the created queue gets bound
- // to the destination name space (this is used to
- // recover the queue on restart and rebind it the
- // way again)
- if (topic) {
- val rc = new DurableSubscriptionBindingDTO
- rc.destination = destination.getName.toString
- // TODO:
- // rc.client_id =
- rc.subscription_id = durable_name
- rc.filter = if (selector == null) null else selector._1
- rc
- } else {
- val rc = new PointToPointBindingDTO
- rc.destination = destination.getName.toString
- rc
- }
- }
+ val receipt = get(headers, RECEIPT_REQUESTED)
+ var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
- val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding);
- consumers += (id -> consumer)
+ val id = get(headers, ID).getOrElse {
+ if( protocol_version eq V1_0 ) {
+ // in 1.0 it's ok if the client does not send us
+ // the id header, but then the destination header must be set
+ get(headers, DESTINATION) match {
+ case Some(dest)=> dest
+ case None=>
+ die("destination not set.")
+ return
+ }
+ } else {
+ die("The id header is missing from the UNSUBSCRIBE frame");
+ return
+ }
+ }
- if( binding==null ) {
+ consumers.get(id) match {
+ case None=>
+ die("The subscription '%s' not found.".format(id))
+ return;
+ case Some(consumer)=>
+ // consumer.close
+ if( consumer.binding==null ) {
+ host.router.unbind(consumer.destination, consumer)
+ receipt.foreach{ receipt =>
+ connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+ }
+ } else {
+ host.router.get_queue(consumer.binding) { queue=>
+ queue.foreach( _.unbind(consumer::Nil) )
+ }
- // consumer is bind bound as a topic
- host.router.bind(destination, consumer, ^{
- receipt.foreach{ receipt =>
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
- }
- })
- consumer.release
-
- } else {
-
- // create a queue and bind the consumer to it.
- host.router.create_queue(binding) { x=>
- x match {
- case Some(queue:Queue) =>
- queue.bind(consumer::Nil)
- receipt.foreach{ receipt =>
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
- }
- consumer.release
- case None => throw new RuntimeException("case not yet implemented.")
- }
+ if( persistent && consumer.binding!=null ) {
+ host.router.destroy_queue(consumer.binding){sucess=>
+ receipt.foreach{ receipt =>
+ connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
}
}
+ } else {
+ receipt.foreach{ receipt =>
+ connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+ }
+ }
-
-
- case Some(_)=>
- die("A subscription with identified with '"+id+"' allready exists")
}
- case None=>
- die("destination not set.")
- }
+ }
}
def on_stomp_ack(frame:StompFrame):Unit = {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1030133&r1=1030132&r2=1030133&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Tue Nov 2 17:25:08 2010
@@ -293,13 +293,14 @@ class StompDestinationTest extends Stomp
client.write(
"SUBSCRIBE\n" +
"destination:/topic/updates\n" +
- "id:durable:my-sub-name\n" +
+ "id:my-sub-name\n" +
+ "persistent:true\n" +
"receipt:0\n" +
"\n")
wait_for_receipt("0")
client.close
- // Close him out.. since his id started /w durable: then
+ // Close him out.. since persistent:true then
// the topic subscription will be persistent accross client
// connections.
@@ -311,14 +312,15 @@ class StompDestinationTest extends Stomp
client.write(
"SUBSCRIBE\n" +
"destination:/topic/updates\n" +
- "id:durable:my-sub-name\n" +
+ "id:my-sub-name\n" +
+ "persistent:true\n" +
"\n")
def get(id:Int) = {
val frame = client.receive()
info(frame)
frame should startWith("MESSAGE\n")
- frame should include ("subscription:durable:my-sub-name\n")
+ frame should include ("subscription:my-sub-name\n")
frame should endWith regex("\n\nmessage:"+id+"\n")
}