You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/04/02 01:03:30 UTC

svn commit: r1308215 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/test/scala/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-stomp/src/test/resources/ apollo-st...

Author: chirino
Date: Sun Apr  1 23:03:29 2012
New Revision: 1308215

URL: http://svn.apache.org/viewvc?rev=1308215&view=rev
Log:
Fixes APLO-76: It would be nice to be able to configure Apollo's behavior when a queue is full.  You can now block, or drop from the tail or head of the queue.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1308215&r1=1308214&r2=1308215&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sun Apr  1 23:03:29 2012
@@ -34,6 +34,11 @@ import org.apache.activemq.apollo.dto._
 import org.fusesource.hawtbuf._
 import java.util.regex.Pattern
 
+sealed trait FullDropPolicy
+object Block extends FullDropPolicy
+object DropHead extends FullDropPolicy
+object DropTail extends FullDropPolicy
+
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
 
@@ -106,10 +111,6 @@ class Queue(val router: LocalRouter, val
   entries.addFirst(head_entry)
 
   //
-  // In-frequently accessed tuning configuration.
-  //
-
-  //
   // Frequently accessed tuning configuration.
   //
 
@@ -214,6 +215,7 @@ class Queue(val router: LocalRouter, val
   def swapped_in_size_max = this.producer_swapped_in.size_max + this.consumer_swapped_in.size_max
 
   var config:QueueDTO = _
+  var full_drop_policy:FullDropPolicy = Block
 
   def dlq_nak_limit = OptionSupport(config.nak_limit).getOrElse(0)
 
@@ -232,6 +234,15 @@ class Queue(val router: LocalRouter, val
     tune_max_enqueue_rate = mem_size(update.max_enqueue_rate,"-1")
     tune_quota = mem_size(update.quota,"-1")
 
+    full_drop_policy = Option(update.full_policy).getOrElse("none").toLowerCase match {
+      case "drop head" => DropHead
+      case "drop tail" => DropTail
+      case "block" => Block
+      case _ =>
+        warn("Invalid 'full_drop_policy' configured for queue '%s': '%s'", id, update.full_policy)
+        Block
+    }
+    
     auto_delete_after = update.auto_delete_after.getOrElse(30)
     if( auto_delete_after!= 0 ) {
       // we don't auto delete explicitly configured queues,
@@ -500,7 +511,7 @@ class Queue(val router: LocalRouter, val
   }
 
   def change_producer_capacity(amount:Int) = might_unfill {
-    producer_swapped_in.size_max += amount
+    // producer_swapped_in.size_max += amount
   }
   def change_consumer_capacity(amount:Int) = might_unfill {
     consumer_swapped_in.size_max += amount
@@ -515,7 +526,12 @@ class Queue(val router: LocalRouter, val
     def is_enqueue_buffer_maxed = (producer_swapped_in.size >= producer_swapped_in.size_max)
 
     def full = if( service_state.is_started ) {
-      is_enqueue_buffer_maxed || is_enqueue_throttled || is_quota_exceeded
+      if ( full_drop_policy eq Block ) {
+        is_enqueue_buffer_maxed || is_enqueue_throttled || is_quota_exceeded
+      } else {
+        // we are never full since we can just drop messages at will.
+        false
+      }
     } else if( service_state.is_starting) {
       true
     } else {
@@ -527,11 +543,71 @@ class Queue(val router: LocalRouter, val
         false
       } else {
 
-        // Don't even enqueue if the message has expired or the queue has stopped.
+        // We may need to drop this enqueue or head entries due
+        // to the drop policy.
+        var drop = false
+        if( full_drop_policy ne Block ) {
+
+          def eval_drop(entry:QueueEntry) = entry.state match {
+            case state: entry.Loaded =>
+              var next = entry.getNext
+              if (!entry.is_acquired) {
+                dequeue_item_counter += 1
+                dequeue_size_counter += entry.size
+                dequeue_ts = now
+                entry.remove
+              }
+              next
+            case state: entry.Swapped =>
+              var next = entry.getNext
+              if (!entry.is_acquired) {
+                dequeue_item_counter += 1
+                dequeue_size_counter += entry.size
+                dequeue_ts = now
+                entry.remove
+              }
+              next
+            case state: entry.SwappedRange =>
+              // we need to load in the range before we can drop entries..
+              entry.load(null)
+              null
+          }
+
+          if( tune_persistent ) {
+            var exceeded = is_quota_exceeded
+            if( exceeded) {
+              full_drop_policy match {
+                case Block =>
+                case DropTail =>
+                  drop = true // we can drop this enqueue attempt.
+                case DropHead =>
+                  var entry = head_entry.getNext
+                  while(entry!=null && is_quota_exceeded) {
+                    entry = eval_drop(entry)
+                  }
+              }
+            }
+          } else {
+            if( is_enqueue_buffer_maxed) {
+              full_drop_policy match {
+                case DropTail =>
+                  drop = true // we can drop this enqueue attempt.
+                case DropHead =>
+                  var entry = head_entry.getNext
+                  while(entry!=null && is_enqueue_buffer_maxed) {
+                    entry = eval_drop(entry)
+                  }
+              }
+            }
+          }
+        }
+        
         val expiration = delivery.expiration
         val expired = expiration != 0 && expiration <= now
 
-        if( !service_state.is_started || expired) {
+        // Don't even enqueue if the message has expired or
+        // the queue has stopped or message needs to get dropped.
+        if( !service_state.is_started || expired || drop) {
           if( delivery.ack!=null ) {
             delivery.ack(if ( expired ) Expired else Undelivered, delivery.uow)
           }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1308215&r1=1308214&r2=1308215&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Sun Apr  1 23:03:29 2012
@@ -20,6 +20,7 @@ import org.apache.activemq.apollo.util.{
 import java.net.InetSocketAddress
 import org.apache.activemq.apollo.util._
 import FileSupport._
+import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.dto.{AggregateDestMetricsDTO, QueueStatusDTO, TopicStatusDTO}
 
 /**
@@ -121,7 +122,10 @@ class BrokerFunSuiteSupport extends FunS
     val host = broker.default_virtual_host
     sync(host) {
       val router = host.router.asInstanceOf[LocalRouter]
-      router.local_queue_domain.destination_by_id.get(name).get.status(false)
+      val queue = router.local_queue_domain.destination_by_id.get(name).get
+      sync(queue) {
+        queue.status(false)
+      }
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1308215&r1=1308214&r2=1308215&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Sun Apr  1 23:03:29 2012
@@ -83,7 +83,7 @@ public class QueueDTO extends StringIdDT
     public Integer swap_range_size;
 
     /**
-     * The maximum amount of disk space the queue is allowed
+     * The maximum amount of size the queue is allowed
      * to grow to.  If not set then there is no limit.  You can
      * use settings values like: 500mb or 1g just plain 1024000
      */
@@ -91,6 +91,24 @@ public class QueueDTO extends StringIdDT
     public String quota;
 
     /**
+     * Once the queue is full, the `full_policy` controls how the
+     * queue behaves when additional messages attempt to be enqueued
+     * onto the queue.
+     *
+     * You can set it to one of the following options:
+     *  `block`: The producer blocks until some space frees up.
+     *  `drop tail`: Drops new messages being enqueued on the queue.
+     *  `drop head`: Drops old messages at the front of the queue.
+     *
+     * If the queue is persistent then it is considered full when the max
+     * quota size is reached.  If the queue is not persistent then
+     * the queue is considered full once it's `tail_buffer` fills up.
+     * Defaults to 'block' if not specified.
+     */
+    @XmlAttribute(name="full_policy")
+    public String full_policy;
+
+    /**
      *  The message delivery rate (in bytes/sec) at which
      *  the queue considers the consumers fast and
      *  may start slowing down producers to match the consumption

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1308215&r1=1308214&r2=1308215&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml Sun Apr  1 23:03:29 2012
@@ -21,6 +21,11 @@
   <virtual_host id="default">
     <host_name>localhost</host_name>
 
+    <queue id="drop.head.persistent" full_policy="drop head" quota="100k"/>
+    <queue id="drop.tail.persistent" full_policy="drop tail" quota="100k"/>
+    <queue id="drop.head.non" full_policy="drop head" tail_buffer="100k" persistent="false"/>
+    <queue id="drop.tail.non" full_policy="drop tail" tail_buffer="100k" persistent="false"/>
+
     <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
     <queue id="mirrored.**" mirrored="true"/>
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1308215&r1=1308214&r2=1308215&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Sun Apr  1 23:03:29 2012
@@ -2413,4 +2413,72 @@ class StompNackTest extends StompTestSup
 
 class StompNackTestOnLevelDBTest extends StompNackTest {
   override val broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+}
+
+class StompDropPolicyTest extends StompTestSupport {
+
+  override val broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+
+  test("Head Drop Policy: Persistent") {
+    connect("1.1")
+    // Some of these messages should get dropped.
+    for(i <- 0 until 1000) {
+      sync_send("/queue/drop.head.persistent", "%0100d".format(i))
+    }
+    subscribe("0", "/queue/drop.head.persistent")
+    for(i <- 446 until 1000) {
+      assert_received("%0100d".format(i))
+    }
+  }
+
+  test("Head Drop Policy: Non Persistent") {
+    connect("1.1")
+    // Some of these messages should get dropped.
+    for(i <- 0 until 1000) {
+      sync_send("/queue/drop.head.non", "%0100d".format(i))
+    }
+    subscribe("0", "/queue/drop.head.non")
+    for(i <- 427 until 1000) {
+      assert_received("%0100d".format(i))
+    }
+  }
+
+  test("Tail Drop Policy: Persistent") {
+    connect("1.1")
+    // Some of these messages should get dropped.
+    for(i <- 0 until 1000) {
+      sync_send("/queue/drop.tail.persistent", "%0100d".format(i))
+    }
+
+    val metrics = queue_status("drop.tail.persistent").metrics
+    metrics.queue_items should be < ( 1000L )
+
+    subscribe("0", "/queue/drop.tail.persistent")
+    for(i <- 0L until metrics.queue_items) {
+      assert_received("%0100d".format(i))
+    }
+
+    async_send("/queue/drop.tail.persistent", "end")
+    assert_received("end")
+
+  }
+
+  test("Tail Drop Policy: Non Persistent") {
+    connect("1.1")
+    // Some of these messages should get dropped.
+    for(i <- 0 until 1000) {
+      sync_send("/queue/drop.tail.non", "%0100d".format(i))
+    }
+
+    val metrics = queue_status("drop.tail.non").metrics
+    metrics.queue_items should be < ( 1000L )
+
+    subscribe("0", "/queue/drop.tail.non")
+    for(i <- 0L until metrics.queue_items) {
+      assert_received("%0100d".format(i))
+    }
+
+    async_send("/queue/drop.tail.non", "end")
+    assert_received("end")
+  }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1308215&r1=1308214&r2=1308215&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Sun Apr  1 23:03:29 2012
@@ -368,9 +368,26 @@ memory.  Defaults to true.
    will be dropped.  If '*' appears in the name it will be replaced with 
    the queue's id.
 
-* `max_enqueue_rate`: The maximum enqueue rate of the queue.  Producers
-  will be flow controlled once this enqueue rate is reached.  If not set
-  then it is disabled
+* `nak_limit`: Once a message has been nacked the configured
+   number of times the message will be considered to be a
+   poison message and will get moved to the dead letter queue if that's
+   configured or dropped.  If set to less than one, then the message
+   will never be considered to be a poison message. Defaults to zero.
+
+* `full_drop_policy`: Once the queue is full, the `full_policy` 
+  controls how the   queue behaves when additional messages attempt to 
+  be enqueued onto the queue.
+  
+  You can set it to one of the following options:
+
+   * `block`: The producer blocks until some space frees up.
+   * `drop tail`: Drops the new messages being enqueued on the queue.
+   * `drop head`: Drops old messages at the front of the queue.
+  
+  If the queue is persistent then it is considered full when the max
+  quota size is reached.  If the queue is not persistent then
+  the queue is considered full once it's `tail_buffer` fills up.
+  Defaults to 'block' if not specified.
 
 ##### Topics