You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/04/02 01:03:30 UTC
svn commit: r1308215 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/test/scala/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-stomp/src/test/resources/ apollo-st...
Author: chirino
Date: Sun Apr 1 23:03:29 2012
New Revision: 1308215
URL: http://svn.apache.org/viewvc?rev=1308215&view=rev
Log:
Fixes APLO-76: It would be nice to be able to configure Apollo's behavior when a queue is full. You can now block, or drop from the tail or head of the queue.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1308215&r1=1308214&r2=1308215&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sun Apr 1 23:03:29 2012
@@ -34,6 +34,11 @@ import org.apache.activemq.apollo.dto._
import org.fusesource.hawtbuf._
import java.util.regex.Pattern
+sealed trait FullDropPolicy
+object Block extends FullDropPolicy
+object DropHead extends FullDropPolicy
+object DropTail extends FullDropPolicy
+
object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
@@ -106,10 +111,6 @@ class Queue(val router: LocalRouter, val
entries.addFirst(head_entry)
//
- // In-frequently accessed tuning configuration.
- //
-
- //
// Frequently accessed tuning configuration.
//
@@ -214,6 +215,7 @@ class Queue(val router: LocalRouter, val
def swapped_in_size_max = this.producer_swapped_in.size_max + this.consumer_swapped_in.size_max
var config:QueueDTO = _
+ var full_drop_policy:FullDropPolicy = Block
def dlq_nak_limit = OptionSupport(config.nak_limit).getOrElse(0)
@@ -232,6 +234,15 @@ class Queue(val router: LocalRouter, val
tune_max_enqueue_rate = mem_size(update.max_enqueue_rate,"-1")
tune_quota = mem_size(update.quota,"-1")
+ full_drop_policy = Option(update.full_policy).getOrElse("none").toLowerCase match {
+ case "drop head" => DropHead
+ case "drop tail" => DropTail
+ case "block" => Block
+ case _ =>
+ warn("Invalid 'full_drop_policy' configured for queue '%s': '%s'", id, update.full_policy)
+ Block
+ }
+
auto_delete_after = update.auto_delete_after.getOrElse(30)
if( auto_delete_after!= 0 ) {
// we don't auto delete explicitly configured queues,
@@ -500,7 +511,7 @@ class Queue(val router: LocalRouter, val
}
def change_producer_capacity(amount:Int) = might_unfill {
- producer_swapped_in.size_max += amount
+ // producer_swapped_in.size_max += amount
}
def change_consumer_capacity(amount:Int) = might_unfill {
consumer_swapped_in.size_max += amount
@@ -515,7 +526,12 @@ class Queue(val router: LocalRouter, val
def is_enqueue_buffer_maxed = (producer_swapped_in.size >= producer_swapped_in.size_max)
def full = if( service_state.is_started ) {
- is_enqueue_buffer_maxed || is_enqueue_throttled || is_quota_exceeded
+ if ( full_drop_policy eq Block ) {
+ is_enqueue_buffer_maxed || is_enqueue_throttled || is_quota_exceeded
+ } else {
+ // we are never full since we can just drop messages at will.
+ false
+ }
} else if( service_state.is_starting) {
true
} else {
@@ -527,11 +543,71 @@ class Queue(val router: LocalRouter, val
false
} else {
- // Don't even enqueue if the message has expired or the queue has stopped.
+ // We may need to drop this enqueue or head entries due
+ // to the drop policy.
+ var drop = false
+ if( full_drop_policy ne Block ) {
+
+ def eval_drop(entry:QueueEntry) = entry.state match {
+ case state: entry.Loaded =>
+ var next = entry.getNext
+ if (!entry.is_acquired) {
+ dequeue_item_counter += 1
+ dequeue_size_counter += entry.size
+ dequeue_ts = now
+ entry.remove
+ }
+ next
+ case state: entry.Swapped =>
+ var next = entry.getNext
+ if (!entry.is_acquired) {
+ dequeue_item_counter += 1
+ dequeue_size_counter += entry.size
+ dequeue_ts = now
+ entry.remove
+ }
+ next
+ case state: entry.SwappedRange =>
+ // we need to load in the range before we can drop entries..
+ entry.load(null)
+ null
+ }
+
+ if( tune_persistent ) {
+ var exceeded = is_quota_exceeded
+ if( exceeded) {
+ full_drop_policy match {
+ case Block =>
+ case DropTail =>
+ drop = true // we can drop this enqueue attempt.
+ case DropHead =>
+ var entry = head_entry.getNext
+ while(entry!=null && is_quota_exceeded) {
+ entry = eval_drop(entry)
+ }
+ }
+ }
+ } else {
+ if( is_enqueue_buffer_maxed) {
+ full_drop_policy match {
+ case DropTail =>
+ drop = true // we can drop this enqueue attempt.
+ case DropHead =>
+ var entry = head_entry.getNext
+ while(entry!=null && is_enqueue_buffer_maxed) {
+ entry = eval_drop(entry)
+ }
+ }
+ }
+ }
+ }
+
val expiration = delivery.expiration
val expired = expiration != 0 && expiration <= now
- if( !service_state.is_started || expired) {
+ // Don't even enqueue if the message has expired or
+ // the queue has stopped or message needs to get dropped.
+ if( !service_state.is_started || expired || drop) {
if( delivery.ack!=null ) {
delivery.ack(if ( expired ) Expired else Undelivered, delivery.uow)
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1308215&r1=1308214&r2=1308215&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Sun Apr 1 23:03:29 2012
@@ -20,6 +20,7 @@ import org.apache.activemq.apollo.util.{
import java.net.InetSocketAddress
import org.apache.activemq.apollo.util._
import FileSupport._
+import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.dto.{AggregateDestMetricsDTO, QueueStatusDTO, TopicStatusDTO}
/**
@@ -121,7 +122,10 @@ class BrokerFunSuiteSupport extends FunS
val host = broker.default_virtual_host
sync(host) {
val router = host.router.asInstanceOf[LocalRouter]
- router.local_queue_domain.destination_by_id.get(name).get.status(false)
+ val queue = router.local_queue_domain.destination_by_id.get(name).get
+ sync(queue) {
+ queue.status(false)
+ }
}
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1308215&r1=1308214&r2=1308215&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Sun Apr 1 23:03:29 2012
@@ -83,7 +83,7 @@ public class QueueDTO extends StringIdDT
public Integer swap_range_size;
/**
- * The maximum amount of disk space the queue is allowed
+ * The maximum size the queue is allowed
* to grow to. If not set then there is no limit. You can
* use settings values like: 500mb or 1g just plain 1024000
*/
@@ -91,6 +91,24 @@ public class QueueDTO extends StringIdDT
public String quota;
/**
+ * Once the queue is full, the `full_policy` controls how the
+ * queue behaves when additional messages attempt to be enqueued
+ * onto the queue.
+ *
+ * You can set it to one of the following options:
+ * `block`: The producer blocks until some space frees up.
+ * `drop tail`: Drops new messages being enqueued on the queue.
+ * `drop head`: Drops old messages at the front of the queue.
+ *
+ * If the queue is persistent then it is considered full when the max
+ * quota size is reached. If the queue is not persistent then
+ * the queue is considered full once its `tail_buffer` fills up.
+ * Defaults to 'block' if not specified.
+ */
+ @XmlAttribute(name="full_policy")
+ public String full_policy;
+
+ /**
* The message delivery rate (in bytes/sec) at which
* the queue considers the consumers fast and
* may start slowing down producers to match the consumption
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1308215&r1=1308214&r2=1308215&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml Sun Apr 1 23:03:29 2012
@@ -21,6 +21,11 @@
<virtual_host id="default">
<host_name>localhost</host_name>
+ <queue id="drop.head.persistent" full_policy="drop head" quota="100k"/>
+ <queue id="drop.tail.persistent" full_policy="drop tail" quota="100k"/>
+ <queue id="drop.head.non" full_policy="drop head" tail_buffer="100k" persistent="false"/>
+ <queue id="drop.tail.non" full_policy="drop tail" tail_buffer="100k" persistent="false"/>
+
<queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
<queue id="mirrored.**" mirrored="true"/>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1308215&r1=1308214&r2=1308215&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Sun Apr 1 23:03:29 2012
@@ -2413,4 +2413,72 @@ class StompNackTest extends StompTestSup
class StompNackTestOnLevelDBTest extends StompNackTest {
override val broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+}
+
+class StompDropPolicyTest extends StompTestSupport {
+
+ override val broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+
+ test("Head Drop Policy: Persistent") {
+ connect("1.1")
+ // Some of these messages should get dropped.
+ for(i <- 0 until 1000) {
+ sync_send("/queue/drop.head.persistent", "%0100d".format(i))
+ }
+ subscribe("0", "/queue/drop.head.persistent")
+ for(i <- 446 until 1000) {
+ assert_received("%0100d".format(i))
+ }
+ }
+
+ test("Head Drop Policy: Non Persistent") {
+ connect("1.1")
+ // Some of these messages should get dropped.
+ for(i <- 0 until 1000) {
+ sync_send("/queue/drop.head.non", "%0100d".format(i))
+ }
+ subscribe("0", "/queue/drop.head.non")
+ for(i <- 427 until 1000) {
+ assert_received("%0100d".format(i))
+ }
+ }
+
+ test("Tail Drop Policy: Persistent") {
+ connect("1.1")
+ // Some of these messages should get dropped.
+ for(i <- 0 until 1000) {
+ sync_send("/queue/drop.tail.persistent", "%0100d".format(i))
+ }
+
+ val metrics = queue_status("drop.tail.persistent").metrics
+ metrics.queue_items should be < ( 1000L )
+
+ subscribe("0", "/queue/drop.tail.persistent")
+ for(i <- 0L until metrics.queue_items) {
+ assert_received("%0100d".format(i))
+ }
+
+ async_send("/queue/drop.tail.persistent", "end")
+ assert_received("end")
+
+ }
+
+ test("Tail Drop Policy: Non Persistent") {
+ connect("1.1")
+ // Some of these messages should get dropped.
+ for(i <- 0 until 1000) {
+ sync_send("/queue/drop.tail.non", "%0100d".format(i))
+ }
+
+ val metrics = queue_status("drop.tail.non").metrics
+ metrics.queue_items should be < ( 1000L )
+
+ subscribe("0", "/queue/drop.tail.non")
+ for(i <- 0L until metrics.queue_items) {
+ assert_received("%0100d".format(i))
+ }
+
+ async_send("/queue/drop.tail.non", "end")
+ assert_received("end")
+ }
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1308215&r1=1308214&r2=1308215&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Sun Apr 1 23:03:29 2012
@@ -368,9 +368,26 @@ memory. Defaults to true.
will be dropped. If '*' appears in the name it will be replaced with
the queue's id.
-* `max_enqueue_rate`: The maximum enqueue rate of the queue. Producers
- will be flow controlled once this enqueue rate is reached. If not set
- then it is disabled
+* `nak_limit`: Once a message has been nacked the configured
+ number of times the message will be considered to be a
+ poison message and will get moved to the dead letter queue if that's
+ configured or dropped. If set to less than one, then the message
+ will never be considered to be a poison message. Defaults to zero.
+
+* `full_policy`: Once the queue is full, the `full_policy`
+ controls how the queue behaves when additional messages attempt to
+ be enqueued onto the queue.
+
+ You can set it to one of the following options:
+
+ * `block`: The producer blocks until some space frees up.
+ * `drop tail`: Drops the new messages being enqueued on the queue.
+ * `drop head`: Drops old messages at the front of the queue.
+
+ If the queue is persistent then it is considered full when the max
+ quota size is reached. If the queue is not persistent then
+ the queue is considered full once its `tail_buffer` fills up.
+ Defaults to 'block' if not specified.
##### Topics