You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/04/04 16:59:59 UTC
svn commit: r1309437 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-stomp/src/test/resources/ apollo-website/src/documentation/
Author: chirino
Date: Wed Apr 4 14:59:59 2012
New Revision: 1309437
URL: http://svn.apache.org/viewvc?rev=1309437&view=rev
Log:
Fixes APLO-183 : Allow customizing the settings of the per subscription queues created when slow_consumer_policy="queue" on a topic
Added:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
- copied, changed from r1308215, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Wed Apr 4 14:59:59 2012
@@ -76,7 +76,7 @@ trait Binding {
def message_filter:BooleanExpression = ConstantExpression.TRUE
- def config(host:VirtualHost):QueueDTO
+ def config(host:VirtualHost):QueueSettingsDTO
override def toString = address.toString
}
@@ -255,7 +255,7 @@ object TempQueueBinding {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-case class TempQueueBinding(key:AnyRef, address:DestinationAddress) extends Binding {
+case class TempQueueBinding(key:AnyRef, address:DestinationAddress, settings:QueueSettingsDTO) extends Binding {
import TempQueueBinding._
def binding_kind = TEMP_KIND
@@ -270,7 +270,7 @@ case class TempQueueBinding(key:AnyRef,
override def hashCode = if(key==null) 0 else key.hashCode
- def config(host: VirtualHost) = new QueueDTO
+ def config(host: VirtualHost) = settings
override def equals(o:Any):Boolean = o match {
case x: TempQueueBinding => x.key == key
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Apr 4 14:59:59 2012
@@ -153,11 +153,11 @@ object LocalRouter extends Log {
val destination_parser = new DestinationParser
- def is_wildcard_config(dto:StringIdDTO) = {
- if( dto.id == null ) {
+ def is_wildcard_destination(id:String) = {
+ if( id == null ) {
true
} else {
- val path = destination_parser.decode_path(dto.id)
+ val path = destination_parser.decode_path(id)
PathParser.containsWildCards(path)
}
}
@@ -740,8 +740,7 @@ class LocalRouter(val virtual_host:Virtu
assert( !PathParser.containsWildCards(path) )
add_destination(path, queue)
- import OptionSupport._
- if( queue.config.mirrored.getOrElse(false) ) {
+ if( queue.mirrored ) {
// hook up the queue to be a subscriber of the topic.
val topic = local_topic_domain.get_or_create_destination(SimpleAddress("topic", path), null).success
topic.bind(SimpleAddress("queue", path), queue)
@@ -752,8 +751,7 @@ class LocalRouter(val virtual_host:Virtu
val path = queue.address.path
remove_destination(path, queue)
- import OptionSupport._
- if( queue.config.mirrored.getOrElse(false) ) {
+ if( queue.mirrored ) {
// unhook the queue from the topic
val topic = local_topic_domain.get_or_create_destination(SimpleAddress("topic", path), null).success
topic.unbind(queue, false)
@@ -818,22 +816,22 @@ class LocalRouter(val virtual_host:Virtu
protected def create_configure_destinations {
import collection.JavaConversions._
- def create_configured_dests(list: ArrayList[_ <: StringIdDTO], d: Domain[_], to_address: (Path) => DestinationAddress) = {
- list.foreach { dto =>
- if (dto.id != null) {
+ def create_configured_dests(list: Traversable[String], d: Domain[_], to_address: (Path) => DestinationAddress) = {
+ list.foreach { id =>
+ if (id != null) {
try {
- val path = destination_parser.decode_path(dto.id)
+ val path = destination_parser.decode_path(id)
if (!PathParser.containsWildCards(path)) {
d.get_or_create_destination(to_address(path), null)
}
} catch {
- case x:PathException => warn(x, "Invalid destination id '%s'", dto.id)
+ case x:PathException => warn(x, "Invalid destination id '%s'", id)
}
}
}
}
- create_configured_dests(virtual_host.config.queues, local_queue_domain, (path) => SimpleAddress("queue", path))
- create_configured_dests(virtual_host.config.topics, local_topic_domain, (path) => SimpleAddress("topic", path))
+ create_configured_dests(virtual_host.config.queues.map(_.id), local_queue_domain, (path) => SimpleAddress("queue", path))
+ create_configured_dests(virtual_host.config.topics.map(_.id), local_topic_domain, (path) => SimpleAddress("topic", path))
virtual_host.config.dsubs.foreach { dto =>
if (dto.id != null && ( dto.topic!=null || !dto.topics.isEmpty) ) {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Apr 4 14:59:59 2012
@@ -214,12 +214,12 @@ class Queue(val router: LocalRouter, val
var loaded_size = 0
def swapped_in_size_max = this.producer_swapped_in.size_max + this.consumer_swapped_in.size_max
- var config:QueueDTO = _
+ var config:QueueSettingsDTO = _
var full_drop_policy:FullDropPolicy = Block
def dlq_nak_limit = OptionSupport(config.nak_limit).getOrElse(0)
- def configure(update:QueueDTO) = {
+ def configure(update:QueueSettingsDTO) = {
def mem_size(value:String, default:String) = MemoryPropertyEditor.parse(Option(value).getOrElse(default)).toInt
producer_swapped_in.size_max += mem_size(update.tail_buffer, "640k") - Option(config).map{ config=>
@@ -242,19 +242,29 @@ class Queue(val router: LocalRouter, val
warn("Invalid 'full_drop_policy' configured for queue '%s': '%s'", id, update.full_policy)
Block
}
-
- auto_delete_after = update.auto_delete_after.getOrElse(30)
- if( auto_delete_after!= 0 ) {
- // we don't auto delete explicitly configured queues,
- // non destination queues, or mirrored queues.
- if( update.mirrored.getOrElse(false) || !binding.isInstanceOf[QueueDomainQueueBinding] || !LocalRouter.is_wildcard_config(update) ) {
- auto_delete_after = 0
- }
+
+ update match {
+ case update:QueueDTO =>
+ auto_delete_after = update.auto_delete_after.getOrElse(30)
+ if( auto_delete_after!= 0 ) {
+ // we don't auto delete explicitly configured queues,
+ // non destination queues, or mirrored queues.
+ if( update.mirrored.getOrElse(false) || !binding.isInstanceOf[QueueDomainQueueBinding] || !LocalRouter.is_wildcard_destination(update.id) ) {
+ auto_delete_after = 0
+ }
+ }
+ case _ =>
}
config = update
this
}
+ def mirrored = config match {
+ case config:QueueDTO =>
+ config.mirrored.getOrElse(false)
+ case _ => false
+ }
+
def get_queue_metrics:DestMetricsDTO = {
dispatch_queue.assertExecuting()
val rc = new DestMetricsDTO
@@ -1149,8 +1159,7 @@ class Queue(val router: LocalRouter, val
}
def connect (connect_address:ConnectAddress, producer:BindableDeliveryProducer) = {
- import OptionSupport._
- if( config.mirrored.getOrElse(false) ) {
+ if( mirrored ) {
// this is a mirrored queue.. actually have the produce bind to the topic, instead of the
val topic_address = new SimpleAddress("topic", binding.address.path)
val topic = router.local_topic_domain.get_or_create_destination(topic_address, null).success
@@ -1165,8 +1174,7 @@ class Queue(val router: LocalRouter, val
}
def disconnect (producer:BindableDeliveryProducer) = {
- import OptionSupport._
- if( config.mirrored.getOrElse(false) ) {
+ if( mirrored ) {
val topic_address = new SimpleAddress("topic", binding.address.path)
val topic = router.local_topic_domain.get_or_create_destination(topic_address, null).success
topic.disconnect(producer)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed Apr 4 14:59:59 2012
@@ -306,7 +306,7 @@ class Topic(val router:LocalRouter, val
auto_delete_after = config.auto_delete_after.getOrElse(30)
if( auto_delete_after!= 0 ) {
// we don't auto delete explicitly configured destinations.
- if( !LocalRouter.is_wildcard_config(config) ) {
+ if( !LocalRouter.is_wildcard_destination(config.id) ) {
auto_delete_after = 0
}
}
@@ -343,7 +343,7 @@ class Topic(val router:LocalRouter, val
case "queue" =>
// create a temp queue so that it can spool
- val queue = router._create_queue(new TempQueueBinding(consumer, address))
+ val queue = router._create_queue(new TempQueueBinding(consumer, address, Option(config.subscription).getOrElse(new QueueSettingsDTO)))
queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
queue.bind(List(consumer))
consumer_queues += consumer->queue
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Wed Apr 4 14:59:59 2012
@@ -30,7 +30,13 @@ import java.util.List;
@XmlRootElement(name = "queue")
@XmlAccessorType(XmlAccessType.FIELD)
@JsonIgnoreProperties(ignoreUnknown = true)
-public class QueueDTO extends StringIdDTO {
+public class QueueDTO extends QueueSettingsDTO {
+
+ /**
+ * A unique id of the queue.
+ */
+ @XmlAttribute
+ public String id;
/**
* Controls when the queue will auto delete.
@@ -53,116 +59,6 @@ public class QueueDTO extends StringIdDT
@XmlAttribute
public Boolean mirrored;
- /**
- * The amount of memory buffer space to use for swapping messages
- * out.
- */
- @XmlAttribute(name="tail_buffer")
- public String tail_buffer;
-
- /**
- * Should this queue persistently store it's entries?
- */
- @XmlAttribute(name="persistent")
- public Boolean persistent;
-
- /**
- * Should messages be swapped out of memory if
- * no consumers need the message?
- */
- @XmlAttribute(name="swap")
- public Boolean swap;
-
- /**
- * The number max number of swapped queue entries to load
- * from the store at a time. Note that swapped entries are just
- * reference pointers to the actual messages. When not loaded,
- * the batch is referenced as sequence range to conserve memory.
- */
- @XmlAttribute(name="swap_range_size")
- public Integer swap_range_size;
-
- /**
- * The maximum amount of size the queue is allowed
- * to grow to. If not set then there is no limit. You can
- * use settings values like: 500mb or 1g just plain 1024000
- */
- @XmlAttribute(name="quota")
- public String quota;
-
- /**
- * Once the queue is full, the `full_policy` controls how the
- * queue behaves when additional messages attempt to be enqueued
- * onto the queue.
- *
- * You can set it to one of the following options:
- * `block`: The producer blocks until some space frees up.
- * `drop tail`: Drops new messages being enqueued on the queue.
- * `drop head`: Drops old messages at the front of the queue.
- *
- * If the queue is persistent then it is considered full when the max
- * quota size is reached. If the queue is not persistent then
- * the queue is considered full once it's `tail_buffer` fills up.
- * Defaults to 'block' if not specified.
- */
- @XmlAttribute(name="full_policy")
- public String full_policy;
-
- /**
- * The message delivery rate (in bytes/sec) at which
- * the queue considers the consumers fast and
- * may start slowing down producers to match the consumption
- * rate if the consumers are at the tail of the queue.
- */
- @XmlAttribute(name="fast_delivery_rate")
- public String fast_delivery_rate;
-
- /**
- * If set, and the the current delivery
- * rate is exceeding the configured value
- * of fast_delivery_rate and the consumers
- * are spending more time loading from
- * the store than delivering, then the
- * enqueue rate will be throttled to the
- * specified value so that the consumers
- * can catch up and reach the tail of the queue.
- */
- @XmlAttribute(name="catchup_enqueue_rate")
- public String catchup_enqueue_rate;
-
- /**
- * The maximum enqueue rate of the queue
- */
- @XmlAttribute(name="max_enqueue_rate")
- public String max_enqueue_rate;
-
- /**
- * To hold any other non-matching XML elements
- */
- @XmlAnyElement(lax=true)
- public List<Object> other = new ArrayList<Object>();
-
- /**
- * Is the dead letter queue configured for the destination. A
- * dead letter queue is used for storing messages that failed to get processed
- * by consumers. If not set, then messages that fail to get processed
- * will be dropped. If '*' appears in the name it will be replaced with
- * the queue's id.
- */
- @XmlAttribute(name="dlq")
- public String dlq;
-
- /**
- * Once a message has been nacked the configured
- * number of times the message will be considered to be a
- * poison message and will get moved to the dead letter queue if that's
- * configured or dropped. If set to less than one, then the message
- * will never be considered to be a poison message.
- * Defaults to zero.
- */
- @XmlAttribute(name="nak_limit")
- public Integer nak_limit;
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -173,21 +69,8 @@ public class QueueDTO extends StringIdDT
if (auto_delete_after != null ? !auto_delete_after.equals(queueDTO.auto_delete_after) : queueDTO.auto_delete_after != null)
return false;
- if (fast_delivery_rate != null ? !fast_delivery_rate.equals(queueDTO.fast_delivery_rate) : queueDTO.fast_delivery_rate != null)
- return false;
- if (catchup_enqueue_rate != null ? !catchup_enqueue_rate.equals(queueDTO.catchup_enqueue_rate) : queueDTO.catchup_enqueue_rate != null)
- return false;
- if (max_enqueue_rate != null ? !max_enqueue_rate.equals(queueDTO.max_enqueue_rate) : queueDTO.max_enqueue_rate != null)
- return false;
- if (other != null ? !other.equals(queueDTO.other) : queueDTO.other != null) return false;
- if (persistent != null ? !persistent.equals(queueDTO.persistent) : queueDTO.persistent != null) return false;
- if (quota != null ? !quota.equals(queueDTO.quota) : queueDTO.quota != null) return false;
- if (swap != null ? !swap.equals(queueDTO.swap) : queueDTO.swap != null) return false;
- if (swap_range_size != null ? !swap_range_size.equals(queueDTO.swap_range_size) : queueDTO.swap_range_size != null)
- return false;
+ if (id != null ? !id.equals(queueDTO.id) : queueDTO.id != null) return false;
if (mirrored != null ? !mirrored.equals(queueDTO.mirrored) : queueDTO.mirrored != null) return false;
- if (dlq != null ? !dlq.equals(queueDTO.dlq) : queueDTO.dlq != null) return false;
- if (nak_limit != null ? !nak_limit.equals(queueDTO.nak_limit) : queueDTO.nak_limit != null) return false;
return true;
}
@@ -195,18 +78,9 @@ public class QueueDTO extends StringIdDT
@Override
public int hashCode() {
int result = super.hashCode();
+ result = 31 * result + (id != null ? id.hashCode() : 0);
result = 31 * result + (auto_delete_after != null ? auto_delete_after.hashCode() : 0);
result = 31 * result + (mirrored != null ? mirrored.hashCode() : 0);
- result = 31 * result + (persistent != null ? persistent.hashCode() : 0);
- result = 31 * result + (swap != null ? swap.hashCode() : 0);
- result = 31 * result + (swap_range_size != null ? swap_range_size.hashCode() : 0);
- result = 31 * result + (quota != null ? quota.hashCode() : 0);
- result = 31 * result + (fast_delivery_rate != null ? fast_delivery_rate.hashCode() : 0);
- result = 31 * result + (catchup_enqueue_rate != null ? catchup_enqueue_rate.hashCode() : 0);
- result = 31 * result + (max_enqueue_rate != null ? max_enqueue_rate.hashCode() : 0);
- result = 31 * result + (other != null ? other.hashCode() : 0);
- result = 31 * result + (dlq != null ? dlq.hashCode() : 0);
- result = 31 * result + (nak_limit != null ? nak_limit.hashCode() : 0);
return result;
}
}
Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java (from r1308215, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java&r1=1308215&r2=1309437&rev=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java Wed Apr 4 14:59:59 2012
@@ -17,41 +17,21 @@
package org.apache.activemq.apollo.dto;
-
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import javax.xml.bind.annotation.*;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAnyElement;
+import javax.xml.bind.annotation.XmlAttribute;
import java.util.ArrayList;
import java.util.List;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name = "queue")
@XmlAccessorType(XmlAccessType.FIELD)
@JsonIgnoreProperties(ignoreUnknown = true)
-public class QueueDTO extends StringIdDTO {
-
- /**
- * Controls when the queue will auto delete.
- * If set to zero, then the queue will NOT auto
- * delete, otherwise the queue will auto delete
- * after it has been unused for the number
- * of seconds configured in this field. If unset,
- * it defaults to 5 minutes.
- */
- @XmlAttribute(name="auto_delete_after")
- public Integer auto_delete_after;
-
- /**
- * If set to true, then once the queue
- * is created all messages sent to the queue
- * will be mirrored to a topic of the same name
- * and all messages sent to the topic will be mirror
- * to the queue.
- */
- @XmlAttribute
- public Boolean mirrored;
+public class QueueSettingsDTO {
/**
* The amount of memory buffer space to use for swapping messages
@@ -137,12 +117,6 @@ public class QueueDTO extends StringIdDT
public String max_enqueue_rate;
/**
- * To hold any other non-matching XML elements
- */
- @XmlAnyElement(lax=true)
- public List<Object> other = new ArrayList<Object>();
-
- /**
* Is the dead letter queue configured for the destination. A
* dead letter queue is used for storing messages that failed to get processed
* by consumers. If not set, then messages that fail to get processed
@@ -163,50 +137,53 @@ public class QueueDTO extends StringIdDT
@XmlAttribute(name="nak_limit")
public Integer nak_limit;
+ /**
+ * To hold any other non-matching XML elements
+ */
+ @XmlAnyElement(lax=true)
+ public List<Object> other = new ArrayList<Object>();
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
- if (!(o instanceof QueueDTO)) return false;
- if (!super.equals(o)) return false;
+ if (!(o instanceof QueueSettingsDTO)) return false;
- QueueDTO queueDTO = (QueueDTO) o;
+ QueueSettingsDTO that = (QueueSettingsDTO) o;
- if (auto_delete_after != null ? !auto_delete_after.equals(queueDTO.auto_delete_after) : queueDTO.auto_delete_after != null)
- return false;
- if (fast_delivery_rate != null ? !fast_delivery_rate.equals(queueDTO.fast_delivery_rate) : queueDTO.fast_delivery_rate != null)
+ if (catchup_enqueue_rate != null ? !catchup_enqueue_rate.equals(that.catchup_enqueue_rate) : that.catchup_enqueue_rate != null)
return false;
- if (catchup_enqueue_rate != null ? !catchup_enqueue_rate.equals(queueDTO.catchup_enqueue_rate) : queueDTO.catchup_enqueue_rate != null)
+ if (dlq != null ? !dlq.equals(that.dlq) : that.dlq != null) return false;
+ if (fast_delivery_rate != null ? !fast_delivery_rate.equals(that.fast_delivery_rate) : that.fast_delivery_rate != null)
return false;
- if (max_enqueue_rate != null ? !max_enqueue_rate.equals(queueDTO.max_enqueue_rate) : queueDTO.max_enqueue_rate != null)
+ if (full_policy != null ? !full_policy.equals(that.full_policy) : that.full_policy != null) return false;
+ if (max_enqueue_rate != null ? !max_enqueue_rate.equals(that.max_enqueue_rate) : that.max_enqueue_rate != null)
return false;
- if (other != null ? !other.equals(queueDTO.other) : queueDTO.other != null) return false;
- if (persistent != null ? !persistent.equals(queueDTO.persistent) : queueDTO.persistent != null) return false;
- if (quota != null ? !quota.equals(queueDTO.quota) : queueDTO.quota != null) return false;
- if (swap != null ? !swap.equals(queueDTO.swap) : queueDTO.swap != null) return false;
- if (swap_range_size != null ? !swap_range_size.equals(queueDTO.swap_range_size) : queueDTO.swap_range_size != null)
+ if (nak_limit != null ? !nak_limit.equals(that.nak_limit) : that.nak_limit != null) return false;
+ if (other != null ? !other.equals(that.other) : that.other != null) return false;
+ if (persistent != null ? !persistent.equals(that.persistent) : that.persistent != null) return false;
+ if (quota != null ? !quota.equals(that.quota) : that.quota != null) return false;
+ if (swap != null ? !swap.equals(that.swap) : that.swap != null) return false;
+ if (swap_range_size != null ? !swap_range_size.equals(that.swap_range_size) : that.swap_range_size != null)
return false;
- if (mirrored != null ? !mirrored.equals(queueDTO.mirrored) : queueDTO.mirrored != null) return false;
- if (dlq != null ? !dlq.equals(queueDTO.dlq) : queueDTO.dlq != null) return false;
- if (nak_limit != null ? !nak_limit.equals(queueDTO.nak_limit) : queueDTO.nak_limit != null) return false;
+ if (tail_buffer != null ? !tail_buffer.equals(that.tail_buffer) : that.tail_buffer != null) return false;
return true;
}
@Override
public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + (auto_delete_after != null ? auto_delete_after.hashCode() : 0);
- result = 31 * result + (mirrored != null ? mirrored.hashCode() : 0);
+ int result = tail_buffer != null ? tail_buffer.hashCode() : 0;
result = 31 * result + (persistent != null ? persistent.hashCode() : 0);
result = 31 * result + (swap != null ? swap.hashCode() : 0);
result = 31 * result + (swap_range_size != null ? swap_range_size.hashCode() : 0);
result = 31 * result + (quota != null ? quota.hashCode() : 0);
+ result = 31 * result + (full_policy != null ? full_policy.hashCode() : 0);
result = 31 * result + (fast_delivery_rate != null ? fast_delivery_rate.hashCode() : 0);
result = 31 * result + (catchup_enqueue_rate != null ? catchup_enqueue_rate.hashCode() : 0);
result = 31 * result + (max_enqueue_rate != null ? max_enqueue_rate.hashCode() : 0);
- result = 31 * result + (other != null ? other.hashCode() : 0);
result = 31 * result + (dlq != null ? dlq.hashCode() : 0);
result = 31 * result + (nak_limit != null ? nak_limit.hashCode() : 0);
+ result = 31 * result + (other != null ? other.hashCode() : 0);
return result;
}
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Wed Apr 4 14:59:59 2012
@@ -34,7 +34,7 @@ import java.util.List;
public class QueueStatusDTO extends ServiceStatusDTO {
@XmlElement
- public QueueDTO config;
+ public QueueSettingsDTO config;
@XmlElement
public DestinationDTO binding;
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java Wed Apr 4 14:59:59 2012
@@ -30,7 +30,13 @@ import java.util.List;
@XmlRootElement(name = "topic")
@XmlAccessorType(XmlAccessType.FIELD)
@JsonIgnoreProperties(ignoreUnknown = true)
-public class TopicDTO extends StringIdDTO {
+public class TopicDTO {
+
+ /**
+ * A unique id of the topic.
+ */
+ @XmlAttribute
+ public String id;
/**
* Controls when the topic will auto delete.
@@ -47,6 +53,13 @@ public class TopicDTO extends StringIdDT
public String slow_consumer_policy;
/**
+ * The subscription settings that will be used for queues which are created
+ * for each subscription when the `slow_consumer_policy` is set to `queue`.
+ */
+ @XmlElement(name="subscription")
+ public QueueSettingsDTO subscription;
+
+ /**
* To hold any other non-matching XML elements
*/
@XmlAnyElement(lax=true)
@@ -56,13 +69,14 @@ public class TopicDTO extends StringIdDT
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof TopicDTO)) return false;
- if (!super.equals(o)) return false;
TopicDTO topicDTO = (TopicDTO) o;
if (auto_delete_after != null ? !auto_delete_after.equals(topicDTO.auto_delete_after) : topicDTO.auto_delete_after != null)
return false;
+ if (id != null ? !id.equals(topicDTO.id) : topicDTO.id != null) return false;
if (other != null ? !other.equals(topicDTO.other) : topicDTO.other != null) return false;
+ if (subscription != null ? !subscription.equals(topicDTO.subscription) : topicDTO.subscription != null) return false;
if (slow_consumer_policy != null ? !slow_consumer_policy.equals(topicDTO.slow_consumer_policy) : topicDTO.slow_consumer_policy != null)
return false;
@@ -71,9 +85,10 @@ public class TopicDTO extends StringIdDT
@Override
public int hashCode() {
- int result = super.hashCode();
+ int result = id != null ? id.hashCode() : 0;
result = 31 * result + (auto_delete_after != null ? auto_delete_after.hashCode() : 0);
result = 31 * result + (slow_consumer_policy != null ? slow_consumer_policy.hashCode() : 0);
+ result = 31 * result + (subscription != null ? subscription.hashCode() : 0);
result = 31 * result + (other != null ? other.hashCode() : 0);
return result;
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Wed Apr 4 14:59:59 2012
@@ -23,7 +23,9 @@
<queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
<queue id="mirrored.**" mirrored="true"/>
- <topic id="queued.**" slow_consumer_policy="queue"/>
+ <topic id="queued.**" slow_consumer_policy="queue">
+ <subscription tail_buffer="4k"/>
+ </topic>
</virtual_host>
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1309437&r1=1309436&r2=1309437&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Wed Apr 4 14:59:59 2012
@@ -389,6 +389,19 @@ memory. Defaults to true.
the queue is considered full once it's `tail_buffer` fills up.
Defaults to 'block' if not specified.
+Example configuration:
+
+{pygmentize:: xml}
+...
+ <virtual_host id="default">
+ ...
+ <queue id="app1.**" dlq="dlq.*" nak_limit="3" auto_delete_after="0"/>
+ ...
+ </virtual_host>
+...
+{pygmentize}
+
+
##### Topics
When a new topic is first created in the broker, it's configuration will be
@@ -415,6 +428,25 @@ A `topic` element may be configured with
delete once there have been no consumers or producers on it
for the configured number of seconds. Defaults to 30 if not set.
+A `topic` that has the `slow_consumer_policy` set to `queue` can customize
+the settings of the per subscription queues by adding a nested `subscription`
+element. The `subscription` element supports the following configuration
+attributes of the `queue` element: `tail_buffer`, `persistent`, `swap`,
+`swap_range_size`, `quota`, `full_policy`, `fast_delivery_rate`,
+`catchup_enqueue_rate`, `max_enqueue_rate`, `dlq`, `nak_limit`. Example:
+
+{pygmentize:: xml}
+...
+ <virtual_host id="default">
+ ...
+ <topic id="example" slow_consumer_policy="queue">
+ <subscription tail_buffer="4k"/>
+ </topic>
+ ...
+ </virtual_host>
+...
+{pygmentize}
+
##### Durable Subscriptions
When a new durable subscription is first created in the broker, it's