You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/29 13:15:00 UTC

svn commit: r1040082 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/resources/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/ apollo-broker/src/test/scal...

Author: chirino
Date: Mon Nov 29 12:14:59 2010
New Revision: 1040082

URL: http://svn.apache.org/viewvc?rev=1040082&view=rev
Log:
Hierarchical destinations are now decoupled from their encoded representation, so protocols are free to customize the encoding.
Destination and queue settings are now configurable.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml
      - copied, changed from r1039378, activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.java
Removed:
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapEntry.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathSupport.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PrefixPathFilter.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/SimplePathFilter.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/WildcardPathFilter.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/non-persistent-activemq.xml
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java
    activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java
    activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/non-persistent-activemq.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/non-persistent-activemq.xml?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/non-persistent-activemq.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/non-persistent-activemq.xml Mon Nov 29 12:14:59 2010
@@ -18,6 +18,5 @@
 <broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
   <virtual-host>
     <host-name>default</host-name>
-    <memory-store/>    
   </virtual-host>
 </broker>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Mon Nov 29 12:14:59 2010
@@ -16,11 +16,13 @@
  */
 package org.apache.activemq.apollo.broker
 
-import org.apache.activemq.apollo.util.ClassFinder
 import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
-import org.apache.activemq.apollo.dto.{JsonCodec, DurableSubscriptionBindingDTO, PointToPointBindingDTO, BindingDTO}
 import org.apache.activemq.apollo.selector.SelectorParser
 import org.apache.activemq.apollo.filter.{ConstantExpression, BooleanExpression}
+import org.apache.activemq.apollo.util.path.Path
+import Buffer._
+import org.apache.activemq.apollo.dto._
+import org.apache.activemq.apollo.util.{OptionSupport, ClassFinder}
 
 /**
  * <p>
@@ -60,8 +62,18 @@ object BindingFactory {
     }
     throw new IllegalArgumentException("Invalid binding type: "+binding_dto);
   }
+
+}
+
+object Binding {
+  val destination_parser = new DestinationParser
+
+  def encode(value:Array[Path]):String = destination_parser.toString(value)
+  def decode(value:String):Array[Path] = destination_parser.parsePath(ascii(value))
 }
 
+import Binding._
+
 /**
  * <p>
  * </p>
@@ -91,11 +103,19 @@ trait Binding {
 
   def message_filter:BooleanExpression = ConstantExpression.TRUE
 
-  def destination:AsciiBuffer
+  def matches(config:QueueDTO):Boolean = {
+    import Binding.destination_parser._
+    import OptionSupport._
+    var rc = (o(config.destination).map{ x=> parseFilter(ascii(x)).matches(destination) }.getOrElse(true))
+    rc = rc && (o(config.kind).map{ x=> x == binding_kind.toString }.getOrElse(true))
+    rc
+  }
+
+  def destination:Array[Path]
 }
 
 object PointToPointBinding {
-  val POINT_TO_POINT_KIND = new AsciiBuffer("p2p")
+  val POINT_TO_POINT_KIND = new AsciiBuffer("ptp")
   val DESTINATION_PATH = new AsciiBuffer("default");
 }
 
@@ -115,9 +135,9 @@ class PointToPointBindingFactory extends
 
   def create(binding_dto:BindingDTO) = {
     if( binding_dto.isInstanceOf[PointToPointBindingDTO] ) {
-      val p2p_dto = binding_dto.asInstanceOf[PointToPointBindingDTO]
-      val data = new AsciiBuffer(p2p_dto.destination).buffer
-      new PointToPointBinding(data, p2p_dto)
+      val ptp_dto = binding_dto.asInstanceOf[PointToPointBindingDTO]
+      val data = new AsciiBuffer(ptp_dto.destination).buffer
+      new PointToPointBinding(data, ptp_dto)
     } else {
       null
     }
@@ -132,6 +152,7 @@ class PointToPointBindingFactory extends
  */
 class PointToPointBinding(val binding_data:Buffer, val binding_dto:PointToPointBindingDTO) extends Binding {
 
+  val destination = Binding.decode(binding_dto.destination)
   def binding_kind = POINT_TO_POINT_KIND
 
   def unbind(node: RoutingNode, queue: Queue) = {
@@ -155,7 +176,6 @@ class PointToPointBinding(val binding_da
     case _ => false
   }
 
-  def destination = new AsciiBuffer(binding_dto.destination)
 }
 
 
@@ -191,6 +211,8 @@ class DurableSubBindingFactory extends B
  */
 class DurableSubBinding(val binding_data:Buffer, val binding_dto:DurableSubscriptionBindingDTO) extends Binding {
 
+  val destination = Binding.decode(binding_dto.destination)
+
   def binding_kind = DURABLE_SUB_KIND
 
 
@@ -228,6 +250,11 @@ class DurableSubBinding(val binding_data
     }
   }
 
-  def destination = new AsciiBuffer(binding_dto.destination)
-
+  override def matches(config: QueueDTO): Boolean = {
+    import OptionSupport._
+    var rc = super.matches(config)
+    rc = rc && (o(config.client_id).map{ x=> x == binding_dto.client_id }.getOrElse(true))
+    rc = rc && (o(config.subscription_id).map{ x=> x == binding_dto.subscription_id }.getOrElse(true))
+    rc
+  }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Mon Nov 29 12:14:59 2010
@@ -191,7 +191,7 @@ class Broker() extends BaseService with 
   /**
    * Validates and then applies the configuration.
    */
-  def configure(config: BrokerDTO, reporter:Reporter) = ^{
+  def configure(config: BrokerDTO, reporter:Reporter) = dispatchQueue {
     if ( validate(config, reporter) < ERROR ) {
       this.config = config
 
@@ -201,7 +201,7 @@ class Broker() extends BaseService with 
 
       }
     }
-  } >>: dispatchQueue
+  }
 
 
   override def _start(onCompleted:Runnable) = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java Mon Nov 29 12:14:59 2010
@@ -16,12 +16,13 @@
  */
 package org.apache.activemq.apollo.broker;
 
+import org.apache.activemq.apollo.util.path.Path;
 import org.fusesource.hawtbuf.AsciiBuffer;
 
 /**
  */
 public interface Destination {
   AsciiBuffer getDomain();
-  AsciiBuffer getName();
+  Path[] path();
   Destination[] getDestinations();
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala Mon Nov 29 12:14:59 2010
@@ -18,51 +18,48 @@ package org.apache.activemq.apollo.broke
 
 import _root_.org.fusesource.hawtbuf._
 import BufferConversions._
+import org.apache.activemq.apollo.util.path.{Path, PathParser}
+import Buffer._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class ParserOptions {
-  var defaultDomain: AsciiBuffer = null
-  var queuePrefix: AsciiBuffer = null
-  var topicPrefix: AsciiBuffer = null
-  var tempQueuePrefix: AsciiBuffer = null
-  var tempTopicPrefix: AsciiBuffer = null
-  var separator: Option[Byte] = None
-}
+class DestinationParser extends PathParser {
 
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object DestinationParser {
+  var default_domain: AsciiBuffer = null
+  var queue_prefix: AsciiBuffer = ascii("queue:")
+  var topic_prefix: AsciiBuffer = ascii("topic:")
+  var temp_queue_prefix: AsciiBuffer = ascii("temp-queue:")
+  var temp_topic_prefix: AsciiBuffer = ascii("temp-topic:")
+  var destination_separator: Option[Byte] = Some(','.toByte)
 
-  def toBuffer(value: Destination, options: ParserOptions): AsciiBuffer = {
+  def toBuffer(value: Destination): AsciiBuffer = {
     if (value == null) {
       null
     } else {
       val baos = new ByteArrayOutputStream
       def write(value: Destination):Unit = {
         if (value.getDestinations != null) {
-          assert( options.separator.isDefined )
+          assert( destination_separator.isDefined )
           val first = true
           for (d <- value.getDestinations) {
             if (!first) {
-              baos.write(options.separator.get)
+              baos.write(destination_separator.get)
             }
             write(d)
           }
         } else {
           value.getDomain match {
             case Router.QUEUE_DOMAIN =>
-              baos.write(options.queuePrefix)
+              baos.write(queue_prefix)
             case Router.TOPIC_DOMAIN =>
-              baos.write(options.topicPrefix)
+              baos.write(topic_prefix)
             case Router.TEMP_QUEUE_DOMAIN =>
-              baos.write(options.tempQueuePrefix)
+              baos.write(temp_queue_prefix)
             case Router.TEMP_TOPIC_DOMAIN =>
-              baos.write(options.tempTopicPrefix)
+              baos.write(temp_topic_prefix)
           }
-          baos.write(value.getName)
+          this.write(value.path, baos)
         }
       }
       write(value)
@@ -74,20 +71,19 @@ object DestinationParser {
    * Parses a destination which may or may not be a composite.
    *
    * @param value
-   * @param options
    * @param compositeSeparator
    * @return
    */
-  def parse(value: AsciiBuffer, options: ParserOptions): Destination = {
+  def parse(value: AsciiBuffer): Destination = {
     if (value == null) {
       return null;
     }
 
-    if (options.separator.isDefined && value.contains(options.separator.get)) {
-      var rc = value.split(options.separator.get);
+    if (destination_separator.isDefined && value.contains(destination_separator.get)) {
+      var rc = value.split(destination_separator.get);
       var dl: List[Destination] = Nil
       for (buffer <- rc) {
-        val d = parse(buffer, options)
+        val d = parse(buffer)
         if (d == null) {
           return null;
         }
@@ -95,23 +91,23 @@ object DestinationParser {
       }
       return new MultiDestination(dl.toArray[Destination]);
     } else {
-      if (options.queuePrefix != null && value.startsWith(options.queuePrefix)) {
-        var name = value.slice(options.queuePrefix.length, value.length).ascii();
-        return new SingleDestination(Router.QUEUE_DOMAIN, name);
-      } else if (options.topicPrefix != null && value.startsWith(options.topicPrefix)) {
-        var name = value.slice(options.topicPrefix.length, value.length).ascii();
-        return new SingleDestination(Router.TOPIC_DOMAIN, name);
-      } else if (options.tempQueuePrefix != null && value.startsWith(options.tempQueuePrefix)) {
-        var name = value.slice(options.tempQueuePrefix.length, value.length).ascii();
-        return new SingleDestination(Router.TEMP_QUEUE_DOMAIN, name);
-      } else if (options.tempTopicPrefix != null && value.startsWith(options.tempTopicPrefix)) {
-        var name = value.slice(options.tempTopicPrefix.length, value.length).ascii();
-        return new SingleDestination(Router.TEMP_TOPIC_DOMAIN, name);
+      if (queue_prefix != null && value.startsWith(queue_prefix)) {
+        var name = value.slice(queue_prefix.length, value.length).ascii();
+        return new SingleDestination(Router.QUEUE_DOMAIN, parsePath(name));
+      } else if (topic_prefix != null && value.startsWith(topic_prefix)) {
+        var name = value.slice(topic_prefix.length, value.length).ascii();
+        return new SingleDestination(Router.TOPIC_DOMAIN, parsePath(name));
+      } else if (temp_queue_prefix != null && value.startsWith(temp_queue_prefix)) {
+        var name = value.slice(temp_queue_prefix.length, value.length).ascii();
+        return new SingleDestination(Router.TEMP_QUEUE_DOMAIN, parsePath(name));
+      } else if (temp_topic_prefix != null && value.startsWith(temp_topic_prefix)) {
+        var name = value.slice(temp_topic_prefix.length, value.length).ascii();
+        return new SingleDestination(Router.TEMP_TOPIC_DOMAIN, parsePath(name));
       } else {
-        if (options.defaultDomain == null) {
+        if (default_domain == null) {
           return null;
         }
-        return new SingleDestination(options.defaultDomain, value);
+        return new SingleDestination(default_domain, parsePath(value));
       }
     }
   }
@@ -120,11 +116,11 @@ object DestinationParser {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-case class SingleDestination(var domain: AsciiBuffer = null, var name: AsciiBuffer = null) extends Destination {
+case class SingleDestination(var domain: AsciiBuffer = null, var name: Array[Path] = null) extends Destination {
   def getDestinations(): Array[Destination] = null;
   def getDomain(): AsciiBuffer = domain
 
-  def getName(): AsciiBuffer = name
+  def path() = name
 
   override def toString() = "" + domain + ":" + name
 }
@@ -136,7 +132,7 @@ case class MultiDestination(var destinat
   def getDestinations(): Array[Destination] = destinations;
   def getDomain(): AsciiBuffer = null
 
-  def getName(): AsciiBuffer = null
+  def path() = null
 
   override def toString() = destinations.mkString(",")
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Nov 29 12:14:59 2010
@@ -30,6 +30,8 @@ import org.apache.activemq.apollo.store.
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.util.list._
 import org.fusesource.hawtdispatch.{Dispatch, ListEventAggregator, DispatchQueue, BaseRetained}
+import org.apache.activemq.apollo.dto.QueueDTO
+import OptionSupport._
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -39,7 +41,7 @@ object Queue extends Log {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val host: VirtualHost, var id:Long, val binding:Binding) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
+class Queue(val host: VirtualHost, var id:Long, val binding:Binding, var config:QueueDTO) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
   override protected def log = Queue
 
   var inbound_sessions = Set[DeliverySession]()
@@ -75,45 +77,50 @@ class Queue(val host: VirtualHost, var i
   entries.addFirst(head_entry)
 
   //
-  // Tuning options.
+  // In-frequently accessed tuning configuration.
   //
 
   /**
    *  The amount of memory buffer space for receiving messages.
    */
-  var tune_producer_buffer = 1024*32
+  def tune_producer_buffer = config.producer_buffer.getOrElse(1024*32)
 
   /**
    *  The amount of memory buffer space for the queue..
    */
-  var tune_queue_buffer = 1024*32
-
-  /**
-   *  The amount of memory buffer space to use per subscription.
-   */
-  var tune_consumer_buffer = 1024*64
+  def tune_queue_buffer = config.queue_buffer.getOrElse(1024*32)
 
   /**
    * Subscribers that consume slower than this rate per seconds will be considered
    * slow.  Once a consumer is considered slow, we may switch to disk spooling.
    */
-  var tune_slow_subscription_rate = 500*1024
+  def tune_slow_subscription_rate = config.slow_subscription_rate.getOrElse(500*1024)
 
   /**
    * The number of milliseconds between slow consumer checks.
    */
-  var tune_slow_check_interval = 500L
+  def tune_slow_check_interval = config.slow_check_interval.getOrElse(500L)
+
+  /**
+   * The number of intervals that a consumer must not meeting the subscription rate before it is
+   * flagged as a slow consumer.
+   */
+  def tune_max_slow_intervals = config.max_slow_intervals.getOrElse(10)
+
+  //
+  // Frequently accessed tuning configuration.
+  //
 
   /**
    * Should this queue persistently store it's entries?
    */
-  def tune_persistent = host.store !=null
+  var tune_persistent = true
 
   /**
    * Should messages be flushed or swapped out of memory if
    * no consumers need the message?
    */
-  def tune_flush_to_store = tune_persistent
+  var tune_flush_to_store = true
 
   /**
    * The number max number of flushed queue entries to load
@@ -121,13 +128,21 @@ class Queue(val host: VirtualHost, var i
    * reference pointers to the actual messages.  When not loaded,
    * the batch is referenced as sequence range to conserve memory.
    */
-  def tune_flush_range_size = 10000
+  var tune_flush_range_size = 0
 
   /**
-   * The number of intervals that a consumer must not meeting the subscription rate before it is
-   * flagged as a slow consumer. 
+   *  The amount of memory buffer space to use per subscription.
    */
-  var tune_max_slow_intervals = 10
+  var tune_consumer_buffer = 0
+
+  def configure(c:QueueDTO) = {
+    config = c
+    tune_persistent = host.store !=null && config.persistent.getOrElse(true)
+    tune_flush_to_store = tune_persistent && config.flush_to_store.getOrElse(true)
+    tune_flush_range_size = config.flush_range_size.getOrElse(10000)
+    tune_consumer_buffer = config.consumer_buffer.getOrElse(1024*64)
+  }
+  configure(config)
 
   var enqueue_item_counter = 0L
   var dequeue_item_counter = 0L

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Mon Nov 29 12:14:59 2010
@@ -25,22 +25,23 @@ import _root_.org.fusesource.hawtdispatc
 import collection.JavaConversions
 import org.apache.activemq.apollo.util._
 import collection.mutable.{ListBuffer, HashMap}
-import org.apache.activemq.apollo.dto.{PointToPointBindingDTO, BindingDTO}
-import path.{PathFilter, PathMap}
 import scala.collection.immutable.List
 import org.apache.activemq.apollo.store.{StoreUOW, QueueRecord}
+import org.apache.activemq.apollo.util.path.{Path, PathMap, PathParser}
+import Buffer._
+import org.apache.activemq.apollo.dto.{QueueDTO, PointToPointBindingDTO, BindingDTO}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 object Router extends Log {
-  val TOPIC_DOMAIN = new AsciiBuffer("topic");
-  val QUEUE_DOMAIN = new AsciiBuffer("queue");
-  val TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic");
-  val TEMP_QUEUE_DOMAIN = new AsciiBuffer("temp-queue");
+  val TOPIC_DOMAIN = ascii("topic");
+  val QUEUE_DOMAIN = ascii("queue");
+  val TEMP_TOPIC_DOMAIN = ascii("temp-topic");
+  val TEMP_QUEUE_DOMAIN = ascii("temp-queue");
 
-  val QUEUE_KIND = new AsciiBuffer("queue");
-  val DEFAULT_QUEUE_PATH = new AsciiBuffer("default");
+  val QUEUE_KIND = ascii("queue");
+  val DEFAULT_QUEUE_PATH = ascii("default");
 }
 
 /**
@@ -83,12 +84,12 @@ class Router(val host:VirtualHost) exten
     }
   }
 
-  def routing_nodes:Iterable[RoutingNode] = JavaConversions.asIterable(destinations.get(PathFilter.ANY_DESCENDENT))
+  def routing_nodes:Iterable[RoutingNode] = JavaConversions.asIterable(destinations.get(Array(PathParser.ANY_DESCENDANT)))
   
-  def create_destination_or(destination:AsciiBuffer)(func:(RoutingNode)=>Unit):RoutingNode = {
+  def create_destination_or(destination:Array[Path])(func:(RoutingNode)=>Unit):RoutingNode = {
 
     // We can't create a wild card destination.. only wild card subscriptions.
-    assert( !PathFilter.containsWildCards(destination) )
+    assert( !PathParser.containsWildCards(destination) )
 
     var rc = destinations.chooseValue( destination )
     if( rc == null ) {
@@ -112,13 +113,21 @@ class Router(val host:VirtualHost) exten
     rc
   }
 
-  def get_destination_matches(destination:AsciiBuffer) = {
+  def get_destination_matches(destination:Array[Path]) = {
     import JavaConversions._
     asIterable(destinations.get( destination ))
   }
 
   def _create_queue(id:Long, binding:Binding):Queue = {
-    val queue = new Queue(host, id, binding)
+
+    val config = {
+      import collection.JavaConversions._
+      host.config.queues.find{ config=>
+        binding.matches(config)
+      }
+    }.getOrElse(new QueueDTO)
+
+    val queue = new Queue(host, id, binding, config)
     queue.start
     queues.put(binding, queue)
 
@@ -127,7 +136,7 @@ class Router(val host:VirtualHost) exten
     if( name!=null ) {
       bindings.put(name, queue)
       // make sure the destination is created if this is not a wild card sub
-      if( !PathFilter.containsWildCards(name) ) {
+      if( !PathParser.containsWildCards(name) ) {
         create_destination_or(name) { node=>
           node.add_queue(queue)
         }
@@ -195,11 +204,11 @@ class Router(val host:VirtualHost) exten
 
       assert( is_topic(destination) )
 
-      val name = destination.getName
+      val name = destination.path
 
       // make sure the destination is created if this is not a wild card sub
-      if( !PathFilter.containsWildCards(name) ) {
-        val node = create_destination_or(name) { node=> }
+      if( !PathParser.containsWildCards(name) ) {
+        val node = create_destination_or(name) { node=> Unit }
       }
 
       get_destination_matches(name).foreach( node=>
@@ -211,7 +220,7 @@ class Router(val host:VirtualHost) exten
 
   def unbind(destination:Destination, consumer:DeliveryConsumer) = releasing(consumer) {
     assert( is_topic(destination) )
-    val name = destination.getName
+    val name = destination.path
     broadcast_consumers.remove(name, consumer)
     get_destination_matches(name).foreach{ node=>
       node.remove_broadcast_consumer(consumer)
@@ -234,13 +243,13 @@ class Router(val host:VirtualHost) exten
       // Looking up the queue will cause it to get created if it does not exist.
       val queue = if( !topic ) {
         val dto = new PointToPointBindingDTO
-        dto.destination = destination.getName.toString
+        dto.destination = Binding.encode(destination.path)
         _create_queue(dto)
       } else {
         None
       }
 
-      val node = create_destination_or(destination.getName) { node=> }
+      val node = create_destination_or(destination.path) { node=> Unit }
       if( node.unified || topic ) {
         node.add_broadcast_producer( route )
       } else {
@@ -254,7 +263,7 @@ class Router(val host:VirtualHost) exten
   def disconnect(route:DeliveryProducerRoute) = releasing(route) {
 
     val topic = is_topic(route.destination)
-    val node = create_destination_or(route.destination.getName) { node=> }
+    val node = create_destination_or(route.destination.path) { node=> Unit }
     if( node.unified || topic ) {
       node.remove_broadcast_producer(route)
     }
@@ -268,7 +277,7 @@ class Router(val host:VirtualHost) exten
 /**
  * Tracks state associated with a destination name.
  */
-class RoutingNode(val router:Router, val name:AsciiBuffer) {
+class RoutingNode(val router:Router, val name:Array[Path]) {
 
   val id = router.destination_id_counter.incrementAndGet
 
@@ -276,8 +285,14 @@ class RoutingNode(val router:Router, val
   var broadcast_consumers = ListBuffer[DeliveryConsumer]()
   var queues = ListBuffer[Queue]()
 
-  // TODO: extract the node's config from the host config object
-  def unified = false
+  val unified = {
+    import collection.JavaConversions._
+    import OptionSupport._
+    import Binding.destination_parser._
+
+    val t= router.host.config.destinations.find( x=> parseFilter(ascii(x.path)).matches(name) )
+    t.flatMap(x=> o(x.unified)).getOrElse(false)
+  }
 
   def add_broadcast_consumer (consumer:DeliveryConsumer) = {
     broadcast_consumers += consumer

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Mon Nov 29 12:14:59 2010
@@ -32,18 +32,13 @@ import collection.JavaConversions
 import java.util.concurrent.atomic.AtomicLong
 import org.apache.activemq.apollo.util.OptionSupport._
 import security.{Authenticator, Authorizer}
+import org.apache.activemq.apollo.util.path.PathParser
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 object VirtualHost extends Log {
 
-  val destination_parser_options = new ParserOptions
-  destination_parser_options.queuePrefix = new AsciiBuffer("queue:")
-  destination_parser_options.topicPrefix = new AsciiBuffer("topic:")
-  destination_parser_options.tempQueuePrefix = new AsciiBuffer("temp-queue:")
-  destination_parser_options.tempTopicPrefix = new AsciiBuffer("temp-topic:")
-
   /**
    * Creates a default a configuration object.
    */
@@ -223,9 +218,7 @@ class VirtualHost(val broker: Broker, va
       // rates between producers and consumers, look for natural data flow partitions
       // and then try to equally divide the load over the available processing
       // threads/cores.
-      val nodes = router.destinations.get(PathFilter.ANY_DESCENDENT)
-
-      JavaConversions.asIterable(nodes).foreach { node =>
+      router.routing_nodes.foreach { node =>
 
         // For the topics, just collocate the producers onto the first consumer's
         // thread.

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml (from r1039378, activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml?p2=activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml&p1=activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml&r1=1039378&r2=1040082&rev=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml Mon Nov 29 12:14:59 2010
@@ -15,18 +15,18 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 -->
-<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
-  <dispatcher name="test dispatcher" threads="4"/>
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo" id="default">
 
-  <transport-server>pipe://test1</transport-server>
+  <virtual-host id="default">
+    <host-name>test</host-name>
 
-  <connector/>
-  <connector>pipe://test1</connector>
+    <destination path="unified.*" unified="true"/>
+    <destination path="notunified.*" unified="false"/>
+
+    <queue destination="unified.a" kind="ptp" queue-buffer="333"/>
+    <queue destination="unified.*" kind="ds" queue-buffer="444"/>
+    <queue queue-buffer="111"/>
 
-  <virtual-host>
-    <host-name>localhost</host-name>
-    <host-name>test.localhost</host-name>
-    <memory-store/>
   </virtual-host>
   
 </broker>

Added: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala?rev=1040082&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala Mon Nov 29 12:14:59 2010
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import org.fusesource.hawtbuf.Buffer._
+import scala.util.continuations._
+import org.apache.activemq.apollo.util.{ServiceControl, FunSuiteSupport}
+import org.apache.activemq.apollo.dto.{BindingDTO, DurableSubscriptionBindingDTO, PointToPointBindingDTO}
+
+/**
+ * <p>Checks that the destination and queue settings declared in
+ * destination-config.xml are picked up by the broker's router.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DestinationConfigurationTest extends FunSuiteSupport {
+
+  test("Simple Config") {
+    val uri = "xml:classpath:org/apache/activemq/apollo/broker/destination-config.xml"
+    info("Loading broker configuration from the classpath with URI: " + uri)
+    val broker = BrokerFactory.createBroker(uri)
+    ServiceControl.start(broker, "broker")
+
+    val host = broker.config.virtual_hosts.get(0)
+
+    expect("test") {
+      host.host_names.get(0)
+    }
+
+    // Let's make sure we are reading in the expected config: 2 destinations and 3 queues.
+    expect(2) {
+      host.destinations.size
+    }
+    expect(3) {
+      host.queues.size
+    }
+
+    val router = broker.default_virtual_host.router
+
+    def check_tune_queue_buffer(expected:Int)(dto:BindingDTO) = {
+      var actual=0
+      reset {
+        var q = router.create_queue(dto).get
+        actual = q.tune_queue_buffer
+      }
+      expect(expected) {actual}
+    }
+
+    check_tune_queue_buffer(333) {
+      var p = new PointToPointBindingDTO()
+      p.destination = "unified.a"
+      p
+    }
+    check_tune_queue_buffer(444) {
+      val p = new DurableSubscriptionBindingDTO()
+      p.destination = "unified.b"
+      p.client_id = "a"
+      p.subscription_id = "b"
+      p
+    }
+
+    check_tune_queue_buffer(111) {
+      var p = new PointToPointBindingDTO()
+      p.destination = "notunified.other"
+      p
+    }
+
+    def dest(v:String) = Binding.destination_parser.parsePath(ascii(v))
+    expect(true) {
+      router.destinations.chooseValue(dest("unified.a")).unified
+    }
+    expect(false) {
+      router.destinations.chooseValue(dest("notunified.other")).unified
+    }
+    ServiceControl.stop(broker, "broker")
+  }
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala Mon Nov 29 12:14:59 2010
@@ -31,9 +31,6 @@ class XMLBrokerFactoryTest extends FunSu
     info("Loading broker configuration from the classpath with URI: " + uri)
     val broker = BrokerFactory.createBroker(uri)
 
-    //		assertEquals(4, p.getSize())
-    //		assertEquals("test dispatcher", p.getName())
-
     expect(1) {
       broker.config.connectors.size()
     }
@@ -50,10 +47,6 @@ class XMLBrokerFactoryTest extends FunSu
       broker.config.virtual_hosts.size()
     }
 
-    //		Assert.assertNotNull(broker.defaultVirtualHost().getDatabase())
-    //		Assert.assertNotNull(broker.defaultVirtualHost().getDatabase().getStore())
-    //		Assert.assertTrue((broker.defaultVirtualHost().getDatabase().getStore() instanceof MemoryStore))
-
   }
 
   def expectException(msg: String = "Expected exeception.")(func: => Unit) = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml Mon Nov 29 12:14:59 2010
@@ -16,11 +16,7 @@
     limitations under the License.
 -->
 <broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
-  <dispatcher name="test dispatcher" threads="4"/>
 
-  <transport-server>pipe://test1</transport-server>
-
-  <connector/>
   <connector>pipe://test1</connector>
 
   <virtual-host>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Mon Nov 29 12:14:59 2010
@@ -208,13 +208,15 @@ abstract class BrokerPerfSupport extends
     config
   }
 
+  val parser = new DestinationParser
+
   def createDestinations(destCount: Int): Array[Destination] = {
     var dests = new Array[Destination](destCount)
 
     for (i <- 0 until destCount) {
       val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
       val name = new AsciiBuffer("dest" + (i + 1))
-      var bean = new SingleDestination(domain, name)
+      var bean = new SingleDestination(domain, parser.parsePath(name))
       dests(i) = bean
       //        if (PTP) {
       //          sendBroker.defaultVirtualHost.createQueue(dests(i))

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java Mon Nov 29 12:14:59 2010
@@ -26,17 +26,13 @@ import java.util.ArrayList;
  */
 @XmlRootElement(name = "destination")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class DestinationDTO {
+public class DestinationDTO extends StringIdDTO {
 
     /**
-     * The name or wild card name of the destination
+     * The path to the destination.  You can use wild cards.
      */
-    public String name;
-
-    /**
-     * The kind of destination, "queue" or "topic"
-     */
-    public String kind;
+	@XmlAttribute
+	public String path;
 
     /**
      * If set to true, then routing then there is no difference between
@@ -44,8 +40,7 @@ public class DestinationDTO {
      * a queue subscriptions is created, it will act like if a durable
      * subscription was created on the topic. 
      */
+    @XmlAttribute
     public Boolean unified;
 
-    
-
 }

Added: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1040082&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Mon Nov 29 12:14:59 2010
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.dto;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import javax.xml.bind.annotation.*;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name = "queue")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class QueueDTO {
+
+    /**
+     * The destination this queue is associated with.  You can use wild cards.
+     */
+    @XmlAttribute
+    public String destination;
+
+    /**
+     * The kind of queue.  It may be "ptp" for standard
+     * point to point queues or "ds" for durable subscriptions.
+     * If not set, then this configuration applies to all queue types.
+     */
+    @XmlAttribute
+    public String kind;
+
+    /**
+     * If the kind is "ds" then you can specify which client
+     * id this configuration should match.
+     */
+    @XmlAttribute(name="client-id")
+    public String client_id;
+
+    /**
+     * If the kind is "ds" then you can specify which subscription
+     * id this configuration should match.
+     */
+    @XmlAttribute(name="subscription-id")
+    public String subscription_id;
+
+
+    /**
+     *  The amount of memory buffer space for receiving messages.
+     */
+    @XmlAttribute(name="producer-buffer")
+    @JsonProperty("producer_buffer")
+    public Integer producer_buffer;
+
+    /**
+     *  The amount of memory buffer space for the queue.
+     */
+    @XmlAttribute(name="queue-buffer")
+    @JsonProperty("queue_buffer")
+    public Integer queue_buffer;
+
+    /**
+     *  The amount of memory buffer space to use per subscription.
+     */
+    @XmlAttribute(name="consumer-buffer")
+    @JsonProperty("consumer_buffer")
+    public Integer consumer_buffer;
+
+    /**
+     * Subscribers that consume slower than this rate per second will be considered
+     * slow.  Once a consumer is considered slow, we may switch to disk spooling.
+     */
+    @XmlAttribute(name="slow-subscription-rate")
+    @JsonProperty("slow_subscription_rate")
+    public Integer slow_subscription_rate;
+
+    /**
+     * The number of milliseconds between slow consumer checks.
+     */
+    @XmlAttribute(name="slow-check-interval")
+    @JsonProperty("slow_check_interval")
+    public Long slow_check_interval;
+
+    /**
+     * Should this queue persistently store its entries?
+     */
+    @XmlAttribute(name="persistent")
+    @JsonProperty("persistent")
+    public Boolean persistent;
+
+    /**
+     * Should messages be flushed or swapped out of memory if
+     * no consumers need the message?
+     */
+    @XmlAttribute(name="flush-to-store")
+    @JsonProperty("flush_to_store")
+    public Boolean flush_to_store;
+
+    /**
+     * The maximum number of flushed queue entries to load
+     * from the store at a time.  Note that flushed entries are just
+     * reference pointers to the actual messages.  When not loaded,
+     * the batch is referenced as sequence range to conserve memory.
+     */
+    @XmlAttribute(name="flush-range-size")
+    @JsonProperty("flush_range_size")
+    public Integer flush_range_size;
+
+    /**
+     * The number of intervals that a consumer must fail to meet the subscription rate before it is
+     * flagged as a slow consumer.
+     */
+    @XmlAttribute(name="max-slow-intervals")
+    @JsonProperty("max_slow_intervals")
+    public Integer max_slow_intervals;
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Mon Nov 29 12:14:59 2010
@@ -53,9 +53,17 @@ public class VirtualHostDTO extends Serv
      * Holds the configuration for the destinations.
      */
     @XmlElement(name="destination")
+    @JsonProperty("destinations")
     public ArrayList<DestinationDTO> destinations = new ArrayList<DestinationDTO>();
 
     /**
+     * Holds the configuration for the queues.
+     */
+    @XmlElement(name="queue")
+    @JsonProperty("queues")
+    public ArrayList<QueueDTO> queues = new ArrayList<QueueDTO>();
+
+    /**
      * Should connections get regroups so they get serviced by the same thread?
      */
     @XmlAttribute(name="regroup-connections")

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index Mon Nov 29 12:14:59 2010
@@ -40,4 +40,6 @@ VirtualHostStatusDTO
 StompConnectionStatusDTO
 KeyStorageDTO
 SimpleStoreStatusDTO
-NullStoreDTO
\ No newline at end of file
+NullStoreDTO
+QueueDTO
+DestinationDTO
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Mon Nov 29 12:14:59 2010
@@ -305,14 +305,17 @@ object Stomp {
   val DURABLE_PREFIX = ascii("durable:")
   val DURABLE_QUEUE_KIND = ascii("stomp:sub")
 
-  val options = new ParserOptions
-  options.queuePrefix = ascii("/queue/")
-  options.topicPrefix = ascii("/topic/")
+  val destination_parser = new DestinationParser
+  destination_parser.queue_prefix = ascii("/queue/")
+  destination_parser.topic_prefix = ascii("/topic/")
+  destination_parser.path_seperator = ascii("/")
+  destination_parser.any_child_wildcard = ascii("*")
+  destination_parser.any_descendant_wildcard = ascii("**")
 
-  options.defaultDomain = Router.QUEUE_DOMAIN
+  destination_parser.default_domain = Router.QUEUE_DOMAIN
 
   implicit def toDestination(value:AsciiBuffer):Destination = {
-    val d = DestinationParser.parse(value, options)
+    val d = destination_parser.parse(value)
     if( d==null ) {
       throw new ProtocolException("Invalid stomp destiantion name: "+value);
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Mon Nov 29 12:14:59 2010
@@ -743,7 +743,7 @@ class StompProtocolHandler extends Proto
       // way again)
       if (topic) {
         val rc = new DurableSubscriptionBindingDTO
-        rc.destination = destination.getName.toString
+        rc.destination = Binding.encode(destination.path)
         // TODO:
         // rc.client_id =
         rc.subscription_id = if( persistent ) id else null
@@ -751,7 +751,7 @@ class StompProtocolHandler extends Proto
         rc
       } else {
         val rc = new PointToPointBindingDTO
-        rc.destination = destination.getName.toString
+        rc.destination = Binding.encode(destination.path)
         rc
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala Mon Nov 29 12:14:59 2010
@@ -39,9 +39,9 @@ class StompRemoteConsumer extends Remote
     outboundSink.refiller = ^ {}
 
     val stompDestination = if (destination.getDomain() == Router.QUEUE_DOMAIN) {
-      ascii("/queue/" + destination.getName().toString());
+      ascii("/queue/" + destination.path().toString());
     } else {
-      ascii("/topic/" + destination.getName().toString());
+      ascii("/topic/" + destination.path().toString());
     }
 
     var frame = StompFrame(CONNECT);
@@ -150,9 +150,9 @@ class StompRemoteProducer extends Remote
     outboundSink.refiller = ^ {drain}
 
     if (destination.getDomain() == Router.QUEUE_DOMAIN) {
-      stompDestination = ascii("/queue/" + destination.getName().toString());
+      stompDestination = ascii("/queue/" + destination.path().toString());
     } else {
-      stompDestination = ascii("/topic/" + destination.getName().toString());
+      stompDestination = ascii("/topic/" + destination.path().toString());
     }
     outboundSink.offer(StompFrame(CONNECT));
     send_next

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java Mon Nov 29 12:14:59 2010
@@ -34,14 +34,14 @@ public class AnyChildPathNode<Value> imp
         this.node = node;
     }
 
-    public void appendMatchingValues(Set<Value> answer, ArrayList<AsciiBuffer> paths, int startIndex) {
+    public void appendMatchingValues(Set<Value> answer, Path[] paths, int startIndex) {
     	for (PathNode<Value> child : getChildNodes()) {
             child.appendMatchingValues(answer, paths, startIndex);
         }
     }
 
 
-    public void appendMatchingWildcards(Set<Value> answer, ArrayList<AsciiBuffer> paths, int startIndex) {
+    public void appendMatchingWildcards(Set<Value> answer, Path[] paths, int startIndex) {
     	for (PathNode<Value> child : getChildNodes()) {
             child.appendMatchingWildcards(answer, paths, startIndex);
         }
@@ -54,7 +54,7 @@ public class AnyChildPathNode<Value> imp
         }
     }
 
-    public PathNode<Value> getChild(AsciiBuffer path) {
+    public PathNode<Value> getChild(Path path) {
         final Collection<PathNode<Value>> list = new ArrayList<PathNode<Value>>();
     	for (PathNode<Value> child : getChildNodes()) {
             PathNode<Value> answer = child.getChild(path);

Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.java?rev=1040082&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.java (added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.java Mon Nov 29 12:14:59 2010
@@ -0,0 +1,20 @@
+package org.apache.activemq.apollo.util.path;
+
+/**
+ * A single element of a parsed path (whole paths are passed as Path[]); matches any element by default.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class Path {
+
+    public boolean matches(Path p) {
+        return true;
+    }
+
+    abstract public String toString(PathParser parser);
+
+    public boolean isLiteral() {
+        return false;
+    }
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java Mon Nov 29 12:14:59 2010
@@ -27,48 +27,8 @@ import org.fusesource.hawtbuf.AsciiBuffe
  * 
  * @version $Revision: 1.3 $
  */
-public abstract class PathFilter {
+public interface PathFilter {
 
-    public static final AsciiBuffer ANY_DESCENDENT = new AsciiBuffer(">");
-    public static final AsciiBuffer ANY_CHILD = new AsciiBuffer("*");
-    
-    public abstract boolean matches(AsciiBuffer path);
+    public boolean matches(Path[] path);
 
-    public static PathFilter parseFilter(AsciiBuffer path) {
-    	if( containsWildCards(path) ) { 
-	        ArrayList<AsciiBuffer> paths = PathSupport.parse(path);
-	        int idx = paths.size() - 1;
-	        if (idx >= 0) {
-	        	AsciiBuffer lastPath = paths.get(idx);
-	            if (lastPath.equals(ANY_DESCENDENT)) {
-	                return new PrefixPathFilter(paths);
-	            } else {
-	                while (idx >= 0) {
-	                    lastPath = paths.get(idx--);
-	                    if (lastPath.equals(ANY_CHILD)) {
-	                        return new WildcardPathFilter(paths);
-	                    }
-	                }
-	            }
-	        }
-    	}
-
-        // if none of the paths contain a wildcard then use equality
-        return new SimplePathFilter(path);
-    }
-    
-    public static boolean containsWildCards(AsciiBuffer path) {
-    	byte b1 = ANY_DESCENDENT.getData()[0];
-    	byte b2 = ANY_CHILD.getData()[0];
-    	
-    	byte[] data = path.getData();
-    	int length = path.getOffset()+path.getLength();
-		for (int i = path.getOffset(); i < length; i++) {
-			if( data[i] == b1 || data[i]==b2 ) {
-				return true;
-			}
-		}
-		return false;
-    }
-    
 }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java Mon Nov 29 12:14:59 2010
@@ -39,8 +39,6 @@ import org.fusesource.hawtbuf.AsciiBuffe
  * @version $Revision: 1.3 $
  */
 public class PathMap<Value> {
-    protected static final AsciiBuffer ANY_DESCENDENT = PathFilter.ANY_DESCENDENT;
-    protected static final AsciiBuffer ANY_CHILD = PathFilter.ANY_CHILD;
 
     private final PathMapNode<Value> root = new PathMapNode<Value>(null);
 
@@ -54,20 +52,18 @@ public class PathMap<Value> {
      * @return a List of matching values or an empty list if there are no
      *         matching values.
      */
-    public Set<Value> get(AsciiBuffer key) {
+    public Set<Value> get(Path[] key) {
         return findWildcardMatches(key);
     }
 
-    public void put(AsciiBuffer key, Value value) {
-        ArrayList<AsciiBuffer> paths = PathSupport.parse(key);
-        root.add(paths, 0, value);
+    public void put(Path[] key, Value value) {
+        root.add(key, 0, value);
     }
 
     /**
      * Removes the value from the associated path
      */
-    public void remove(AsciiBuffer key, Value value) {
-        ArrayList<AsciiBuffer> paths = PathSupport.parse(key);
+    public void remove(Path[] paths, Value value) {
         root.remove(paths, 0, value);
 
     }
@@ -78,20 +74,7 @@ public class PathMap<Value> {
 
     // Implementation methods
     // -------------------------------------------------------------------------
-
-    /**
-     * A helper method to allow the path map to be populated from a
-     * dependency injection framework such as Spring
-     */
-    @SuppressWarnings("unchecked")
-	protected void setEntries(List<PathMapEntry> entries) {
-    	for (PathMapEntry entry : entries) {
-            put(entry.getKey(), (Value) entry);
-        }
-    }
-
-    protected Set<Value> findWildcardMatches(AsciiBuffer key) {
-    	ArrayList<AsciiBuffer> paths = PathSupport.parse(key);
+    protected Set<Value> findWildcardMatches(Path[] paths) {
         HashSet<Value> answer = new HashSet<Value>();
         root.appendMatchingValues(answer, paths, 0);
         return answer;
@@ -101,10 +84,9 @@ public class PathMap<Value> {
      * @param key
      * @return
      */
-    public Set<Value> removeAll(AsciiBuffer key) {
+    public Set<Value> removeAll(Path[] key) {
     	HashSet<Value> rc = new HashSet<Value>();
-        ArrayList<AsciiBuffer> paths = PathSupport.parse(key);
-        root.removeAll(rc, paths, 0);
+        root.removeAll(rc, key, 0);
         return rc;
     }
 
@@ -112,11 +94,11 @@ public class PathMap<Value> {
      * Returns the value which matches the given path or null if there is
      * no matching value. If there are multiple values, the results are sorted
      * and the last item (the biggest) is returned.
-     * 
+     *
      * @param path the path to find the value for
      * @return the largest matching value or null if no value matches
      */
-    public Value chooseValue(AsciiBuffer path) {
+    public Value chooseValue(Path[] path) {
         Set<Value> set = get(path);
         if (set == null || set.isEmpty()) {
             return null;

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java Mon Nov 29 12:14:59 2010
@@ -24,22 +24,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.fusesource.hawtbuf.AsciiBuffer;
-
 /**
  * An implementation class used to implement {@link PathMap}
  * 
  * @version $Revision: 1.2 $
  */
 public class PathMapNode<Value> implements PathNode<Value> {
-    protected static final AsciiBuffer ANY_CHILD = PathMap.ANY_CHILD;
-    protected static final AsciiBuffer ANY_DESCENDENT = PathMap.ANY_DESCENDENT;
 
     // we synchronize at the PathMap level
     private PathMapNode<Value> parent;
     private List<Value> values = new ArrayList<Value>();
-    private Map<AsciiBuffer, PathNode<Value>> childNodes = new HashMap<AsciiBuffer, PathNode<Value>>();
-    private AsciiBuffer path = new AsciiBuffer("Root");
+    private Map<Path, PathNode<Value>> childNodes = new HashMap<Path, PathNode<Value>>();
+    private Path path = PathParser.ROOT;
     private int pathLength;
 
     public PathMapNode(PathMapNode<Value> parent) {
@@ -55,7 +51,7 @@ public class PathMapNode<Value> implemen
      * Returns the child node for the given named path or null if it does not
      * exist
      */
-    public PathMapNode<Value> getChild(AsciiBuffer path) {
+    public PathMapNode<Value> getChild(Path path) {
         return (PathMapNode<Value>)childNodes.get(path);
     }
 
@@ -74,12 +70,12 @@ public class PathMapNode<Value> implemen
      * Returns the child node for the given named path, lazily creating one if
      * it does not yet exist
      */
-    public PathMapNode<Value> getChildOrCreate(AsciiBuffer asciiBuffer) {
-        PathMapNode<Value> answer = (PathMapNode<Value>)childNodes.get(asciiBuffer);
+    public PathMapNode<Value> getChildOrCreate(Path path) {
+        PathMapNode<Value> answer = (PathMapNode<Value>)childNodes.get(path);
         if (answer == null) {
             answer = createChildNode();
-            answer.path = asciiBuffer;
-            childNodes.put(asciiBuffer, answer);
+            answer.path = path;
+            childNodes.put(path, answer);
         }
         return answer;
     }
@@ -124,8 +120,8 @@ public class PathMapNode<Value> implemen
         return answer;
     }
 
-    public void add(ArrayList<AsciiBuffer> paths, int idx, Value value) {
-        if (idx >= paths.size()) {
+    public void add(Path[] paths, int idx, Value value) {
+        if (idx >= paths.length) {
             values.add(value);
         } else {
             // if (idx == paths.size() - 1) {
@@ -134,12 +130,12 @@ public class PathMapNode<Value> implemen
             // else {
             // getAnyChildNode().add(paths, idx + 1, value);
             // }
-            getChildOrCreate(paths.get(idx)).add(paths, idx + 1, value);
+            getChildOrCreate(paths[idx]).add(paths, idx + 1, value);
         }
     }
 
-    public void remove(ArrayList<AsciiBuffer> paths, int idx, Value value) {
-        if (idx >= paths.size()) {
+    public void remove(Path[] paths, int idx, Value value) {
+        if (idx >= paths.length) {
             values.remove(value);
             pruneIfEmpty();
         } else {
@@ -149,23 +145,23 @@ public class PathMapNode<Value> implemen
             // else {
             // getAnyChildNode().remove(paths, idx + 1, value);
             // }
-            getChildOrCreate(paths.get(idx)).remove(paths, ++idx, value);
+            getChildOrCreate(paths[idx]).remove(paths, ++idx, value);
         }
     }
 
-    public void removeAll(Set<Value> answer, ArrayList<AsciiBuffer> paths, int startIndex) {
+    public void removeAll(Set<Value> answer, Path[] paths, int startIndex) {
         PathNode<Value> node = this;
-        int size = paths.size();
+        int size = paths.length;
         for (int i = startIndex; i < size && node != null; i++) {
 
-            AsciiBuffer path = paths.get(i);
-            if (path.equals(ANY_DESCENDENT)) {
+            Path path = paths[i];
+            if (path == PathParser.ANY_DESCENDANT) {
                 answer.addAll(node.removeDesendentValues());
                 break;
             }
 
             node.appendMatchingWildcards(answer, paths, i);
-            if (path.equals(ANY_CHILD)) {
+            if (path == PathParser.ANY_CHILD ) {
                 // node = node.getAnyChildNode();
                 node = new AnyChildPathNode<Value>(node);
             } else {
@@ -203,27 +199,27 @@ public class PathMapNode<Value> implemen
     /**
      * Matches any entries in the map containing wildcards
      */
-    public void appendMatchingWildcards(Set<Value> answer, ArrayList<AsciiBuffer> paths, int idx) {
+    public void appendMatchingWildcards(Set<Value> answer, Path[] paths, int idx) {
         if (idx - 1 > pathLength) {
             return;
         }
-        PathMapNode<Value> wildCardNode = getChild(ANY_CHILD);
+        PathMapNode<Value> wildCardNode = getChild(PathParser.ANY_CHILD);
         if (wildCardNode != null) {
             wildCardNode.appendMatchingValues(answer, paths, idx + 1);
         }
-        wildCardNode = getChild(ANY_DESCENDENT);
+        wildCardNode = getChild(PathParser.ANY_DESCENDANT);
         if (wildCardNode != null) {
             answer.addAll(wildCardNode.getDesendentValues());
         }
     }
 
-    public void appendMatchingValues(Set<Value> answer, ArrayList<AsciiBuffer> paths, int startIndex) {
+    public void appendMatchingValues(Set<Value> answer, Path[] paths, int startIndex) {
         PathNode<Value> node = this;
         boolean couldMatchAny = true;
-        int size = paths.size();
+        int size = paths.length;
         for (int i = startIndex; i < size && node != null; i++) {
-            AsciiBuffer path = paths.get(i);
-            if (path.equals(ANY_DESCENDENT)) {
+            Path path = paths[i];
+            if (path.equals(PathParser.ANY_DESCENDANT)) {
                 answer.addAll(node.getDesendentValues());
                 couldMatchAny = false;
                 break;
@@ -231,7 +227,7 @@ public class PathMapNode<Value> implemen
 
             node.appendMatchingWildcards(answer, paths, i);
 
-            if (path.equals(ANY_CHILD)) {
+            if (path.equals(PathParser.ANY_CHILD)) {
                 node = new AnyChildPathNode<Value>(node);
             } else {
                 node = node.getChild(path);
@@ -241,7 +237,7 @@ public class PathMapNode<Value> implemen
             answer.addAll(node.getValues());
             if (couldMatchAny) {
                 // lets allow FOO.BAR to match the FOO.BAR.> entry in the map
-                PathNode<Value> child = node.getChild(ANY_DESCENDENT);
+                PathNode<Value> child = node.getChild(PathParser.ANY_DESCENDANT);
                 if (child != null) {
                     answer.addAll(child.getValues());
                 }
@@ -249,7 +245,7 @@ public class PathMapNode<Value> implemen
         }
     }
 
-    public AsciiBuffer getPath() {
+    public Path getPath() {
         return path;
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java Mon Nov 29 12:14:59 2010
@@ -27,15 +27,15 @@ import org.fusesource.hawtbuf.AsciiBuffe
  *
  */
 public interface PathNode<Value> {
-    void appendMatchingValues(Set<Value> answer, ArrayList<AsciiBuffer> paths, int startIndex);
+    void appendMatchingValues(Set<Value> answer, Path[] paths, int startIndex);
 
-    void appendMatchingWildcards(Set<Value> answer, ArrayList<AsciiBuffer> paths, int startIndex);
+    void appendMatchingWildcards(Set<Value> answer, Path[] paths, int startIndex);
 
     void appendDescendantValues(Set<Value> answer);
 
     Collection<Value> getDesendentValues();
 
-    PathNode<Value> getChild(AsciiBuffer path);
+    PathNode<Value> getChild(Path path);
 
     Collection<Value> getValues();
 

Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.java?rev=1040082&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.java (added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.java Mon Nov 29 12:14:59 2010
@@ -0,0 +1,237 @@
+package org.apache.activemq.apollo.util.path;
+
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+
+/**
+ * Holds the delimiters used to parse paths.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PathParser {
+
+    public static final RootPath ROOT = new RootPath();
+    public static final AnyDescendantPath ANY_DESCENDANT = new AnyDescendantPath();
+    public static final AnyChildPath ANY_CHILD = new AnyChildPath();
+    // The encoding tokens below are per-instance and non-final, so they can be reassigned.
+    public AsciiBuffer any_descendant_wildcard = new AsciiBuffer("**");
+    public AsciiBuffer any_child_wildcard = new AsciiBuffer("*");
+    public AsciiBuffer path_seperator = new AsciiBuffer(".");
+
+    /** Type of the shared {@link #ROOT} marker. */
+    private static class RootPath extends Path {
+        public String toString(PathParser parser) { return ""; }
+        // Identity comparison: only the shared ROOT instance matches.
+        public boolean matches(Path p) {
+            return p == ROOT;
+        }
+    }
+
+    /** Type of the shared {@link #ANY_CHILD} wildcard marker. */
+    private static class AnyChildPath extends Path {
+        // The text form is whatever child-wildcard token the given parser uses.
+        public String toString(PathParser parser) { return parser.any_child_wildcard.toString(); }
+    }
+
+    /** Type of the shared {@link #ANY_DESCENDANT} wildcard marker. */
+    private static class AnyDescendantPath extends Path {
+        // The text form is whatever descendant-wildcard token the given parser uses.
+        public String toString(PathParser parser) { return parser.any_descendant_wildcard.toString(); }
+    }
+
+    /**
+     * A concrete (non-wildcard) path element; equality and hashing use its value.
+     */
+    class LiteralPath extends Path {
+        private final AsciiBuffer value;
+
+        public LiteralPath(AsciiBuffer value) {
+            this.value = value;
+        }
+
+        public boolean isLiteral() {
+            return true;
+        }
+
+        /** The literal text does not depend on the parser's wildcard tokens. */
+        public String toString(PathParser parser) {
+            return value.toString();
+        }
+
+        /** Literals are compared by value; any non-literal element is accepted. */
+        public boolean matches(Path p) {
+            // we match any type of wildcard..
+            return !p.isLiteral() || ((LiteralPath) p).value.equals(value);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            return value.equals(((LiteralPath) o).value);
+        }
+
+        @Override
+        public int hashCode() {
+            return value.hashCode();
+        }
+    }
+
+    /**
+     * Splits {@code subject} on {@link #path_seperator} and converts every
+     * segment, including empty ones, into a {@link Path} element.
+     */
+    public Path[] parsePath(AsciiBuffer subject) {
+        ArrayList<Path> list = new ArrayList<Path>(10);
+        int start = 0;
+        int idx;
+        while ((idx = subject.indexOf(path_seperator, start)) >= 0) {
+            list.add(parsePart(subject.slice(start, idx).ascii()));
+            // Skip the whole separator; advance at least 1 so an empty one cannot stall.
+            start = idx + Math.max(1, path_seperator.getLength());
+        }
+        // Whatever follows the last separator is the final segment.
+        list.add(parsePart(subject.slice(start, subject.getLength()).ascii()));
+        return list.toArray(new Path[list.size()]);
+    }
+
+    /** Maps a raw segment to a shared wildcard marker or, failing that, a new literal. */
+    private Path parsePart(AsciiBuffer value) {
+        if (value.equals(any_child_wildcard)) {
+            return ANY_CHILD;
+        }
+        // Checked second, so the child token wins if both tokens are configured equal.
+        boolean anyDescendant = value.equals(any_descendant_wildcard);
+        return anyDescendant ? ANY_DESCENDANT : new LiteralPath(value);
+    }
+
+    /**
+     * Converts the paths back to the string representation.
+     *
+     * @param paths the parsed elements, in order
+     * @return each element rendered by this parser, joined with path_seperator
+     */
+    public String toString(Path[] paths) {
+        StringBuilder text = new StringBuilder(); // method-local, no locking needed
+        for (int i = 0; i < paths.length; i++) {
+            if (i > 0) {
+                text.append(path_seperator);
+            }
+            text.append(paths[i].toString(this));
+        }
+        return text.toString();
+    }
+
+    /**
+     * Encodes {@code paths} with this parser's separator and wildcard tokens
+     * and appends the encoded bytes to {@code os}.
+     */
+    public void write(Path[] paths, ByteArrayOutputStream os) {
+        // Reuse toString(Path[]) so the text and byte encodings cannot drift apart.
+        AsciiBuffer encoded = new AsciiBuffer(toString(paths));
+        os.write(encoded.getData(), encoded.getOffset(), encoded.getLength());
+    }
+
+    interface PartFilter { // one link of the chain assembled by parseFilter
+        boolean matches(LinkedList<Path> remaining); // the links here consume elements
+    }
+
+    /**
+     * Chain link that requires the next remaining element to match a literal
+     * and then hands the rest of the list to the following link.
+     */
+    class LitteralPathFilter implements PartFilter {
+
+        private final PartFilter next;
+        private final LiteralPath path;
+
+        public LitteralPathFilter(PartFilter next, LiteralPath path) {
+            this.next = next;
+            this.path = path;
+        }
+
+        public boolean matches(LinkedList<Path> remaining) {
+            // Nothing left to compare against the literal.
+            if (remaining.isEmpty()) {
+                return false;
+            }
+            if (!path.matches(remaining.removeFirst())) {
+                return false;
+            }
+            // The last link only matches when every element has been consumed.
+            return next != null ? next.matches(remaining) : remaining.isEmpty();
+        }
+    }
+
+    /** Chain link for the child wildcard: consumes exactly one element, whatever it is. */
+    static class AnyChildPathFilter implements PartFilter {
+        private final PartFilter next;
+
+        public AnyChildPathFilter(PartFilter next) {
+            this.next = next;
+        }
+
+        public boolean matches(LinkedList<Path> remaining) {
+            // The wildcard still needs one element to stand for.
+            if (remaining.isEmpty()) {
+                return false;
+            }
+            // Consume it without inspection; its value is irrelevant here.
+            remaining.removeFirst();
+            // The last link only matches when every element has been consumed.
+            return next != null ? next.matches(remaining) : remaining.isEmpty();
+        }
+    }
+
+    static class AnyDecendentPathFilter implements PartFilter {
+        private final PartFilter next;
+
+        public AnyDecendentPathFilter(PartFilter next) {
+            this.next = next;
+        }
+        public boolean matches(LinkedList<Path> remaining) {
+            if( !remaining.isEmpty() ) {
+                remaining.clear();
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    /** Compiles an encoded filter, chaining one link per element from last to first. */
+    public PathFilter parseFilter(AsciiBuffer path) {
+        Path[] paths = parsePath(path);
+        PartFilter last = null;
+        for (int i = paths.length - 1; i >= 0; i--) {
+            if (paths[i].isLiteral()) {
+                last = new LitteralPathFilter(last, (LiteralPath) paths[i]);
+            } else if (paths[i] == ANY_CHILD) {
+                last = new AnyChildPathFilter(last);
+            } else if (paths[i] == ANY_DESCENDANT) {
+                last = new AnyDecendentPathFilter(last);
+            }
+        }
+        final PartFilter filter = last;
+        return new PathFilter() {
+            public boolean matches(Path[] path) {
+                return filter.matches(new LinkedList<Path>(Arrays.asList(path)));
+            }
+        };
+    }
+
+    static public boolean containsWildCards(Path[] paths) {
+        for(Path p:paths) {
+            if( p==ANY_DESCENDANT || p==ANY_CHILD) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java Mon Nov 29 12:14:59 2010
@@ -28,9 +28,15 @@ import static org.junit.Assert.*;
  */
 public class PathMapMemoryTest {
 
+    PathParser parser = new PathParser();
+
+    protected Path[] createDestination(String name) {
+   		return parser.parsePath(new AsciiBuffer(name));
+    }
+
     @Test()
 	public void testLongPath() throws Exception {
-    	AsciiBuffer d1 = new AsciiBuffer("1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18");
+    	Path[] d1 = createDestination("1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18");
         PathMap<String> map = new PathMap<String>();
         map.put(d1, "test");
     }
@@ -45,7 +51,7 @@ public class PathMapMemoryTest {
             }
             // System.out.println("Checking: " + name);
             try {
-            	AsciiBuffer d1 = new AsciiBuffer(name);
+            	Path[] d1 = createDestination(name);
                 PathMap<String> map = new PathMap<String>();
                 map.put(d1, "test");
             } catch (Throwable e) {
@@ -60,11 +66,11 @@ public class PathMapMemoryTest {
         Object value = new Object();
         int count = 1000;
         for (int i = 0; i < count; i++) {
-            AsciiBuffer queue = new AsciiBuffer("connection:"+i);
+            Path[] queue = createDestination("connection:"+i);
             map.put(queue, value);
         }
         for (int i = 0; i < count; i++) {
-            AsciiBuffer queue = new AsciiBuffer("connection:"+i);
+            Path[] queue = createDestination("connection:"+i);
             map.remove(queue, value);
             Set<Object> set = map.get(queue);
             assertTrue(set.isEmpty());

Modified: activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java Mon Nov 29 12:14:59 2010
@@ -33,9 +33,11 @@ import static org.junit.Assert.*;
  */
 public class PathMapTest {
 
-    protected AsciiBuffer d1 = createDestination("TEST.D1");
-    protected AsciiBuffer d2 = createDestination("TEST.BAR.D2");
-    protected AsciiBuffer d3 = createDestination("TEST.BAR.D3");
+    PathParser parser = new PathParser();
+
+    protected Path[] d1 = createDestination("TEST.D1");
+    protected Path[] d2 = createDestination("TEST.BAR.D2");
+    protected Path[] d3 = createDestination("TEST.BAR.D3");
 
     protected String v1 = "value1";
     protected String v2 = "value2";
@@ -46,8 +48,8 @@ public class PathMapTest {
 
     @Test()
 	public void testCompositePaths() throws Exception {
-        AsciiBuffer d1 = createDestination("TEST.BAR.D2");
-        AsciiBuffer d2 = createDestination("TEST.BAR.D3");
+        Path[] d1 = createDestination("TEST.BAR.D2");
+        Path[] d2 = createDestination("TEST.BAR.D3");
         PathMap<String> map = new PathMap<String>();
         map.put(d1, v1);
         map.put(d2, v2);
@@ -113,11 +115,11 @@ public class PathMapTest {
         map.put(d2, v2);
         map.put(d3, v3);
 
-        assertMapValue(map, ">", v1, v2, v3);
-        assertMapValue(map, "TEST.>", v1, v2, v3);
-        assertMapValue(map, "*.>", v1, v2, v3);
+        assertMapValue(map, "**", v1, v2, v3);
+        assertMapValue(map, "TEST.**", v1, v2, v3);
+        assertMapValue(map, "*.**", v1, v2, v3);
 
-        assertMapValue(map, "FOO.>");
+        assertMapValue(map, "FOO.**");
     }
 
     @Test()
@@ -163,9 +165,9 @@ public class PathMapTest {
 
         assertMapValue(map, "TEST.*", v1, v2);
         assertMapValue(map, "TEST.*.*", v2, v3, v4, v5, v6);
-        assertMapValue(map, "TEST.*.>", v1, v2, v3, v4, v5, v6);
-        assertMapValue(map, "TEST.>", v1, v2, v3, v4, v5, v6);
-        assertMapValue(map, "TEST.>.>", v1, v2, v3, v4, v5, v6);
+        assertMapValue(map, "TEST.*.**", v1, v2, v3, v4, v5, v6);
+        assertMapValue(map, "TEST.**", v1, v2, v3, v4, v5, v6);
+        assertMapValue(map, "TEST.**.**", v1, v2, v3, v4, v5, v6);
         assertMapValue(map, "*.*.D3", v2, v3, v5);
         assertMapValue(map, "TEST.BAR.*", v2, v5, v6);
 
@@ -194,7 +196,7 @@ public class PathMapTest {
     @Test()
 	public void testAnyPathWildcardInMap() throws Exception {
         PathMap<String> map = new PathMap<String>();
-        put(map, "TEST.FOO.>", v1);
+        put(map, "TEST.FOO.**", v1);
 
         assertMapValue(map, "TEST.FOO.BAR.WHANOT.A.B.C", v1);
         assertMapValue(map, "TEST.FOO.BAR.WHANOT", v1);
@@ -245,12 +247,12 @@ public class PathMapTest {
         assertMapValue(map, "TEST.*", v3, v4);
         assertMapValue(map, "*.*", v3, v4);
 
-        remove(map, ">", v4);
+        remove(map, "**", v4);
 
         assertMapValue(map, "TEST.*", v3);
         assertMapValue(map, "*.*", v3);
 
-        remove(map, "TEST.>", v3);
+        remove(map, "TEST.**", v3);
         remove(map, "TEST.FOO.BAR", v5);
 
         assertMapValue(map, "FOO");
@@ -272,7 +274,7 @@ public class PathMapTest {
 
         assertSample2(map);
 
-        remove(map, ">", v4);
+        remove(map, "**", v4);
         remove(map, "TEST.*", v2);
 
         assertMapValue(map, "FOO");
@@ -296,22 +298,22 @@ public class PathMapTest {
         PathMap<String> map = new PathMap<String>();
 
         put(map, "FOO.A", v1);
-        assertMapValue(map, "FOO.>", v1);
+        assertMapValue(map, "FOO.**", v1);
 
         put(map, "FOO.B", v2);
-        assertMapValue(map, "FOO.>", v1, v2);
+        assertMapValue(map, "FOO.**", v1, v2);
 
         map.removeAll(createDestination("FOO.A"));
 
-        assertMapValue(map, "FOO.>", v2);
+        assertMapValue(map, "FOO.**", v2);
 
     }
 
     protected void loadSample2(PathMap<String> map) {
         put(map, "TEST.FOO", v1);
         put(map, "TEST.*", v2);
-        put(map, "TEST.>", v3);
-        put(map, ">", v4);
+        put(map, "TEST.**", v3);
+        put(map, "**", v4);
         put(map, "TEST.FOO.BAR", v5);
         put(map, "TEST.XYZ", v6);
     }
@@ -338,17 +340,17 @@ public class PathMapTest {
     }
 
     protected void remove(PathMap<String> map, String name, String value) {
-        AsciiBuffer destination = createDestination(name);
+        Path[] destination = createDestination(name);
         map.remove(destination, value);
     }
 
     protected void assertMapValue(PathMap<String> map, String destinationName, Object... expected) {
-        AsciiBuffer destination = createDestination(destinationName);
+        Path[] destination = createDestination(destinationName);
         assertMapValue(map, destination, expected);
     }
 
     @SuppressWarnings("unchecked")
-    protected void assertMapValue(PathMap<String> map, AsciiBuffer destination, Object... expected) {
+    protected void assertMapValue(PathMap<String> map, Path[] destination, Object... expected) {
         List expectedList = Arrays.asList(expected);
         Collections.sort(expectedList);
         Set actualSet = map.get(destination);
@@ -357,7 +359,7 @@ public class PathMapTest {
         assertEquals(("map value for destinationName:  " + destination), expectedList, actual);
     }
 
-    protected AsciiBuffer createDestination(String name) {
-   		return new AsciiBuffer(new AsciiBuffer(name));
+    protected Path[] createDestination(String name) {
+   		return parser.parsePath(new AsciiBuffer(name));
     }
 }