You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/02 18:25:14 UTC

svn commit: r1030133 - in /activemq/activemq-apollo/trunk/apollo-stomp/src: main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Author: chirino
Date: Tue Nov  2 17:25:08 2010
New Revision: 1030133

URL: http://svn.apache.org/viewvc?rev=1030133&view=rev
Log:
added initial pass at the UNSUBSCRIBE implementation, simplified how durable subs are denoted.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1030133&r1=1030132&r2=1030133&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Tue Nov  2 17:25:08 2010
@@ -441,6 +441,8 @@ class StompProtocolHandler extends Proto
                 on_stomp_abort(frame.headers)
               case SUBSCRIBE =>
                 on_stomp_subscribe(frame.headers)
+              case UNSUBSCRIBE =>
+                on_stomp_unsubscribe(frame.headers)
 
               case DISCONNECT =>
                 connection.stop
@@ -687,121 +689,168 @@ class StompProtocolHandler extends Proto
 
   def on_stomp_subscribe(headers:HeaderMap):Unit = {
     val receipt = get(headers, RECEIPT_REQUESTED)
-    get(headers, DESTINATION) match {
-      case Some(dest)=>
 
-        val destination:Destination = dest
-        val subscription_id = get(headers, ID)
-        var id:AsciiBuffer = subscription_id match {
-          case None =>
-            if( protocol_version eq V1_0 ) {
-              // in 1.0 it's ok if the client does not send us the
-              // the id header
-              dest
-            } else {
-              die("The id header is missing from the SUBSCRIBE frame");
-              return
-            }
+    val dest = get(headers, DESTINATION) match {
+      case Some(dest)=> dest
+      case None=>
+        die("destination not set.")
+        return
+    }
+    val destination:Destination = dest
 
-          case Some(x:AsciiBuffer)=> x
+    val subscription_id = get(headers, ID)
+    var id:AsciiBuffer = subscription_id match {
+      case None =>
+        if( protocol_version eq V1_0 ) {
+          // in 1.0 it's ok if the client does not send us the
+          // the id header
+          dest
+        } else {
+          die("The id header is missing from the SUBSCRIBE frame");
+          return
         }
+      case Some(x:AsciiBuffer)=> x
+    }
 
-        val topic = destination.getDomain == Router.TOPIC_DOMAIN
+    val topic = destination.getDomain == Router.TOPIC_DOMAIN
+    var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
 
-        var durable_name = if( topic && id.startsWith(DURABLE_PREFIX) ) {
-          id
-        } else {
-          null
+    val ack = get(headers, ACK_MODE) match {
+      case None=> new AutoAckHandler
+      case Some(x)=> x match {
+        case ACK_MODE_AUTO=>new AutoAckHandler
+        case ACK_MODE_NONE=>new AutoAckHandler
+        case ACK_MODE_CLIENT=> new SessionAckHandler
+        case ACK_MODE_SESSION=> new SessionAckHandler
+        case ACK_MODE_MESSAGE=> new MessageAckHandler
+        case ack:AsciiBuffer =>
+          die("Unsuported ack mode: "+ack);
+          return;
+      }
+    }
+
+    val selector = get(headers, SELECTOR) match {
+      case None=> null
+      case Some(x)=> x
+        try {
+          (x, SelectorParser.parse(x.utf8.toString))
+        } catch {
+          case e:FilterException =>
+            die("Invalid selector expression: "+e.getMessage)
+            return;
         }
+    }
 
-        val ack = get(headers, ACK_MODE) match {
-          case None=> new AutoAckHandler
-          case Some(x)=> x match {
-            case ACK_MODE_AUTO=>new AutoAckHandler
-            case ACK_MODE_NONE=>new AutoAckHandler
-            case ACK_MODE_CLIENT=> new SessionAckHandler
-            case ACK_MODE_SESSION=> new SessionAckHandler
-            case ACK_MODE_MESSAGE=> new MessageAckHandler
-            case ack:AsciiBuffer => die("Unsuported ack mode: "+ack); null
-          }
+    if ( consumers.contains(id) ) {
+      die("A subscription with identified with '"+id+"' allready exists")
+      return;
+    }
+
+    info("subscribing to: %s", destination)
+    val binding: BindingDTO = if( topic && !persistent ) {
+      null
+    } else {
+      // Controls how the created queue gets bound
+      // to the destination name space (this is used to
+      // recover the queue on restart and rebind it the
+      // way again)
+      if (topic) {
+        val rc = new DurableSubscriptionBindingDTO
+        rc.destination = destination.getName.toString
+        // TODO:
+        // rc.client_id =
+        rc.subscription_id = if( persistent ) id else null
+        rc.filter = if (selector == null) null else selector._1
+        rc
+      } else {
+        val rc = new PointToPointBindingDTO
+        rc.destination = destination.getName.toString
+        rc
+      }
+    }
+
+    val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding);
+    consumers += (id -> consumer)
+
+    if( binding==null ) {
+
+      // consumer is bind bound as a topic
+      host.router.bind(destination, consumer, ^{
+        receipt.foreach{ receipt =>
+          connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
         }
+      })
+      consumer.release
+
+    } else {
 
-        val selector = get(headers, SELECTOR) match {
-          case None=> null
-          case Some(x)=> x
-            try {
-              (x, SelectorParser.parse(x.utf8.toString))
-            } catch {
-              case e:FilterException =>
-                die("Invalid selector expression: "+e.getMessage)
-              null
+      // create a queue and bind the consumer to it.
+      host.router.create_queue(binding) { x=>
+        x match {
+          case Some(queue:Queue) =>
+            queue.bind(consumer::Nil)
+            receipt.foreach{ receipt =>
+              connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
             }
+            consumer.release
+          case None => throw new RuntimeException("case not yet implemented.")
         }
+      }
+    }
+  }
 
-        consumers.get(id) match {
-          case None=>
-            info("subscribing to: %s", destination)
+  def on_stomp_unsubscribe(headers:HeaderMap):Unit = {
 
-            val binding: BindingDTO = if( topic && durable_name==null ) {
-              null
-            } else {
-              // Controls how the created queue gets bound
-              // to the destination name space (this is used to
-              // recover the queue on restart and rebind it the
-              // way again)
-              if (topic) {
-                val rc = new DurableSubscriptionBindingDTO
-                rc.destination = destination.getName.toString
-                // TODO:
-                // rc.client_id =
-                rc.subscription_id = durable_name
-                rc.filter = if (selector == null) null else selector._1
-                rc
-              } else {
-                val rc = new PointToPointBindingDTO
-                rc.destination = destination.getName.toString
-                rc
-              }
-            }
+    val receipt = get(headers, RECEIPT_REQUESTED)
+    var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
 
-            val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding);
-            consumers += (id -> consumer)
+    val id = get(headers, ID).getOrElse {
+      if( protocol_version eq V1_0 ) {
+        // in 1.0 it's ok if the client does not send us the
+        // the id header, the destination header must be set
+        get(headers, DESTINATION) match {
+          case Some(dest)=> dest
+          case None=>
+            die("destination not set.")
+            return
+        }
+      } else {
+        die("The id header is missing from the UNSUBSCRIBE frame");
+        return
+      }
+    }
 
-            if( binding==null ) {
+    consumers.get(id) match {
+      case None=>
+        die("The subscription '%s' not found.".format(id))
+        return;
+      case Some(consumer)=>
+        // consumer.close
+        if( consumer.binding==null ) {
+          host.router.unbind(consumer.destination, consumer)
+          receipt.foreach{ receipt =>
+            connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+          }
+        } else {
+          host.router.get_queue(consumer.binding) { queue=>
+            queue.foreach( _.unbind(consumer::Nil) )
+          }
 
-              // consumer is bind bound as a topic
-              host.router.bind(destination, consumer, ^{
-                receipt.foreach{ receipt =>
-                  connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
-                }
-              })
-              consumer.release
-
-            } else {
-
-              // create a queue and bind the consumer to it.
-              host.router.create_queue(binding) { x=>
-                x match {
-                  case Some(queue:Queue) =>
-                    queue.bind(consumer::Nil)
-                    receipt.foreach{ receipt =>
-                      connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
-                    }
-                    consumer.release
-                  case None => throw new RuntimeException("case not yet implemented.")
-                }
+          if( persistent && consumer.binding!=null ) {
+            host.router.destroy_queue(consumer.binding){sucess=>
+              receipt.foreach{ receipt =>
+                connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
               }
             }
+          } else {
+            receipt.foreach{ receipt =>
+              connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+            }
+          }
 
-
-
-          case Some(_)=>
-            die("A subscription with identified with '"+id+"' allready exists")
         }
-      case None=>
-        die("destination not set.")
-    }
 
+    }
   }
 
   def on_stomp_ack(frame:StompFrame):Unit = {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1030133&r1=1030132&r2=1030133&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Tue Nov  2 17:25:08 2010
@@ -293,13 +293,14 @@ class StompDestinationTest extends Stomp
     client.write(
       "SUBSCRIBE\n" +
       "destination:/topic/updates\n" +
-      "id:durable:my-sub-name\n" +
+      "id:my-sub-name\n" +
+      "persistent:true\n" +
       "receipt:0\n" +
       "\n")
     wait_for_receipt("0")
     client.close
 
-    // Close him out.. since his id started /w durable: then
+    // Close him out.. since persistent:true then
     // the topic subscription will be persistent accross client
     // connections.
 
@@ -311,14 +312,15 @@ class StompDestinationTest extends Stomp
     client.write(
       "SUBSCRIBE\n" +
       "destination:/topic/updates\n" +
-      "id:durable:my-sub-name\n" +
+      "id:my-sub-name\n" +
+      "persistent:true\n" +
       "\n")
 
     def get(id:Int) = {
       val frame = client.receive()
       info(frame)
       frame should startWith("MESSAGE\n")
-      frame should include ("subscription:durable:my-sub-name\n")
+      frame should include ("subscription:my-sub-name\n")
       frame should endWith regex("\n\nmessage:"+id+"\n")
     }