You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/05/27 16:27:56 UTC

svn commit: r1128322 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apol...

Author: chirino
Date: Fri May 27 14:27:56 2011
New Revision: 1128322

URL: http://svn.apache.org/viewvc?rev=1128322&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-26 - You can now send/receive directly to a durable subscription without having to use the topic.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala Fri May 27 14:27:56 2011
@@ -21,10 +21,10 @@ import BufferConversions._
 import Buffer._
 import org.apache.activemq.apollo.util.path.{Path, PathParser}
 import scala.collection.mutable.ListBuffer
-import org.apache.activemq.apollo.dto.{TopicDestinationDTO, QueueDestinationDTO, DestinationDTO}
 import collection.JavaConversions._
 import java.lang.StringBuilder
 import java.util.regex.Pattern
+import org.apache.activemq.apollo.dto.{DurableSubscriptionDestinationDTO, TopicDestinationDTO, QueueDestinationDTO, DestinationDTO}
 
 object DestinationParser {
 
@@ -47,21 +47,21 @@ object DestinationParser {
 class DestinationParser extends PathParser {
   import DestinationParser._
 
-  var default_domain:String = null
   var queue_prefix = "queue:"
   var topic_prefix = "topic:"
-  var temp_queue_prefix = "temp-queue:"
-  var temp_topic_prefix = "temp-topic:"
+  var dsub_prefix = "dsub:"
   var destination_separator = ","
+//  var temp_queue_prefix = "temp-queue:"
+//  var temp_topic_prefix = "temp-topic:"
 
   def copy(other:DestinationParser) = {
     super.copy(other)
-    default_domain = other.default_domain
     queue_prefix = other.queue_prefix
     topic_prefix = other.topic_prefix
-    temp_queue_prefix = other.temp_queue_prefix
-    temp_topic_prefix = other.temp_topic_prefix
+    dsub_prefix = other.dsub_prefix
     destination_separator = other.destination_separator
+//    temp_queue_prefix = other.temp_queue_prefix
+//    temp_topic_prefix = other.temp_topic_prefix
     this
   }
 
@@ -78,8 +78,13 @@ class DestinationParser extends PathPars
         dest match {
           case d:QueueDestinationDTO =>
             rc.append(queue_prefix)
+            rc.append(encode_path(dest.path.toIterable))
+          case d:DurableSubscriptionDestinationDTO =>
+            rc.append(dsub_prefix)
+            rc.append(d.subscription_id)
           case d:TopicDestinationDTO =>
             rc.append(topic_prefix)
+            rc.append(encode_path(dest.path.toIterable))
 //          case Router.TEMP_QUEUE_DOMAIN =>
 //            baos.write(temp_queue_prefix)
 //          case Router.TEMP_TOPIC_DOMAIN =>
@@ -87,7 +92,6 @@ class DestinationParser extends PathPars
           case _ =>
             throw new Exception("Uknown destination type: "+dest.getClass);
         }
-        rc.append(encode_path(dest.path.toIterable))
 
       }
       rc.toString
@@ -121,10 +125,13 @@ class DestinationParser extends PathPars
 
       if (queue_prefix != null && value.startsWith(queue_prefix)) {
         var name = value.substring(queue_prefix.length)
-        return Array( create_destination(LocalRouter.QUEUE_DOMAIN, parts(name)) )
+        return Array( new QueueDestinationDTO(parts(name)) )
       } else if (topic_prefix != null && value.startsWith(topic_prefix)) {
         var name = value.substring(topic_prefix.length)
-        return Array(create_destination(LocalRouter.TOPIC_DOMAIN, parts(name)))
+        return Array( new TopicDestinationDTO(parts(name)) )
+      } else if (dsub_prefix != null && value.startsWith(dsub_prefix)) {
+        var name = value.substring(dsub_prefix.length)
+        return Array( new DurableSubscriptionDestinationDTO(name) )
 //      } else if (temp_queue_prefix != null && value.startsWith(temp_queue_prefix)) {
 //        var name = value.slice(temp_queue_prefix.length, value.length).ascii()
 //        return new DestinationDTO(LocalRouter.TEMP_QUEUE_DOMAIN, name.toString)
@@ -132,10 +139,7 @@ class DestinationParser extends PathPars
 //        var name = value.slice(temp_topic_prefix.length, value.length).ascii()
 //        return new DestinationDTO(LocalRouter.TEMP_TOPIC_DOMAIN, name.toString)
       } else {
-        if (default_domain == null) {
-          return null;
-        }
-        return Array(create_destination(default_domain, parts(value)))
+        return null;
       }
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Fri May 27 14:27:56 2011
@@ -28,6 +28,8 @@ import org.apache.activemq.apollo.dto._
 import path._
 import security.SecurityContext
 import java.util.concurrent.TimeUnit
+import collection.mutable.ListBuffer._
+import collection.mutable.HashMap._
 
 trait DomainDestination {
 
@@ -53,6 +55,8 @@ object LocalRouter extends Log {
 
   val TOPIC_DOMAIN = "topic"
   val QUEUE_DOMAIN = "queue"
+  val DSUB_DOMAIN = "dsub"
+
   val TEMP_TOPIC_DOMAIN = "temp-topic"
   val TEMP_QUEUE_DOMAIN = "temp-queue"
 
@@ -158,7 +162,7 @@ class LocalRouter(val virtual_host:Virtu
     }
 
     def can_bind_one(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Boolean
-    def can_bind_all(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Result[Zilch, String] = {
+    def can_bind_all(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Option[String] = {
 
       val wildcard = PathParser.containsWildCards(path)
       var matches = get_destination_matches(path)
@@ -168,21 +172,21 @@ class LocalRouter(val virtual_host:Virtu
         if ( matches.isEmpty && auto_create_destinations ) {
           val rc = create_destination(path, destination, security)
           if( rc.failed ) {
-            return rc.map_success(_=> Zilch);
+            return Some(rc.failure)
           }
           matches = get_destination_matches(path)
         }
         if( matches.isEmpty ) {
-          return Failure("The destination does not exist.")
+          return Some("The destination does not exist.")
         }
 
         matches.foreach { dest =>
           if( !can_bind_one(path, destination, consumer, security) ) {
-            return Failure("Not authorized to receive from the destination.")
+            return Some("Not authorized to receive from the destination.")
           }
         }
       }
-      Success(Zilch)
+      None
     }
 
     def bind(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Unit = {
@@ -196,7 +200,7 @@ class LocalRouter(val virtual_host:Virtu
       consumers_by_path.put(path, new ConsumerContext(destination, consumer, security))
     }
 
-    def unbind(destination:DestinationDTO, consumer:DeliveryConsumer, persistent:Boolean) = {
+    def unbind(destination:DestinationDTO, consumer:DeliveryConsumer, persistent:Boolean, security: SecurityContext) = {
       val path = destination_parser.decode_path(destination.path)
       if( consumers_by_path.remove(path, new ConsumerContext(destination, consumer, null) ) ) {
         get_destination_matches(path).foreach{ dest=>
@@ -215,7 +219,7 @@ class LocalRouter(val virtual_host:Virtu
 
     def can_connect_one(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean
 
-    def can_connect_all(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Result[Zilch, String] = {
+    def can_connect_all(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Option[String] = {
 
       val wildcard = PathParser.containsWildCards(path)
       var matches = get_destination_matches(path)
@@ -224,7 +228,7 @@ class LocalRouter(val virtual_host:Virtu
 
         // Wild card sends never fail authorization... since a destination
         // may get crated later which the user is authorized to use.
-        Success(Zilch)
+        None
 
       } else {
 
@@ -232,22 +236,22 @@ class LocalRouter(val virtual_host:Virtu
         if ( matches.isEmpty && auto_create_destinations ) {
           val rc = create_destination(path, destination, security)
           if( rc.failed ) {
-            return rc.map_success(_=> Zilch);
+            return Some(rc.failure)
           }
           matches = get_destination_matches(path)
         }
 
         if( matches.isEmpty ) {
-          return Failure("The destination does not exist.")
+          return Some("The destination does not exist.")
         }
 
         // since this is not a wild card, we should have only matched one..
         assert( matches.size == 1 )
         if( !can_connect_one(path, destination, producer, security) ) {
-          return Failure("Not authorized to send to the destination.")
+          return Some("Not authorized to send to the destination.")
         }
 
-        Success(Zilch)
+        None
       }
     }
 
@@ -296,21 +300,69 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
+    def dsub_config(subid:String) = DurableSubscriptionQueueBinding.dsub_config(virtual_host, subid)
+
     def topic_config(name:Path):TopicDTO = {
       import collection.JavaConversions._
       import destination_parser._
       virtual_host.config.topics.find( x=> decode_filter(x.id).matches(name) ).getOrElse(new TopicDTO)
     }
 
-    def can_create_ds(config:DurableSubscriptionDTO, security:SecurityContext) = {
-      if( virtual_host.authorizer==null || security==null) {
-        true
-      } else {
-        virtual_host.authorizer.can_create(security, virtual_host, config)
+    override def connect(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
+      destination match {
+        case destination:DurableSubscriptionDestinationDTO =>
+
+          // Connects a producer directly to a durable subscription..
+          durable_subscriptions_by_id.get(destination.subscription_id).foreach { dest=>
+            dest.connect(destination, producer)
+          }
+
+        case _ => super.connect(path, destination, producer, security)
       }
     }
 
-    def bind(queue:Queue) = {
+    override def disconnect(destination:DestinationDTO, producer:BindableDeliveryProducer) = {
+      destination match {
+        case destination:DurableSubscriptionDestinationDTO =>
+          durable_subscriptions_by_id.get(destination.subscription_id).foreach { dest=>
+            dest.disconnect(producer)
+          }
+        case _ => super.disconnect(destination, producer)
+      }
+    }
+
+    def create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[Topic,String] = {
+      // We can't create a wild card destination.. only wild card subscriptions.
+      assert( !PathParser.containsWildCards(path) )
+
+      // A new destination is being created...
+      val dto = topic_config(path)
+
+      if(  virtual_host.authorizer!=null && security!=null && !virtual_host.authorizer.can_create(security, virtual_host, dto)) {
+        return new Failure("Not authorized to create the destination")
+      }
+
+      val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO], dto, path.toString(destination_parser))
+      add_destination(path, topic)
+      Success(topic)
+    }
+
+    def can_bind_one(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Boolean = {
+      val config = topic_config(path)
+      val authorizer = virtual_host.authorizer
+      if( authorizer!=null && security!=null && !authorizer.can_receive_from(security, virtual_host, config) ) {
+        return false;
+      }
+      true
+    }
+
+    def can_connect_one(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean = {
+      val config = topic_config(path)
+      val authorizer = virtual_host.authorizer
+      !(authorizer!=null && security!=null && !authorizer.can_send_to(security, virtual_host, config) )
+    }
+
+    def bind_dsub(queue:Queue) = {
 
       val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
       val path = queue.binding.destination
@@ -329,54 +381,142 @@ class LocalRouter(val virtual_host:Virtu
       matches.foreach( _.bind_durable_subscription(destination, queue) )
     }
 
-    def unbind(queue:Queue) = {
+    def unbind_dsub(queue:Queue) = {
+
+      val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
       val path = queue.binding.destination
+      val wildcard = PathParser.containsWildCards(path)
+      var matches = get_destination_matches(path)
+
+      // We may need to create the topic...
+      if( !wildcard && matches.isEmpty ) {
+        create_destination(path, destination, null)
+        matches = get_destination_matches(path)
+      }
+
       durable_subscriptions_by_path.remove(path, queue)
+      durable_subscriptions_by_id.remove(destination.subscription_id)
+
+      matches.foreach( _.unbind_durable_subscription(destination, queue) )
 
     }
 
-    def create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[Topic,String] = {
+    override def bind(path: Path, destination: DestinationDTO, consumer: DeliveryConsumer, security: SecurityContext) {
+      destination match {
+        case destination:DurableSubscriptionDestinationDTO =>
 
-      // We can't create a wild card destination.. only wild card subscriptions.
-      assert( !PathParser.containsWildCards(path) )
+          val key = destination.subscription_id
+          val queue = durable_subscriptions_by_id.get( key ).getOrElse {
+            _create_queue(QueueBinding.create(destination))
+          }
 
-      // A new destination is being created...
-      val dto = topic_config(path)
+          // Typically durable subs are only consumed by one connection at a time. So collocate the
+          // queue onto the consumer's dispatch queue.
+          queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
+          queue.bind(destination, consumer)
 
-      if(  virtual_host.authorizer!=null && security!=null && !virtual_host.authorizer.can_create(security, virtual_host, dto)) {
-        return new Failure("Not authorized to create the destination")
+        case _ =>
+          super.bind(path, destination, consumer, security)
       }
+    }
 
-      val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO], dto, path.toString(destination_parser))
-      add_destination(path, topic)
-      Success(topic)
+    override def unbind(destination: DestinationDTO, consumer: DeliveryConsumer, persistent: Boolean, security: SecurityContext) = {
+      destination match {
+        case destination:DurableSubscriptionDestinationDTO =>
+          durable_subscriptions_by_id.get(destination.subscription_id).foreach { queue =>
+            queue.unbind(consumer, persistent)
+            if( persistent ) {
+              _destroy_queue(queue, security)
+            }
+          }
+        case _ =>
+          super.unbind( destination, consumer, persistent, security)
+      }
     }
 
-    def can_bind_one(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Boolean = {
-      val config = topic_config(path)
-      val authorizer = virtual_host.authorizer
-      if( authorizer!=null && security!=null && !authorizer.can_receive_from(security, virtual_host, config) ) {
-        return false;
+    override def can_bind_all(path: Path, destination: DestinationDTO, consumer: DeliveryConsumer, security: SecurityContext) = {
+      destination match {
+        case destination:DurableSubscriptionDestinationDTO =>
+          val config = dsub_config(destination.subscription_id)
+          if( !path.parts.isEmpty ) {
+            super.can_bind_all(path, destination, consumer, security) orElse {
+              if( !durable_subscriptions_by_id.contains(destination.subscription_id) ) {
+                can_create_dsub(config, security)
+              } else {
+                None
+              } orElse {
+                can_bind_dsub(config, consumer, security)
+              }
+            }
+          } else {
+            // User is trying to directly receive from a durable subscription; it has to already exist.
+            if( !durable_subscriptions_by_id.contains(destination.subscription_id) ) {
+              Some("Durable subscription does not exist")
+            } else {
+              can_bind_dsub(config, consumer, security)
+            }
+          }
+        case _ =>
+          super.can_bind_all(path, destination, consumer, security)
       }
+    }
+
 
+    override def can_connect_all(path: Path, destination: DestinationDTO, producer: BindableDeliveryProducer, security: SecurityContext) = {
       destination match {
-        case destination:DurableSubscriptionDestinationDTO=>
-          // So the user can subscribe to the topic.. but can he create durable sub??
-          val qc = DurableSubscriptionQueueBinding.create(destination).config(virtual_host).asInstanceOf[DurableSubscriptionDTO]
-          if( !can_create_ds(qc, security) ) {
-             return false;
+        case destination:DurableSubscriptionDestinationDTO =>
+          val config = dsub_config(destination.subscription_id)
+
+            // User is trying to directly send to a durable subscription; it has to already exist.
+          if( !durable_subscriptions_by_id.contains(destination.subscription_id) ) {
+            Some("Durable subscription does not exist")
+          } else {
+            can_connect_dsub(config, security)
           }
         case _ =>
+          super.can_connect_all(path, destination, producer, security)
       }
-      true
     }
 
-    def can_connect_one(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean = {
-      val config = topic_config(path)
+
+    def can_create_dsub(config:DurableSubscriptionDTO, security:SecurityContext) = {
       val authorizer = virtual_host.authorizer
-      !(authorizer!=null && security!=null && !authorizer.can_send_to(security, virtual_host, config) )
+      if( authorizer!=null && security!=null && !authorizer.can_create(security, virtual_host, config) ) {
+        Some("Not authorized to create the durable subscription.")
+      } else {
+        None
+      }
     }
 
+    def can_connect_dsub(config:DurableSubscriptionDTO, security:SecurityContext):Option[String] = {
+      val authorizer = virtual_host.authorizer
+      if( authorizer!=null && security!=null && !authorizer.can_send_to(security, virtual_host, config) ) {
+        Some("Not authorized to send to the durable subscription.")
+      } else {
+        None
+      }
+    }
+
+    def can_bind_dsub(config:DurableSubscriptionDTO, consumer:DeliveryConsumer, security:SecurityContext):Option[String] = {
+      val authorizer = virtual_host.authorizer
+      if( authorizer!=null && security!=null ) {
+        if ( consumer.browser ) {
+          if( !authorizer.can_receive_from(security, virtual_host, config) ) {
+            Some("Not authorized to receive from the durable subscription.")
+          } else {
+            None
+          }
+        } else {
+          if( !authorizer.can_consume_from(security, virtual_host, config) ) {
+            Some("Not authorized to consume from the durable subscription.")
+          } else {
+            None
+          }
+        }
+      } else {
+        None
+      }
+    }
   }
 
   val queue_domain = new QueueDomain
@@ -576,25 +716,25 @@ class LocalRouter(val virtual_host:Virtu
     consumer.retain
     val paths = destination.map(x=> (destination_parser.decode_path(x.path), x) )
     dispatch_queue ! {
-      val failures = paths.map(x=> domain(x._2).can_bind_all(x._1, x._2, consumer, security) ).flatMap( _.failure_option )
+      val failures = paths.flatMap(x=> domain(x._2).can_bind_all(x._1, x._2, consumer, security) )
       val rc = if( !failures.isEmpty ) {
-        Failure(failures.mkString("; "))
+        Some(failures.mkString("; "))
       } else {
         paths.foreach { x=>
           domain(x._2).bind(x._1, x._2, consumer, security)
         }
-        Success(Zilch)
+        None
       }
       consumer.release
       rc
     }
   }
 
-  def unbind(destinations: Array[DestinationDTO], consumer: DeliveryConsumer, persistent:Boolean=false) = {
+  def unbind(destinations: Array[DestinationDTO], consumer: DeliveryConsumer, persistent:Boolean, security: SecurityContext) = {
     consumer.retain
     dispatch_queue {
       destinations.foreach { destination=>
-        domain(destination).unbind(destination, consumer, persistent)
+        domain(destination).unbind(destination, consumer, persistent, security)
       }
       consumer.release
     }
@@ -605,16 +745,16 @@ class LocalRouter(val virtual_host:Virtu
     val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
     dispatch_queue ! {
 
-      val failures = paths.map(x=> domain(x._2).can_connect_all(x._1, x._2, producer, security) ).flatMap( _.failure_option )
+      val failures = paths.flatMap(x=> domain(x._2).can_connect_all(x._1, x._2, producer, security) )
       if( !failures.isEmpty ) {
         producer.release
-        Failure(failures.mkString("; "))
+        Some(failures.mkString("; "))
       } else {
         paths.foreach { x=>
           domain(x._2).connect(x._1, x._2, producer, security)
         }
         producer.connected()
-        Success(Zilch)
+        None
       }
     }
   }
@@ -703,12 +843,12 @@ class LocalRouter(val virtual_host:Virtu
    */
   def destroy_queue(id:String, security:SecurityContext) = dispatch_queue ! { _destroy_queue(id,security) }
 
-  def _destroy_queue(id:String, security:SecurityContext):Result[Zilch, String] = {
+  def _destroy_queue(id:String, security:SecurityContext):Option[String] = {
     queues_by_id.get(id) match {
       case Some(queue) =>
         _destroy_queue(queue,security)
       case None =>
-        Failure("Does not exist")
+        Some("Does not exist")
     }
   }
 
@@ -717,20 +857,20 @@ class LocalRouter(val virtual_host:Virtu
    */
   def destroy_queue(dto:DestinationDTO, security:SecurityContext) = dispatch_queue ! { _destroy_queue(dto, security) }
 
-  def _destroy_queue(dto:DestinationDTO, security:SecurityContext):Result[Zilch, String] = {
+  def _destroy_queue(dto:DestinationDTO, security:SecurityContext):Option[String] = {
     queues_by_binding.get(QueueBinding.create(dto)) match {
       case Some(queue) =>
         _destroy_queue(queue, security)
       case None =>
-        Failure("Does not exist")
+        Some("Does not exist")
     }
   }
 
-  def _destroy_queue(queue:Queue, security:SecurityContext):Result[Zilch, String] = {
+  def _destroy_queue(queue:Queue, security:SecurityContext):Option[String] = {
 
     if( security!=null && queue.config.acl!=null ) {
       if( !virtual_host.authorizer.can_destroy(security, virtual_host, queue.config) ) {
-        return Failure("Not authorized to destroy")
+        return Some("Not authorized to destroy")
       }
     }
 
@@ -743,7 +883,7 @@ class LocalRouter(val virtual_host:Virtu
         virtual_host.store.remove_queue(queue.store_id){x=> Unit}
       }
     }
-    Success(Zilch)
+    None
   }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Fri May 27 14:27:56 2011
@@ -185,6 +185,25 @@ object DurableSubscriptionQueueBinding e
     }
   }
 
+
+  def dsub_config(host:VirtualHost, id:String) = {
+    import collection.JavaConversions._
+    def matches(x:DurableSubscriptionDTO):Boolean = {
+      if( x.id != null && x.id!=id ) {
+        return false
+      }
+
+      if( x.id_regex != null ) {
+        // May need to cache the regex...
+        val regex = x.id_regex.r
+        if( !regex.findFirstIn(id).isDefined ) {
+          return false
+        }
+      }
+      true
+    }
+    host.config.dsubs.find(matches _).getOrElse(new DurableSubscriptionDTO)
+  }
 }
 
 /**
@@ -202,11 +221,11 @@ class DurableSubscriptionQueueBinding(va
 
 
   def unbind(router: LocalRouter, queue: Queue) = {
-    router.topic_domain.unbind(queue)
+    router.topic_domain.unbind_dsub(queue)
   }
 
   def bind(router: LocalRouter, queue: Queue) = {
-    router.topic_domain.bind(queue)
+    router.topic_domain.bind_dsub(queue)
   }
 
   def id = binding_dto.subscription_id
@@ -226,22 +245,7 @@ class DurableSubscriptionQueueBinding(va
     }
   }
 
-  def config(host:VirtualHost):DurableSubscriptionDTO = {
-      import collection.JavaConversions._
-      import LocalRouter.destination_parser._
-      import AsciiBuffer._
-
-      def matches(x:DurableSubscriptionDTO):Boolean = {
-        if( x.id != null && !decode_filter(x.id).matches(destination)) {
-          return false
-        }
-        if( x.subscription_id != null && x.subscription_id!=binding_dto.subscription_id ) {
-          return false
-        }
-        true
-      }
-      host.config.dsubs.find(matches _).getOrElse(new DurableSubscriptionDTO)
-  }
+  def config(host:VirtualHost):DurableSubscriptionDTO = dsub_config(host, binding_dto.subscription_id)
 
 }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Fri May 27 14:27:56 2011
@@ -35,11 +35,11 @@ trait Router extends Service {
 
   def get_queue(dto:String):Option[Queue] @suspendable
 
-  def bind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, security:SecurityContext) : Result[Zilch,String] @suspendable
+  def bind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, security:SecurityContext) : Option[String] @suspendable
 
-  def unbind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, persistent:Boolean=false)
+  def unbind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, persistent:Boolean, security:SecurityContext)
 
-  def connect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer, security:SecurityContext): Result[Zilch,String] @suspendable
+  def connect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer, security:SecurityContext): Option[String] @suspendable
 
   def disconnect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer)
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Fri May 27 14:27:56 2011
@@ -53,23 +53,6 @@ class Topic(val router:LocalRouter, val 
           r.bind(list)
         })
 
-      case destination:DurableSubscriptionDestinationDTO=>
-
-        val queue = router.topic_domain.get_or_create_durable_subscription(destination)
-        if( !durable_subscriptions.contains(queue) ) {
-          durable_subscriptions += queue
-          val list = List(queue)
-          producers.foreach({ r=>
-            r.bind(list)
-          })
-        }
-
-        // Typically durable subs are only consumed by on connection at a time. So collocate the
-        // queue onto the consumer's dispatch queue.
-        queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
-        queue.bind(destination, consumer)
-        consumer_queues += consumer->queue
-
       case destination:TopicDestinationDTO=>
         var target = consumer
         slow_consumer_policy match {
@@ -112,10 +95,6 @@ class Topic(val router:LocalRouter, val 
             })
             router._destroy_queue(queue.id, null)
 
-          case x:DurableSubscriptionQueueBinding =>
-            if( persistent ) {
-              router.topic_domain.destroy_durable_subscription(queue)
-            }
         }
 
       case None=>

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java Fri May 27 14:27:56 2011
@@ -59,4 +59,21 @@ abstract public class DestinationDTO {
         }
         return sb.toString();
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof DestinationDTO)) return false;
+
+        DestinationDTO that = (DestinationDTO) o;
+
+        if (path != null ? !path.equals(that.path) : that.path != null) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return path != null ? path.hashCode() : 0;
+    }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java Fri May 27 14:27:56 2011
@@ -28,9 +28,10 @@ import javax.xml.bind.annotation.*;
 public class DurableSubscriptionDTO extends QueueDTO {
 
     /**
-     * To narrow down matches to a subscription id
+     *  A regular expression used to match the subscription id.
      */
-    @XmlAttribute(name="subscription_id")
-    public String subscription_id;
+    @XmlAttribute(name="id_regex")
+    public String id_regex;
+
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java Fri May 27 14:27:56 2011
@@ -40,11 +40,10 @@ public class DurableSubscriptionDestinat
 
     public DurableSubscriptionDestinationDTO() {
     }
-    public DurableSubscriptionDestinationDTO(List<String> name) {
-        super(name);
-    }
-    public DurableSubscriptionDestinationDTO(String[] name) {
-        super(name);
+
+    public DurableSubscriptionDestinationDTO(String subscription_id) {
+        super();
+        this.subscription_id = subscription_id;
     }
 
     @Override

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java Fri May 27 14:27:56 2011
@@ -39,23 +39,4 @@ public class TopicDestinationDTO extends
         super(name);
     }
 
-
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        TopicDestinationDTO that = (TopicDestinationDTO) o;
-
-        if (path != null ? !path.equals(that.path) : that.path != null) return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return path != null ? path.hashCode() : 0;
-    }
-
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri May 27 14:27:56 2011
@@ -467,7 +467,8 @@ class OpenwireProtocolHandler extends Pr
       if( is_durable_sub ) {
         destination = destination.map { _ match {
           case x:TopicDestinationDTO=>
-            val rc = new DurableSubscriptionDestinationDTO(x.path)
+            val rc = new DurableSubscriptionDestinationDTO()
+            rc.path = x.path
             if( is_durable_sub ) {
               rc.subscription_id = ""
               if( parent.parent.info.getClientId != null ) {
@@ -485,10 +486,10 @@ class OpenwireProtocolHandler extends Pr
       reset {
         val rc = host.router.bind(destination, this, security_context)
         rc match {
-          case Success(_) =>
+          case None =>
             ack(info)
             noop
-          case Failure(reason) =>
+          case Some(reason) =>
             fail(reason, info)
             noop
         }
@@ -497,7 +498,7 @@ class OpenwireProtocolHandler extends Pr
     }
 
     def dettach = {
-      host.router.unbind(destination, this)
+      host.router.unbind(destination, this, false , security_context)
       parent.consumers.remove(info.getConsumerId)
       all_consumers.remove(info.getConsumerId)
     }
@@ -828,14 +829,15 @@ class OpenwireProtocolHandler extends Pr
         connection.transport.suspendRead
         reset {
           val rc = host.router.connect(destiantion, route, security_context)
-          if( rc.failed ) {
-            async_fail(rc.failure, msg)
-          } else {
-            if (!connection.stopped) {
-              resumeRead
-              producerRoutes.put(key, route)
-              send_via_route(route, msg, uow)
-            }
+          rc match {
+            case Some(failure) =>
+              async_fail(failure, msg)
+            case None =>
+              if (!connection.stopped) {
+                resumeRead
+                producerRoutes.put(key, route)
+                send_via_route(route, msg, uow)
+              }
           }
         }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Fri May 27 14:27:56 2011
@@ -298,13 +298,12 @@ object Stomp {
   val destination_parser = new DestinationParser
   destination_parser.queue_prefix = "/queue/"
   destination_parser.topic_prefix = "/topic/"
+  destination_parser.dsub_prefix = "/dsub/"
   destination_parser.destination_separator = ","
   destination_parser.path_separator = "."
   destination_parser.any_child_wildcard = "*"
   destination_parser.any_descendant_wildcard = "**"
 
-  destination_parser.default_domain = LocalRouter.QUEUE_DOMAIN
-
   type HeaderMap = List[(AsciiBuffer, AsciiBuffer)]
   type HeaderMapBuffer = ListBuffer[(AsciiBuffer, AsciiBuffer)]
   val NO_DATA = new Buffer(0);

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri May 27 14:27:56 2011
@@ -483,7 +483,7 @@ class StompProtocolHandler extends Proto
       producerRoutes.clear
       consumers.foreach {
         case (_,consumer)=>
-          host.router.unbind(consumer.destination, consumer)
+          host.router.unbind(consumer.destination, consumer, false , security_context)
       }
       consumers = Map()
       trace("stomp protocol resources released")
@@ -767,16 +767,16 @@ class StompProtocolHandler extends Proto
         connection.transport.suspendRead
         reset {
           val rc = host.router.connect(destiantion, route, security_context)
-          if( rc.failed ) {
-            async_die(rc.failure)
-          } else {
-            if (!connection.stopped) {
-              resumeRead
-              producerRoutes.put(key, route)
-              send_via_route(route, frame, uow)
-            }
+          rc match {
+            case Some(failure) =>
+              async_die(failure)
+            case None =>
+              if (!connection.stopped) {
+                resumeRead
+                producerRoutes.put(key, route)
+                send_via_route(route, frame, uow)
+              }
           }
-
         }
 
       case route =>
@@ -922,8 +922,11 @@ class StompProtocolHandler extends Proto
 
     if( persistent ) {
       destination = destination.map { _ match {
+        case x:DurableSubscriptionDestinationDTO=>
+          x
         case x:TopicDestinationDTO=>
-          val rc = new DurableSubscriptionDestinationDTO(x.path)
+          val rc = new DurableSubscriptionDestinationDTO()
+          rc.path = x.path
           rc.subscription_id = decode_header(id)
           rc.filter = if (selector == null) null else selector._1
           rc
@@ -941,10 +944,10 @@ class StompProtocolHandler extends Proto
       val rc = host.router.bind(destination, consumer, security_context)
       consumer.release
       rc match {
-        case Failure(reason)=>
+        case Some(reason)=>
           consumers -= id
           async_die(reason)
-        case _=>
+        case None =>
           send_receipt(headers)
           unit
       }
@@ -997,7 +1000,7 @@ class StompProtocolHandler extends Proto
         // consumer gets disposed after all producer stop sending to it...
         consumer.setDisposer(^{ send_receipt(headers) })
         consumers -= id
-        host.router.unbind(consumer.destination, consumer, persistent)
+        host.router.unbind(consumer.destination, consumer, persistent, security_context)
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Fri May 27 14:27:56 2011
@@ -667,12 +667,7 @@ class DurableSubscriptionTest extends St
     client.close
     connect("1.1")
 
-
-    client.close
-    connect("1.1")
-
     // Now send a bunch of messages....
-    println("sending")
     def put(id:Int) = {
       client.write(
         "SEND\n" +
@@ -701,13 +696,11 @@ class DurableSubscriptionTest extends St
       "persistent:true\n" +
       "\n")
 
-    println("getting from sub 1")
     for( i <- 1 to 1000 ) {
       get(i)
     }
 
     // Empty out the 2nd durable sub
-    println("getting from sub 2")
     client.write(
       "SUBSCRIBE\n" +
       "destination:/topic/sometopic\n" +
@@ -720,6 +713,105 @@ class DurableSubscriptionTest extends St
     }
 
   }
+
+  test("Can directly send an recieve from a durable sub") {
+    connect("1.1")
+
+    // establish 2 durable subs..
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/topic/sometopic\n" +
+      "id:sub1\n" +
+      "persistent:true\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/topic/sometopic\n" +
+      "id:sub2\n" +
+      "persistent:true\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
+
+    client.close
+    connect("1.1")
+
+    // Now send a bunch of messages....
+    // Send only to sub 1
+    client.write(
+      "SEND\n" +
+      "destination:/dsub/sub1\n" +
+      "\n" +
+      "sub1 msg\n")
+
+    // Send to all subs
+    client.write(
+      "SEND\n" +
+      "destination:/topic/sometopic\n" +
+      "\n" +
+      "LAST\n")
+
+
+    // Now try to get all the previously sent messages.
+    def get(expected:String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\n"+expected)
+    }
+
+    // Empty out the first durable sub
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/dsub/sub1\n" +
+      "id:1\n" +
+      "\n")
+
+    get("sub1 msg\n")
+    get("LAST\n")
+
+    // Empty out the 2nd durable sub
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/dsub/sub2\n" +
+      "id:2\n" +
+      "\n")
+
+    get("LAST\n")
+  }
+
+  test("Direct send to a non-existant a durable sub fails") {
+    connect("1.1")
+
+    client.write(
+      "SEND\n" +
+      "destination:/dsub/doesnotexist\n" +
+      "receipt:0\n" +
+      "\n" +
+      "content\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Durable subscription does not exist")
+  }
+
+  test("Direct subscribe to a non-existant a durable sub fails") {
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/dsub/doesnotexist\n" +
+      "id:1\n" +
+      "receipt:0\n" +
+      "\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Durable subscription does not exist")
+
+  }
 }
 
 class StompUnifiedQueueTest extends StompTestSupport {

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala Fri May 27 14:27:56 2011
@@ -1,5 +1,3 @@
-package org.apache.activemq.apollo.util
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,6 +14,22 @@ package org.apache.activemq.apollo.util
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.activemq.apollo.util
+
+object ResultSupport {
+
+  case class RichResult[A,F](self: Result[A,F]) {
+    def then[B](r2: =>Result[B,F]):Result[B,F] = {
+      if( self.failed ) {
+        Failure(self.failure)
+      } else {
+        r2
+      }
+    }
+  }
+
+  implicit def to_rich_result[A,F](value:Result[A,F]) = new RichResult[A,F](value)
+}
 
 /**
  * <p>A Result can either be a Success or a Failure</p>

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1128322&r1=1128321&r2=1128322&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Fri May 27 14:27:56 2011
@@ -943,7 +943,7 @@ Example STOMP frame to connect to the br
 
 STOMP 1.0 clients do specify which virtual host they are connecting to so
 the broker connects those clients to the first virtual host defined in
-it's configuration.  STOMP 1.1 clients do specify a virtual host when they 
+it's configuration.  STOMP 1.1 clients do specify a virtual host when they
 connect.  If no configured virtual host `host_name` matches the client's 
 requested host, the connection is terminated with an ERROR.  Therefore,
 it is critical that the virtual hosts configuration define all the 
@@ -951,15 +951,24 @@ possible host names that clients may con
 
 ### Destination Types
 
-${project_name} supports two main types of destinations, queues and topics.
-The most striking difference between queues and topics is how messages are 
-delivered to consumers.  A queue will load it's messages over the connected
-subscribers so that only one subscriber gets a message.  A topic follows the
-publish/subscribe patterns and it's subscribers each get a copy of every
-message sent.
+${project_name} supports three types of destinations: queues, topics, and
+durable subscriptions.
 
-If you want to send or subscribe to a queue or topic, the STOMP destination
-should be prefixed with `/queue/` or `/topic/` respectively.
+The difference between queues and topics is how messages are delivered to
+consumers. A queue will load balance its messages over the connected
+subscribers so that only one subscriber gets each message. A topic replicates
+every message sent to it to all of its connected subscribers.  A queue holds
+on to unconsumed messages even when there are no subscriptions attached,
+while a topic will drop messages when there are no connected subscriptions.
+
+A durable subscription allows you to create a subscription against a topic
+which will queue messages even after the client disconnects.  Clients
+can reconnect and consume the queued messages originating from the topic
+at a later time.
+
+If you want to send or subscribe to a queue, topic, or durable
+subscription, the STOMP destination should be prefixed with `/queue/`,
+`/topic/`, or `/dsub/` respectively.
 
 Example STOMP frame sending to a queue:
 
@@ -969,13 +978,6 @@ Example STOMP frame sending to a queue:
     hello queue a
     ^@
 
-Another major difference between queues and topics is that queues hold 
-on to unconsumed messages even when there are no subscriptions attached,
-while a topic will drop messages when there are no connected subscriptions.
-${project_name} allows you to create Durable Subscriptions against topics
-so that a subscription can "out live" the connection that created the 
-subscription.  This allows you consume all the messages sent to the 
-topic without messages getting dropped.
 
 ### Reliable Messaging
 
@@ -1052,6 +1054,26 @@ to the `UNSUBSCRIBE` frame.  Example:
     
     ^@
 
+If the durable subscription already exists, you can address it directly
+using the `/dsub/` prefix on the `destination` header.  For example,
+to send a message to the previously created `mysub` durable subscription,
+send the following STOMP frame:
+
+
+    SEND
+    destination:/dsub/mysub
+
+    hello durable sub!
+    ^@
+
+Similarly, you can also subscribe to the subscription in the same way:
+
+    SUBSCRIBE
+    id:0
+    destination:/dsub/mysub
+    
+    ^@
+
 ### Browsing Subscriptions
 
 A normal subscription on a queue will consume messages so that no other