You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/07/07 00:28:43 UTC
svn commit: r1143587 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll...
Author: chirino
Date: Wed Jul 6 22:28:43 2011
New Revision: 1143587
URL: http://svn.apache.org/viewvc?rev=1143587&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-67 : Support an `auto-delete:true` header on the STOMP subscribe frame to auto delete queues when the subscription ends
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1143587&r1=1143586&r2=1143587&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Jul 6 22:28:43 2011
@@ -17,7 +17,6 @@
package org.apache.activemq.apollo.broker
import org.fusesource.hawtdispatch._
-import collection.JavaConversions
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.broker.store.QueueRecord
import path._
@@ -28,6 +27,7 @@ import scala.Array
import org.apache.activemq.apollo.dto._
import java.util.{Arrays, ArrayList}
import collection.mutable.{LinkedHashMap, HashMap}
+import collection.{Iterable, JavaConversions}
trait DomainDestination {
@@ -145,6 +145,10 @@ class LocalRouter(val virtual_host:Virtu
}
}
+ def can_destroy_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Option[String]
+ def destroy_destination(path:Path, destination:DestinationDTO):Unit
+
+ def can_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Option[String]
def create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[D,String]
def get_or_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[D,String] = {
@@ -348,6 +352,43 @@ class LocalRouter(val virtual_host:Virtu
}
}
+ def can_destroy_destination(path:Path, destination: DestinationDTO, security: SecurityContext): Option[String] = {
+ val matches = get_destination_matches(path)
+ val rc = matches.foldLeft(None:Option[String]) { case (rc,dest) =>
+ rc.orElse {
+ if( virtual_host.authorizer!=null && security!=null && !virtual_host.authorizer.can_destroy(security, virtual_host, dest.config)) {
+ Some("Not authorized to destroy topic: %s".format(dest.id))
+ } else {
+ None
+ }
+ }
+ }
+
+ // TODO: destroy not yet supported on topics.. Need to disconnect all
+ // clients and destroy remove any durable subs on the topic.
+ Some("Topic destroy not yet implemented.")
+ }
+
+ def destroy_destination(path:Path, destination: DestinationDTO): Unit = {
+ val matches = get_destination_matches(path)
+// matches.foreach { dest =>
+// remove_destination(dest.path, dest)
+// }
+ }
+
+ def can_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Option[String] = {
+ // We can't create a wild card destination.. only wild card subscriptions.
+ assert( !PathParser.containsWildCards(path) )
+ // A new destination is being created...
+ val dto = topic_config(path)
+
+ if( virtual_host.authorizer!=null && security!=null && !virtual_host.authorizer.can_create(security, virtual_host, dto)) {
+ Some("Not authorized to create the destination")
+ } else {
+ None
+ }
+ }
+
def create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[Topic,String] = {
// We can't create a wild card destination.. only wild card subscriptions.
assert( !PathParser.containsWildCards(path) )
@@ -568,6 +609,14 @@ class LocalRouter(val virtual_host:Virtu
}
}
+ def can_destroy_queue(config:QueueDTO, security:SecurityContext) = {
+ if( virtual_host.authorizer==null || security==null) {
+ true
+ } else {
+ virtual_host.authorizer.can_destroy(security, virtual_host, config)
+ }
+ }
+
def bind(queue:Queue) = {
val path = queue.binding.destination
assert( !PathParser.containsWildCards(path) )
@@ -594,6 +643,38 @@ class LocalRouter(val virtual_host:Virtu
}
}
+ def can_destroy_destination(path:Path, destination: DestinationDTO, security: SecurityContext): Option[String] = {
+ val matches = get_destination_matches(path)
+ matches.foldLeft(None:Option[String]) { case (rc,dest) =>
+ rc.orElse {
+ if( can_destroy_queue(dest.config, security) ) {
+ None
+ } else {
+ Some("Not authorized to destroy queue: %s".format(dest.id))
+ }
+ }
+ }
+ }
+
+ def destroy_destination(path:Path, destination: DestinationDTO): Unit = {
+ val matches = get_destination_matches(path)
+ matches.foreach { dest =>
+ _destroy_queue(dest)
+ }
+ }
+
+ def can_create_destination(path: Path, destination:DestinationDTO, security: SecurityContext):Option[String] = {
+ val dto = new QueueDestinationDTO
+ dto.path.addAll(destination.path)
+ val binding = QueueDomainQueueBinding.create(dto)
+ val config = binding.config(virtual_host)
+ if( can_create_queue(config, security) ) {
+ None
+ } else {
+ Some("Not authorized to create the queue")
+ }
+ }
+
def create_destination(path: Path, destination:DestinationDTO, security: SecurityContext) = {
val dto = new QueueDestinationDTO
dto.path.addAll(destination.path)
@@ -850,6 +931,33 @@ class LocalRouter(val virtual_host:Virtu
}
}
+ def create(destinations:Array[DestinationDTO], security: SecurityContext) = dispatch_queue ! {
+ val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
+ val failures = paths.flatMap(x=> domain(x._2).can_create_destination(x._1, x._2, security) )
+ if( !failures.isEmpty ) {
+ Some(failures.mkString("; "))
+ } else {
+ paths.foreach { x=>
+ domain(x._2).create_destination(x._1, x._2, security)
+ }
+ None
+ }
+ }
+
+ def delete(destinations:Array[DestinationDTO], security: SecurityContext) = dispatch_queue ! {
+ val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
+ val failures = paths.flatMap(x=> domain(x._2).can_destroy_destination(x._1, x._2, security) )
+ if( !failures.isEmpty ) {
+ Some(failures.mkString("; "))
+ } else {
+ paths.foreach { x=>
+ domain(x._2).destroy_destination(x._1, x._2)
+ }
+ None
+ }
+ }
+
+
def get_or_create_destination(id: DestinationDTO, security: SecurityContext) = dispatch_queue ! {
_get_or_create_destination(id, security)
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1143587&r1=1143586&r2=1143587&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 6 22:28:43 2011
@@ -42,6 +42,10 @@ trait Router extends Service {
def disconnect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer)
+ def delete(destinations:Array[DestinationDTO], security:SecurityContext): Option[String] @suspendable
+
+ def create(destinations:Array[DestinationDTO], security:SecurityContext): Option[String] @suspendable
+
def apply_update(on_completed:Runnable):Unit
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1143587&r1=1143586&r2=1143587&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul 6 22:28:43 2011
@@ -402,6 +402,7 @@ object Stomp {
val BROWSER = ascii("browser")
val EXCLUSIVE = ascii("exclusive")
val USER_ID = ascii("user-id")
+ val AUTO_DELETE = ascii("auto-delete")
///////////////////////////////////////////////////////////////////
// Common Values
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1143587&r1=1143586&r2=1143587&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Jul 6 22:28:43 2011
@@ -32,6 +32,7 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.util._
import java.util.concurrent.TimeUnit
import java.util.Map.Entry
+import path.PathParser
import scala.util.continuations._
import org.apache.activemq.apollo.dto._
import org.apache.activemq.apollo.transport.tcp.SslTransport
@@ -71,6 +72,9 @@ object StompProtocolHandler extends Log
val DEFAULT_INBOUND_HEARTBEAT = 10*1000L
var inbound_heartbeat = DEFAULT_INBOUND_HEARTBEAT
+
+ def noop = shift { k: (Unit=>Unit) => k() }
+ def unit:Unit = {}
}
/**
@@ -224,7 +228,8 @@ class StompProtocolHandler extends Proto
val ack_handler:AckHandler,
val selector:(String, BooleanExpression),
override val browser:Boolean,
- override val exclusive:Boolean
+ override val exclusive:Boolean,
+ val auto_delete:Boolean
) extends BaseRetained with DeliveryConsumer {
//// The following comes in handy if we need to debug the
@@ -303,20 +308,33 @@ class StompProtocolHandler extends Proto
// and then trigger closing the session once it empties out.
val sink = new OverflowSink(session)
sink.refiller = ^{
- session_manager.close(session)
- release
+ dispose
}
sink.offer(frame)
} else {
session.offer(frame)
- session_manager.close(session)
- release
+ dispose
}
} else {
- session_manager.close(session)
- release
+ dispose
+ }
+ }
+ }
+
+ def dispose = {
+ session_manager.close(session)
+ if( auto_delete ) {
+ reset {
+ val rc = host.router.delete(destination, security_context)
+ rc match {
+ case Some(error) =>
+ async_die(error)
+ case None =>
+ unit
+ }
}
}
+ release
}
// Delegate all the flow control stuff to the session
@@ -656,8 +674,6 @@ class StompProtocolHandler extends Proto
die("Invalid heart-beat header: "+heart_beat)
}
- def noop = shift { k: (Unit=>Unit) => k() }
-
def send_connected = {
var connected_headers = ListBuffer((VERSION, protocol_version))
@@ -903,6 +919,17 @@ class StompProtocolHandler extends Proto
var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
var browser = get(headers, BROWSER).map( _ == TRUE ).getOrElse(false)
var exclusive = get(headers, EXCLUSIVE).map( _ == TRUE ).getOrElse(false)
+ var auto_delete = get(headers, AUTO_DELETE).map( _ == TRUE ).getOrElse(false)
+
+ if(auto_delete) {
+ if( destination.length != 1 ) {
+ die("The auto-delete subscription header cannot be used in conjunction with composite destinations");
+ }
+ val path = destination_parser.decode_path(destination.head.path)
+ if( PathParser.containsWildCards(path) ) {
+ die("The auto-delete subscription header cannot be used in conjunction with wildcard destinations");
+ }
+ }
val ack = get(headers, ACK_MODE) match {
case None=> new AutoAckHandler
@@ -948,11 +975,9 @@ class StompProtocolHandler extends Proto
}
}
- val consumer = new StompConsumer(subscription_id, destination, ack, selector, browser, exclusive);
+ val consumer = new StompConsumer(subscription_id, destination, ack, selector, browser, exclusive, auto_delete);
consumers += (id -> consumer)
- def unit:Unit = {}
-
reset {
val rc = host.router.bind(destination, consumer, security_context)
consumer.release
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1143587&r1=1143586&r2=1143587&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Wed Jul 6 22:28:43 2011
@@ -19,11 +19,13 @@ package org.apache.activemq.apollo.stomp
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
import java.lang.String
-import org.apache.activemq.apollo.broker.{KeyStorage, Broker, BrokerFactory}
import org.apache.activemq.apollo.util.{FileSupport, Logging, FunSuiteSupport, ServiceControl}
import FileSupport._
import org.apache.activemq.apollo.dto.KeyStorageDTO
import java.net.InetSocketAddress
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.apollo.broker.{LocalRouter, KeyStorage, Broker, BrokerFactory}
+import org.fusesource.hawtbuf.Buffer._
class StompTestSupport extends FunSuiteSupport with ShouldMatchers with BeforeAndAfterEach with Logging {
var broker: Broker = null
@@ -1554,3 +1556,62 @@ class StompExpirationTest extends StompT
get("3")
}
}
+
+class StompAutoDeleteTest extends StompTestSupport {
+
+ def path_separator = "."
+
+ test("Messages Expire") {
+ connect("1.1")
+
+ def put(msg:String) = {
+ client.write(
+ "SEND\n" +
+ "destination:/queue/autodel\n" +
+ "\n" +
+ "message:"+msg+"\n")
+ }
+
+ put("1")
+
+ Thread.sleep(2000)
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/autodel\n" +
+ "auto-delete:true\n" +
+ "id:1\n" +
+ "\n")
+
+ def get(dest:String) = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should endWith("\n\nmessage:%s\n".format(dest))
+ }
+ get("1")
+
+ def queue_exists:Boolean = {
+ val host = broker.virtual_hosts.get(ascii("default")).get
+ host.dispatch_queue.future {
+ val router = host.router.asInstanceOf[LocalRouter]
+ router.queue_domain.destination_by_id.get("autodel").isDefined
+ }.await()
+ }
+
+ // The queue should still exist..
+ expect(true)(queue_exists)
+
+ client.write(
+ "UNSUBSCRIBE\n" +
+ "id:1\n" +
+ "receipt:0\n"+
+ "\n")
+ wait_for_receipt("0")
+
+ Thread.sleep(1000);
+
+ // Now that we unsubscribe, it should not exist any more.
+ expect(false)(queue_exists)
+
+ }
+}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1143587&r1=1143586&r2=1143587&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Wed Jul 6 22:28:43 2011
@@ -1254,6 +1254,33 @@ to the `SUBSCRIBE` frame. Example:
^@
+### Queue Auto Delete on `UNSUBSCRIBE`
+
+When you subscribe to a queue, you can add the `auto-delete:true` header
+to have the queue automatically deleted when the subscription ends. This
+is typically used when implementing the request/reply messaging pattern.
+The requesting client creates a temporary dynamically named queue which it
+will use to receive responses for requests which sends out. Once the
+client unsubscribes or his connection terminates, then the queue is
+automatically deleted.
+
+Example:
+
+ SUBSCRIBE
+ id:mysub
+ destination:/queue/temp.myclientid.1308690148000
+ auto-delete:true
+ exclusive:true
+
+ ^@
+
+If the client does not have sufficient authority to delete the queue, when
+the subscription ends, then the STOMP connection will be terminated with
+an error message.
+
+The auto-delete feature does not work with composite or wildcard destinations.
+it also does not work with topics or durable subscriptions.
+
### Destination Wildcards
We support destination wildcards to easily subscribe to multiple destinations