You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/04/04 16:59:59 UTC

svn commit: r1309437 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-stomp/src/test/resources/ apollo-website/src/documentation/

Author: chirino
Date: Wed Apr  4 14:59:59 2012
New Revision: 1309437

URL: http://svn.apache.org/viewvc?rev=1309437&view=rev
Log:
Fixes APLO-183 : Allow customizing the settings of the per subscription queues created when slow_consumer_policy="queue" on a topic

Added:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
      - copied, changed from r1308215, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Wed Apr  4 14:59:59 2012
@@ -76,7 +76,7 @@ trait Binding {
 
   def message_filter:BooleanExpression = ConstantExpression.TRUE
 
-  def config(host:VirtualHost):QueueDTO
+  def config(host:VirtualHost):QueueSettingsDTO
 
   override def toString = address.toString
 }
@@ -255,7 +255,7 @@ object TempQueueBinding {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-case class TempQueueBinding(key:AnyRef, address:DestinationAddress) extends Binding {
+case class TempQueueBinding(key:AnyRef, address:DestinationAddress, settings:QueueSettingsDTO) extends Binding {
   import TempQueueBinding._
 
   def binding_kind = TEMP_KIND
@@ -270,7 +270,7 @@ case class TempQueueBinding(key:AnyRef, 
 
   override def hashCode = if(key==null) 0 else key.hashCode
 
-  def config(host: VirtualHost) = new QueueDTO
+  def config(host: VirtualHost) = settings
 
   override def equals(o:Any):Boolean = o match {
     case x: TempQueueBinding => x.key == key

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Apr  4 14:59:59 2012
@@ -153,11 +153,11 @@ object LocalRouter extends Log {
 
   val destination_parser = new DestinationParser
 
-  def is_wildcard_config(dto:StringIdDTO) = {
-    if( dto.id == null ) {
+  def is_wildcard_destination(id:String) = {
+    if( id == null ) {
       true
     } else {
-      val path = destination_parser.decode_path(dto.id)
+      val path = destination_parser.decode_path(id)
       PathParser.containsWildCards(path)
     }
   }
@@ -740,8 +740,7 @@ class LocalRouter(val virtual_host:Virtu
       assert( !PathParser.containsWildCards(path) )
       add_destination(path, queue)
 
-      import OptionSupport._
-      if( queue.config.mirrored.getOrElse(false) ) {
+      if( queue.mirrored ) {
         // hook up the queue to be a subscriber of the topic.
         val topic = local_topic_domain.get_or_create_destination(SimpleAddress("topic", path), null).success
         topic.bind(SimpleAddress("queue", path), queue)
@@ -752,8 +751,7 @@ class LocalRouter(val virtual_host:Virtu
       val path = queue.address.path
       remove_destination(path, queue)
 
-      import OptionSupport._
-      if( queue.config.mirrored.getOrElse(false) ) {
+      if( queue.mirrored ) {
         // unhook the queue from the topic
         val topic = local_topic_domain.get_or_create_destination(SimpleAddress("topic", path), null).success
         topic.unbind(queue, false)
@@ -818,22 +816,22 @@ class LocalRouter(val virtual_host:Virtu
 
   protected def create_configure_destinations {
     import collection.JavaConversions._
-    def create_configured_dests(list: ArrayList[_ <: StringIdDTO], d: Domain[_], to_address: (Path) => DestinationAddress) = {
-      list.foreach { dto =>
-        if (dto.id != null) {
+    def create_configured_dests(list: Traversable[String], d: Domain[_], to_address: (Path) => DestinationAddress) = {
+      list.foreach { id =>
+        if (id != null) {
           try {
-            val path = destination_parser.decode_path(dto.id)
+            val path = destination_parser.decode_path(id)
             if (!PathParser.containsWildCards(path)) {
               d.get_or_create_destination(to_address(path), null)
             }
           } catch {
-            case x:PathException => warn(x, "Invalid destination id '%s'", dto.id)
+            case x:PathException => warn(x, "Invalid destination id '%s'", id)
           }
         }
       }
     }
-    create_configured_dests(virtual_host.config.queues, local_queue_domain, (path) => SimpleAddress("queue", path))
-    create_configured_dests(virtual_host.config.topics, local_topic_domain, (path) => SimpleAddress("topic", path))
+    create_configured_dests(virtual_host.config.queues.map(_.id), local_queue_domain, (path) => SimpleAddress("queue", path))
+    create_configured_dests(virtual_host.config.topics.map(_.id), local_topic_domain, (path) => SimpleAddress("topic", path))
 
     virtual_host.config.dsubs.foreach { dto =>
       if (dto.id != null && ( dto.topic!=null || !dto.topics.isEmpty) ) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Apr  4 14:59:59 2012
@@ -214,12 +214,12 @@ class Queue(val router: LocalRouter, val
   var loaded_size = 0
   def swapped_in_size_max = this.producer_swapped_in.size_max + this.consumer_swapped_in.size_max
 
-  var config:QueueDTO = _
+  var config:QueueSettingsDTO = _
   var full_drop_policy:FullDropPolicy = Block
 
   def dlq_nak_limit = OptionSupport(config.nak_limit).getOrElse(0)
 
-  def configure(update:QueueDTO) = {
+  def configure(update:QueueSettingsDTO) = {
     def mem_size(value:String, default:String) = MemoryPropertyEditor.parse(Option(value).getOrElse(default)).toInt
 
     producer_swapped_in.size_max += mem_size(update.tail_buffer, "640k") - Option(config).map{ config=>
@@ -242,19 +242,29 @@ class Queue(val router: LocalRouter, val
         warn("Invalid 'full_drop_policy' configured for queue '%s': '%s'", id, update.full_policy)
         Block
     }
-    
-    auto_delete_after = update.auto_delete_after.getOrElse(30)
-    if( auto_delete_after!= 0 ) {
-      // we don't auto delete explicitly configured queues,
-      // non destination queues, or mirrored queues.
-      if( update.mirrored.getOrElse(false) || !binding.isInstanceOf[QueueDomainQueueBinding] || !LocalRouter.is_wildcard_config(update) ) {
-        auto_delete_after = 0
-      }
+
+    update match {
+      case update:QueueDTO =>
+        auto_delete_after = update.auto_delete_after.getOrElse(30)
+        if( auto_delete_after!= 0 ) {
+          // we don't auto delete explicitly configured queues,
+          // non destination queues, or mirrored queues.
+          if( update.mirrored.getOrElse(false) || !binding.isInstanceOf[QueueDomainQueueBinding] || !LocalRouter.is_wildcard_destination(update.id) ) {
+            auto_delete_after = 0
+          }
+        }
+      case _ =>
     }
     config = update
     this
   }
 
+  def mirrored = config match {
+    case config:QueueDTO =>
+      config.mirrored.getOrElse(false)
+    case _ => false
+  }
+
   def get_queue_metrics:DestMetricsDTO = {
     dispatch_queue.assertExecuting()
     val rc = new DestMetricsDTO
@@ -1149,8 +1159,7 @@ class Queue(val router: LocalRouter, val
   }
 
   def connect (connect_address:ConnectAddress, producer:BindableDeliveryProducer) = {
-    import OptionSupport._
-    if( config.mirrored.getOrElse(false) ) {
+    if( mirrored ) {
       // this is a mirrored queue.. actually have the produce bind to the topic, instead of the
       val topic_address = new SimpleAddress("topic", binding.address.path)
       val topic = router.local_topic_domain.get_or_create_destination(topic_address, null).success
@@ -1165,8 +1174,7 @@ class Queue(val router: LocalRouter, val
   }
 
   def disconnect (producer:BindableDeliveryProducer) = {
-    import OptionSupport._
-    if( config.mirrored.getOrElse(false) ) {
+    if( mirrored ) {
       val topic_address = new SimpleAddress("topic", binding.address.path)
       val topic = router.local_topic_domain.get_or_create_destination(topic_address, null).success
       topic.disconnect(producer)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed Apr  4 14:59:59 2012
@@ -306,7 +306,7 @@ class Topic(val router:LocalRouter, val 
     auto_delete_after = config.auto_delete_after.getOrElse(30)
     if( auto_delete_after!= 0 ) {
       // we don't auto delete explicitly configured destinations.
-      if( !LocalRouter.is_wildcard_config(config) ) {
+      if( !LocalRouter.is_wildcard_destination(config.id) ) {
         auto_delete_after = 0
       }
     }
@@ -343,7 +343,7 @@ class Topic(val router:LocalRouter, val 
           case "queue" =>
 
             // create a temp queue so that it can spool
-            val queue = router._create_queue(new TempQueueBinding(consumer, address))
+            val queue = router._create_queue(new TempQueueBinding(consumer, address, Option(config.subscription).getOrElse(new QueueSettingsDTO)))
             queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
             queue.bind(List(consumer))
             consumer_queues += consumer->queue

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Wed Apr  4 14:59:59 2012
@@ -30,7 +30,13 @@ import java.util.List;
 @XmlRootElement(name = "queue")
 @XmlAccessorType(XmlAccessType.FIELD)
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class QueueDTO extends StringIdDTO {
+public class QueueDTO extends QueueSettingsDTO {
+
+    /**
+     * A unique id of the queue.
+     */
+	@XmlAttribute
+	public String id;
 
     /**
      * Controls when the queue will auto delete.
@@ -53,116 +59,6 @@ public class QueueDTO extends StringIdDT
     @XmlAttribute
     public Boolean mirrored;
 
-    /**
-     *  The amount of memory buffer space to use for swapping messages
-     *  out.
-     */
-    @XmlAttribute(name="tail_buffer")
-    public String tail_buffer;
-
-    /**
-     * Should this queue persistently store it's entries?
-     */
-    @XmlAttribute(name="persistent")
-    public Boolean persistent;
-
-    /**
-     * Should messages be swapped out of memory if
-     * no consumers need the message?
-     */
-    @XmlAttribute(name="swap")
-    public Boolean swap;
-
-    /**
-     * The number max number of swapped queue entries to load
-     * from the store at a time.  Note that swapped entries are just
-     * reference pointers to the actual messages.  When not loaded,
-     * the batch is referenced as sequence range to conserve memory.
-     */
-    @XmlAttribute(name="swap_range_size")
-    public Integer swap_range_size;
-
-    /**
-     * The maximum amount of size the queue is allowed
-     * to grow to.  If not set then there is no limit.  You can
-     * use settings values like: 500mb or 1g just plain 1024000
-     */
-    @XmlAttribute(name="quota")
-    public String quota;
-
-    /**
-     * Once the queue is full, the `full_policy` controls how the
-     * queue behaves when additional messages attempt to be enqueued
-     * onto the queue.
-     *
-     * You can set it to one of the following options:
-     *  `block`: The producer blocks until some space frees up.
-     *  `drop tail`: Drops new messages being enqueued on the queue.
-     *  `drop head`: Drops old messages at the front of the queue.
-     *
-     * If the queue is persistent then it is considered full when the max
-     * quota size is reached.  If the queue is not persistent then
-     * the queue is considered full once it's `tail_buffer` fills up.
-     * Defaults to 'block' if not specified.
-     */
-    @XmlAttribute(name="full_policy")
-    public String full_policy;
-
-    /**
-     *  The message delivery rate (in bytes/sec) at which
-     *  the queue considers the consumers fast and
-     *  may start slowing down producers to match the consumption
-     *  rate if the consumers are at the tail of the queue.
-     */
-    @XmlAttribute(name="fast_delivery_rate")
-    public String fast_delivery_rate;
-
-    /**
-     * If set, and the the current delivery
-     * rate is exceeding the configured value
-     * of fast_delivery_rate and the consumers
-     * are spending more time loading from
-     * the store than delivering, then the
-     * enqueue rate will be throttled to the
-     * specified value so that the consumers
-     * can catch up and reach the tail of the queue.
-     */
-    @XmlAttribute(name="catchup_enqueue_rate")
-    public String catchup_enqueue_rate;
-
-    /**
-     *  The maximum enqueue rate of the queue
-     */
-    @XmlAttribute(name="max_enqueue_rate")
-    public String max_enqueue_rate;
-
-    /**
-     * To hold any other non-matching XML elements
-     */
-    @XmlAnyElement(lax=true)
-    public List<Object> other = new ArrayList<Object>();
-
-    /**
-     * Is the dead letter queue configured for the destination.  A
-     * dead letter queue is used for storing messages that failed to get processed
-     * by consumers.  If not set, then messages that fail to get processed
-     * will be dropped.  If '*' appears in the name it will be replaced with
-     * the queue's id.
-     */
-    @XmlAttribute(name="dlq")
-    public String dlq;
-
-    /**
-     * Once a message has been nacked the configured
-     * number of times the message will be considered to be a
-     * poison message and will get moved to the dead letter queue if that's
-     * configured or dropped.  If set to less than one, then the message
-     * will never be considered to be a poison message.
-     * Defaults to zero.
-     */
-    @XmlAttribute(name="nak_limit")
-    public Integer nak_limit;
-
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -173,21 +69,8 @@ public class QueueDTO extends StringIdDT
 
         if (auto_delete_after != null ? !auto_delete_after.equals(queueDTO.auto_delete_after) : queueDTO.auto_delete_after != null)
             return false;
-        if (fast_delivery_rate != null ? !fast_delivery_rate.equals(queueDTO.fast_delivery_rate) : queueDTO.fast_delivery_rate != null)
-            return false;
-        if (catchup_enqueue_rate != null ? !catchup_enqueue_rate.equals(queueDTO.catchup_enqueue_rate) : queueDTO.catchup_enqueue_rate != null)
-            return false;
-        if (max_enqueue_rate != null ? !max_enqueue_rate.equals(queueDTO.max_enqueue_rate) : queueDTO.max_enqueue_rate != null)
-            return false;
-        if (other != null ? !other.equals(queueDTO.other) : queueDTO.other != null) return false;
-        if (persistent != null ? !persistent.equals(queueDTO.persistent) : queueDTO.persistent != null) return false;
-        if (quota != null ? !quota.equals(queueDTO.quota) : queueDTO.quota != null) return false;
-        if (swap != null ? !swap.equals(queueDTO.swap) : queueDTO.swap != null) return false;
-        if (swap_range_size != null ? !swap_range_size.equals(queueDTO.swap_range_size) : queueDTO.swap_range_size != null)
-            return false;
+        if (id != null ? !id.equals(queueDTO.id) : queueDTO.id != null) return false;
         if (mirrored != null ? !mirrored.equals(queueDTO.mirrored) : queueDTO.mirrored != null) return false;
-        if (dlq != null ? !dlq.equals(queueDTO.dlq) : queueDTO.dlq != null) return false;
-        if (nak_limit != null ? !nak_limit.equals(queueDTO.nak_limit) : queueDTO.nak_limit != null) return false;
 
         return true;
     }
@@ -195,18 +78,9 @@ public class QueueDTO extends StringIdDT
     @Override
     public int hashCode() {
         int result = super.hashCode();
+        result = 31 * result + (id != null ? id.hashCode() : 0);
         result = 31 * result + (auto_delete_after != null ? auto_delete_after.hashCode() : 0);
         result = 31 * result + (mirrored != null ? mirrored.hashCode() : 0);
-        result = 31 * result + (persistent != null ? persistent.hashCode() : 0);
-        result = 31 * result + (swap != null ? swap.hashCode() : 0);
-        result = 31 * result + (swap_range_size != null ? swap_range_size.hashCode() : 0);
-        result = 31 * result + (quota != null ? quota.hashCode() : 0);
-        result = 31 * result + (fast_delivery_rate != null ? fast_delivery_rate.hashCode() : 0);
-        result = 31 * result + (catchup_enqueue_rate != null ? catchup_enqueue_rate.hashCode() : 0);
-        result = 31 * result + (max_enqueue_rate != null ? max_enqueue_rate.hashCode() : 0);
-        result = 31 * result + (other != null ? other.hashCode() : 0);
-        result = 31 * result + (dlq != null ? dlq.hashCode() : 0);
-        result = 31 * result + (nak_limit != null ? nak_limit.hashCode() : 0);
         return result;
     }
 }

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java (from r1308215, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java&r1=1308215&r2=1309437&rev=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java Wed Apr  4 14:59:59 2012
@@ -17,41 +17,21 @@
 package org.apache.activemq.apollo.dto;
 
 
-
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
-import javax.xml.bind.annotation.*;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAnyElement;
+import javax.xml.bind.annotation.XmlAttribute;
 import java.util.ArrayList;
 import java.util.List;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name = "queue")
 @XmlAccessorType(XmlAccessType.FIELD)
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class QueueDTO extends StringIdDTO {
-
-    /**
-     * Controls when the queue will auto delete.
-     * If set to zero, then the queue will NOT auto
-     * delete, otherwise the queue will auto delete
-     * after it has been unused for the number
-     * of seconds configured in this field.  If unset,
-     * it defaults to 5 minutes.
-     */
-    @XmlAttribute(name="auto_delete_after")
-    public Integer auto_delete_after;
-
-    /**
-     * If set to true, then once the queue
-     * is created all messages sent to the queue
-     * will be mirrored to a topic of the same name
-     * and all messages sent to the topic will be mirror
-     * to the queue.
-     */
-    @XmlAttribute
-    public Boolean mirrored;
+public class QueueSettingsDTO {
 
     /**
      *  The amount of memory buffer space to use for swapping messages
@@ -137,12 +117,6 @@ public class QueueDTO extends StringIdDT
     public String max_enqueue_rate;
 
     /**
-     * To hold any other non-matching XML elements
-     */
-    @XmlAnyElement(lax=true)
-    public List<Object> other = new ArrayList<Object>();
-
-    /**
      * Is the dead letter queue configured for the destination.  A
      * dead letter queue is used for storing messages that failed to get processed
      * by consumers.  If not set, then messages that fail to get processed
@@ -163,50 +137,53 @@ public class QueueDTO extends StringIdDT
     @XmlAttribute(name="nak_limit")
     public Integer nak_limit;
 
+    /**
+     * To hold any other non-matching XML elements
+     */
+    @XmlAnyElement(lax=true)
+    public List<Object> other = new ArrayList<Object>();
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (!(o instanceof QueueDTO)) return false;
-        if (!super.equals(o)) return false;
+        if (!(o instanceof QueueSettingsDTO)) return false;
 
-        QueueDTO queueDTO = (QueueDTO) o;
+        QueueSettingsDTO that = (QueueSettingsDTO) o;
 
-        if (auto_delete_after != null ? !auto_delete_after.equals(queueDTO.auto_delete_after) : queueDTO.auto_delete_after != null)
-            return false;
-        if (fast_delivery_rate != null ? !fast_delivery_rate.equals(queueDTO.fast_delivery_rate) : queueDTO.fast_delivery_rate != null)
+        if (catchup_enqueue_rate != null ? !catchup_enqueue_rate.equals(that.catchup_enqueue_rate) : that.catchup_enqueue_rate != null)
             return false;
-        if (catchup_enqueue_rate != null ? !catchup_enqueue_rate.equals(queueDTO.catchup_enqueue_rate) : queueDTO.catchup_enqueue_rate != null)
+        if (dlq != null ? !dlq.equals(that.dlq) : that.dlq != null) return false;
+        if (fast_delivery_rate != null ? !fast_delivery_rate.equals(that.fast_delivery_rate) : that.fast_delivery_rate != null)
             return false;
-        if (max_enqueue_rate != null ? !max_enqueue_rate.equals(queueDTO.max_enqueue_rate) : queueDTO.max_enqueue_rate != null)
+        if (full_policy != null ? !full_policy.equals(that.full_policy) : that.full_policy != null) return false;
+        if (max_enqueue_rate != null ? !max_enqueue_rate.equals(that.max_enqueue_rate) : that.max_enqueue_rate != null)
             return false;
-        if (other != null ? !other.equals(queueDTO.other) : queueDTO.other != null) return false;
-        if (persistent != null ? !persistent.equals(queueDTO.persistent) : queueDTO.persistent != null) return false;
-        if (quota != null ? !quota.equals(queueDTO.quota) : queueDTO.quota != null) return false;
-        if (swap != null ? !swap.equals(queueDTO.swap) : queueDTO.swap != null) return false;
-        if (swap_range_size != null ? !swap_range_size.equals(queueDTO.swap_range_size) : queueDTO.swap_range_size != null)
+        if (nak_limit != null ? !nak_limit.equals(that.nak_limit) : that.nak_limit != null) return false;
+        if (other != null ? !other.equals(that.other) : that.other != null) return false;
+        if (persistent != null ? !persistent.equals(that.persistent) : that.persistent != null) return false;
+        if (quota != null ? !quota.equals(that.quota) : that.quota != null) return false;
+        if (swap != null ? !swap.equals(that.swap) : that.swap != null) return false;
+        if (swap_range_size != null ? !swap_range_size.equals(that.swap_range_size) : that.swap_range_size != null)
             return false;
-        if (mirrored != null ? !mirrored.equals(queueDTO.mirrored) : queueDTO.mirrored != null) return false;
-        if (dlq != null ? !dlq.equals(queueDTO.dlq) : queueDTO.dlq != null) return false;
-        if (nak_limit != null ? !nak_limit.equals(queueDTO.nak_limit) : queueDTO.nak_limit != null) return false;
+        if (tail_buffer != null ? !tail_buffer.equals(that.tail_buffer) : that.tail_buffer != null) return false;
 
         return true;
     }
 
     @Override
     public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + (auto_delete_after != null ? auto_delete_after.hashCode() : 0);
-        result = 31 * result + (mirrored != null ? mirrored.hashCode() : 0);
+        int result = tail_buffer != null ? tail_buffer.hashCode() : 0;
         result = 31 * result + (persistent != null ? persistent.hashCode() : 0);
         result = 31 * result + (swap != null ? swap.hashCode() : 0);
         result = 31 * result + (swap_range_size != null ? swap_range_size.hashCode() : 0);
         result = 31 * result + (quota != null ? quota.hashCode() : 0);
+        result = 31 * result + (full_policy != null ? full_policy.hashCode() : 0);
         result = 31 * result + (fast_delivery_rate != null ? fast_delivery_rate.hashCode() : 0);
         result = 31 * result + (catchup_enqueue_rate != null ? catchup_enqueue_rate.hashCode() : 0);
         result = 31 * result + (max_enqueue_rate != null ? max_enqueue_rate.hashCode() : 0);
-        result = 31 * result + (other != null ? other.hashCode() : 0);
         result = 31 * result + (dlq != null ? dlq.hashCode() : 0);
         result = 31 * result + (nak_limit != null ? nak_limit.hashCode() : 0);
+        result = 31 * result + (other != null ? other.hashCode() : 0);
         return result;
     }
 }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Wed Apr  4 14:59:59 2012
@@ -34,7 +34,7 @@ import java.util.List;
 public class QueueStatusDTO extends ServiceStatusDTO {
 
     @XmlElement
-    public QueueDTO config;
+    public QueueSettingsDTO config;
 
     @XmlElement
     public DestinationDTO binding;

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java Wed Apr  4 14:59:59 2012
@@ -30,7 +30,13 @@ import java.util.List;
 @XmlRootElement(name = "topic")
 @XmlAccessorType(XmlAccessType.FIELD)
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class TopicDTO extends StringIdDTO {
+public class TopicDTO {
+
+    /**
+     * A unique id of the topic.
+     */
+	@XmlAttribute
+	public String id;
 
     /**
      * Controls when the topic will auto delete.
@@ -47,6 +53,13 @@ public class TopicDTO extends StringIdDT
     public String slow_consumer_policy;
 
     /**
+     * The subscription settings that will be used for queues which are created
+     * for each subscription when the `slow_consumer_policy` is set to `queue`.
+     */
+    @XmlElement(name="subscription")
+    public QueueSettingsDTO subscription;
+
+    /**
      * To hold any other non-matching XML elements
      */
     @XmlAnyElement(lax=true)
@@ -56,13 +69,14 @@ public class TopicDTO extends StringIdDT
     public boolean equals(Object o) {
         if (this == o) return true;
         if (!(o instanceof TopicDTO)) return false;
-        if (!super.equals(o)) return false;
 
         TopicDTO topicDTO = (TopicDTO) o;
 
         if (auto_delete_after != null ? !auto_delete_after.equals(topicDTO.auto_delete_after) : topicDTO.auto_delete_after != null)
             return false;
+        if (id != null ? !id.equals(topicDTO.id) : topicDTO.id != null) return false;
         if (other != null ? !other.equals(topicDTO.other) : topicDTO.other != null) return false;
+        if (subscription != null ? !subscription.equals(topicDTO.subscription) : topicDTO.subscription != null) return false;
         if (slow_consumer_policy != null ? !slow_consumer_policy.equals(topicDTO.slow_consumer_policy) : topicDTO.slow_consumer_policy != null)
             return false;
 
@@ -71,9 +85,10 @@ public class TopicDTO extends StringIdDT
 
     @Override
     public int hashCode() {
-        int result = super.hashCode();
+        int result = id != null ? id.hashCode() : 0;
         result = 31 * result + (auto_delete_after != null ? auto_delete_after.hashCode() : 0);
         result = 31 * result + (slow_consumer_policy != null ? slow_consumer_policy.hashCode() : 0);
+        result = 31 * result + (subscription != null ? subscription.hashCode() : 0);
         result = 31 * result + (other != null ? other.hashCode() : 0);
         return result;
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Wed Apr  4 14:59:59 2012
@@ -23,7 +23,9 @@
 
     <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
     <queue id="mirrored.**" mirrored="true"/>
-    <topic id="queued.**" slow_consumer_policy="queue"/>
+    <topic id="queued.**" slow_consumer_policy="queue">
+      <subscription tail_buffer="4k"/>
+    </topic>
 
   </virtual_host>
 

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Wed Apr  4 14:59:59 2012
@@ -389,6 +389,19 @@ memory.  Defaults to true.
   the queue is considered full once it's `tail_buffer` fills up.
   Defaults to 'block' if not specified.
 
+Example configuration:
+
+{pygmentize:: xml}
+...
+  <virtual_host id="default">
+    ...
+    <queue id="app1.**" dlq="dlq.*" nak_limit="3" auto_delete_after="0"/>
+    ...
+  </virtual_host>
+...
+{pygmentize}
+
+
 ##### Topics
 
 When a new topic is first created in the broker, it's configuration will be
@@ -415,6 +428,25 @@ A `topic` element may be configured with
   delete once there have been no consumers or producers on it
   for the configured number of seconds.  Defaults to 30 if not set.
 
+A `topic` that has the `slow_consumer_policy` set to `queue` can customize
+the settings of the per subscription queues by adding a nested `subscription`
+element.  The `subscription` element supports the following configuration
+attributes of the `queue` element: `tail_buffer`, `persistent`, `swap`,
+`swap_range_size`, `quota`, `full_policy`, `fast_delivery_rate`, 
+`catchup_enqueue_rate`, `max_enqueue_rate`, `dlq`, `nak_limit`.  Example:
+
+{pygmentize:: xml}
+...
+  <virtual_host id="default">
+    ...
+    <topic id="example" slow_consumer_policy="queue">
+      <subscription tail_buffer="4k"/>
+    </topic>
+    ...
+  </virtual_host>
+...
+{pygmentize}
+
 ##### Durable Subscriptions
 
 When a new durable subscription is first created in the broker, it's