You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/07/07 00:28:43 UTC

svn commit: r1143587 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll...

Author: chirino
Date: Wed Jul  6 22:28:43 2011
New Revision: 1143587

URL: http://svn.apache.org/viewvc?rev=1143587&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-67 : Support an `auto-delete:true` header on the STOMP subscribe frame to auto delete queues when the subscription ends

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1143587&r1=1143586&r2=1143587&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Jul  6 22:28:43 2011
@@ -17,7 +17,6 @@
 package org.apache.activemq.apollo.broker
 
 import org.fusesource.hawtdispatch._
-import collection.JavaConversions
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.broker.store.QueueRecord
 import path._
@@ -28,6 +27,7 @@ import scala.Array
 import org.apache.activemq.apollo.dto._
 import java.util.{Arrays, ArrayList}
 import collection.mutable.{LinkedHashMap, HashMap}
+import collection.{Iterable, JavaConversions}
 
 trait DomainDestination {
 
@@ -145,6 +145,10 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
+    def can_destroy_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Option[String]
+    def destroy_destination(path:Path, destination:DestinationDTO):Unit
+
+    def can_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Option[String]
     def create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[D,String]
 
     def get_or_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[D,String] = {
@@ -348,6 +352,43 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
+    def can_destroy_destination(path:Path, destination: DestinationDTO, security: SecurityContext): Option[String] = {
+      val matches = get_destination_matches(path)
+      val rc = matches.foldLeft(None:Option[String]) { case (rc,dest) =>
+        rc.orElse {
+          if( virtual_host.authorizer!=null && security!=null && !virtual_host.authorizer.can_destroy(security, virtual_host, dest.config)) {
+            Some("Not authorized to destroy topic: %s".format(dest.id))
+          } else {
+            None
+          }
+        }
+      }
+
+      // TODO: destroy not yet supported on topics..  Need to disconnect all
+      // clients and remove any durable subs on the topic.
+      Some("Topic destroy not yet implemented.")
+    }
+
+    def destroy_destination(path:Path, destination: DestinationDTO): Unit = {
+      val matches = get_destination_matches(path)
+//        matches.foreach { dest =>
+//          remove_destination(dest.path, dest)
+//        }
+    }
+
+    def can_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Option[String] = {
+      // We can't create a wild card destination.. only wild card subscriptions.
+      assert( !PathParser.containsWildCards(path) )
+      // A new destination is being created...
+      val dto = topic_config(path)
+
+      if(  virtual_host.authorizer!=null && security!=null && !virtual_host.authorizer.can_create(security, virtual_host, dto)) {
+        Some("Not authorized to create the destination")
+      } else {
+        None
+      }
+    }
+
     def create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[Topic,String] = {
       // We can't create a wild card destination.. only wild card subscriptions.
       assert( !PathParser.containsWildCards(path) )
@@ -568,6 +609,14 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
+    def can_destroy_queue(config:QueueDTO, security:SecurityContext) = {
+      if( virtual_host.authorizer==null || security==null) {
+        true
+      } else {
+        virtual_host.authorizer.can_destroy(security, virtual_host, config)
+      }
+    }
+
     def bind(queue:Queue) = {
       val path = queue.binding.destination
       assert( !PathParser.containsWildCards(path) )
@@ -594,6 +643,38 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
+    def can_destroy_destination(path:Path, destination: DestinationDTO, security: SecurityContext): Option[String] = {
+      val matches = get_destination_matches(path)
+      matches.foldLeft(None:Option[String]) { case (rc,dest) =>
+        rc.orElse {
+          if( can_destroy_queue(dest.config, security) ) {
+            None
+          } else {
+            Some("Not authorized to destroy queue: %s".format(dest.id))
+          }
+        }
+      }
+    }
+
+    def destroy_destination(path:Path, destination: DestinationDTO): Unit = {
+      val matches = get_destination_matches(path)
+      matches.foreach { dest =>
+        _destroy_queue(dest)
+      }
+    }
+
+    def can_create_destination(path: Path, destination:DestinationDTO, security: SecurityContext):Option[String] = {
+      val dto = new QueueDestinationDTO
+      dto.path.addAll(destination.path)
+      val binding = QueueDomainQueueBinding.create(dto)
+      val config = binding.config(virtual_host)
+      if( can_create_queue(config, security) ) {
+        None
+      } else {
+        Some("Not authorized to create the queue")
+      }
+    }
+
     def create_destination(path: Path, destination:DestinationDTO, security: SecurityContext) = {
       val dto = new QueueDestinationDTO
       dto.path.addAll(destination.path)
@@ -850,6 +931,33 @@ class LocalRouter(val virtual_host:Virtu
     }
   }
 
+  def create(destinations:Array[DestinationDTO], security: SecurityContext) = dispatch_queue ! {
+    val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
+    val failures = paths.flatMap(x=> domain(x._2).can_create_destination(x._1, x._2, security) )
+    if( !failures.isEmpty ) {
+      Some(failures.mkString("; "))
+    } else {
+      paths.foreach { x=>
+        domain(x._2).create_destination(x._1, x._2, security)
+      }
+      None
+    }
+  }
+
+  def delete(destinations:Array[DestinationDTO], security: SecurityContext) = dispatch_queue ! {
+    val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
+    val failures = paths.flatMap(x=> domain(x._2).can_destroy_destination(x._1, x._2, security) )
+    if( !failures.isEmpty ) {
+      Some(failures.mkString("; "))
+    } else {
+      paths.foreach { x=>
+        domain(x._2).destroy_destination(x._1, x._2)
+      }
+      None
+    }
+  }
+
+
   def get_or_create_destination(id: DestinationDTO, security: SecurityContext) = dispatch_queue ! {
     _get_or_create_destination(id, security)
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1143587&r1=1143586&r2=1143587&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  6 22:28:43 2011
@@ -42,6 +42,10 @@ trait Router extends Service {
 
   def disconnect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer)
 
+  def delete(destinations:Array[DestinationDTO], security:SecurityContext): Option[String] @suspendable
+
+  def create(destinations:Array[DestinationDTO], security:SecurityContext): Option[String] @suspendable
+
   def apply_update(on_completed:Runnable):Unit
 }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1143587&r1=1143586&r2=1143587&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  6 22:28:43 2011
@@ -402,6 +402,7 @@ object Stomp {
   val BROWSER = ascii("browser")
   val EXCLUSIVE = ascii("exclusive")
   val USER_ID = ascii("user-id")
+  val AUTO_DELETE = ascii("auto-delete")
 
   ///////////////////////////////////////////////////////////////////
   // Common Values

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1143587&r1=1143586&r2=1143587&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Jul  6 22:28:43 2011
@@ -32,6 +32,7 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.apollo.util._
 import java.util.concurrent.TimeUnit
 import java.util.Map.Entry
+import path.PathParser
 import scala.util.continuations._
 import org.apache.activemq.apollo.dto._
 import org.apache.activemq.apollo.transport.tcp.SslTransport
@@ -71,6 +72,9 @@ object StompProtocolHandler extends Log 
   val DEFAULT_INBOUND_HEARTBEAT = 10*1000L
   var inbound_heartbeat = DEFAULT_INBOUND_HEARTBEAT
 
+
+  def noop = shift {  k: (Unit=>Unit) => k() }
+  def unit:Unit = {}
 }
 
 /**
@@ -224,7 +228,8 @@ class StompProtocolHandler extends Proto
     val ack_handler:AckHandler,
     val selector:(String, BooleanExpression),
     override val browser:Boolean,
-    override val exclusive:Boolean
+    override val exclusive:Boolean,
+    val auto_delete:Boolean
   ) extends BaseRetained with DeliveryConsumer {
 
 ////  The following comes in handy if we need to debug the
@@ -303,20 +308,33 @@ class StompProtocolHandler extends Proto
               // and then trigger closing the session once it empties out.
               val sink = new OverflowSink(session)
               sink.refiller = ^{
-                session_manager.close(session)
-                release
+                dispose
               }
               sink.offer(frame)
             } else {
               session.offer(frame)
-              session_manager.close(session)
-              release
+              dispose
             }
           } else {
-            session_manager.close(session)
-            release
+            dispose
+          }
+        }
+      }
+
+      def dispose = {
+        session_manager.close(session)
+        if( auto_delete ) {
+          reset {
+            val rc = host.router.delete(destination, security_context)
+            rc match {
+              case Some(error) =>
+                async_die(error)
+              case None =>
+                unit
+            }
           }
         }
+        release
       }
 
       // Delegate all the flow control stuff to the session
@@ -656,8 +674,6 @@ class StompProtocolHandler extends Proto
         die("Invalid heart-beat header: "+heart_beat)
     }
 
-    def noop = shift {  k: (Unit=>Unit) => k() }
-
     def send_connected = {
 
       var connected_headers = ListBuffer((VERSION, protocol_version))
@@ -903,6 +919,17 @@ class StompProtocolHandler extends Proto
     var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
     var browser = get(headers, BROWSER).map( _ == TRUE ).getOrElse(false)
     var exclusive = get(headers, EXCLUSIVE).map( _ == TRUE ).getOrElse(false)
+    var auto_delete = get(headers, AUTO_DELETE).map( _ == TRUE ).getOrElse(false)
+
+    if(auto_delete) {
+      if( destination.length != 1 ) {
+        die("The auto-delete subscription header cannot be used in conjunction with composite destinations");
+      }
+      val path = destination_parser.decode_path(destination.head.path)
+      if( PathParser.containsWildCards(path) ) {
+        die("The auto-delete subscription header cannot be used in conjunction with wildcard destinations");
+      }
+    }
 
     val ack = get(headers, ACK_MODE) match {
       case None=> new AutoAckHandler
@@ -948,11 +975,9 @@ class StompProtocolHandler extends Proto
       }
     }
 
-    val consumer = new StompConsumer(subscription_id, destination, ack, selector, browser, exclusive);
+    val consumer = new StompConsumer(subscription_id, destination, ack, selector, browser, exclusive, auto_delete);
     consumers += (id -> consumer)
 
-    def unit:Unit = {}
-
     reset {
       val rc = host.router.bind(destination, consumer, security_context)
       consumer.release

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1143587&r1=1143586&r2=1143587&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Wed Jul  6 22:28:43 2011
@@ -19,11 +19,13 @@ package org.apache.activemq.apollo.stomp
 import org.scalatest.matchers.ShouldMatchers
 import org.scalatest.BeforeAndAfterEach
 import java.lang.String
-import org.apache.activemq.apollo.broker.{KeyStorage, Broker, BrokerFactory}
 import org.apache.activemq.apollo.util.{FileSupport, Logging, FunSuiteSupport, ServiceControl}
 import FileSupport._
 import org.apache.activemq.apollo.dto.KeyStorageDTO
 import java.net.InetSocketAddress
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.apollo.broker.{LocalRouter, KeyStorage, Broker, BrokerFactory}
+import org.fusesource.hawtbuf.Buffer._
 
 class StompTestSupport extends FunSuiteSupport with ShouldMatchers with BeforeAndAfterEach with Logging {
   var broker: Broker = null
@@ -1554,3 +1556,62 @@ class StompExpirationTest extends StompT
     get("3")
   }
 }
+
+class StompAutoDeleteTest extends StompTestSupport {
+
+  def path_separator = "."
+
+  test("Messages Expire") {
+    connect("1.1")
+
+    def put(msg:String) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/autodel\n" +
+        "\n" +
+        "message:"+msg+"\n")
+    }
+
+    put("1")
+
+    Thread.sleep(2000)
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/autodel\n" +
+      "auto-delete:true\n" +
+      "id:1\n" +
+      "\n")
+
+    def get(dest:String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\nmessage:%s\n".format(dest))
+    }
+    get("1")
+
+    def queue_exists:Boolean = {
+      val host = broker.virtual_hosts.get(ascii("default")).get
+      host.dispatch_queue.future {
+        val router = host.router.asInstanceOf[LocalRouter]
+        router.queue_domain.destination_by_id.get("autodel").isDefined
+      }.await()
+    }
+
+    // The queue should still exist..
+    expect(true)(queue_exists)
+
+    client.write(
+      "UNSUBSCRIBE\n" +
+      "id:1\n" +
+      "receipt:0\n"+
+      "\n")
+    wait_for_receipt("0")
+
+    Thread.sleep(1000);
+
+    // Now that we unsubscribe, it should not exist any more.
+    expect(false)(queue_exists)
+
+  }
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1143587&r1=1143586&r2=1143587&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Wed Jul  6 22:28:43 2011
@@ -1254,6 +1254,33 @@ to the `SUBSCRIBE` frame.  Example:
     
     ^@
 
+### Queue Auto Delete on `UNSUBSCRIBE`
+
+When you subscribe to a queue, you can add the `auto-delete:true` header
+to have the queue automatically deleted when the subscription ends. This
+is typically used when implementing the request/reply messaging pattern.
+The requesting client creates a temporary, dynamically named queue which it
+will use to receive responses for the requests which it sends out. Once the
+client unsubscribes or its connection terminates, the queue is
+automatically deleted.
+
+Example:
+
+    SUBSCRIBE
+    id:mysub
+    destination:/queue/temp.myclientid.1308690148000
+    auto-delete:true
+    exclusive:true
+    
+    ^@
+
+If the client does not have sufficient authority to delete the queue when
+the subscription ends, then the STOMP connection will be terminated with
+an error message.
+
+The auto-delete feature does not work with composite or wildcard destinations.
+It also does not work with topics or durable subscriptions.
+
 ### Destination Wildcards
 
 We support destination wildcards to easily subscribe to multiple destinations