You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/08 18:41:20 UTC

svn commit: r1212001 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java apollo-website/src/documentation/user-manual.md

Author: chirino
Date: Thu Dec  8 17:41:19 2011
New Revision: 1212001

URL: http://svn.apache.org/viewvc?rev=1212001&view=rev
Log:
More tweaks to get APLO-96 more perfect.  We now doing swaps when fast consumers are attached.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1212001&r1=1212000&r2=1212001&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Dec  8 17:41:19 2011
@@ -37,7 +37,6 @@ object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
 
   val PREFTCH_LOAD_FLAG = 1.toByte
-  val PREFTCH_HOLD_FLAG = 2.toByte
 
   class MemorySpace {
     var items = 0
@@ -151,7 +150,7 @@ class Queue(val router: LocalRouter, val
    *  the queue enables a enqueue rate throttle
    *  to allow consumers to catchup with producers.
    */
-  var tune_catchup_delivery_rate = 0
+  var tune_fast_delivery_rate = 0
   
   /**
    *  The rate at which to throttle producers when
@@ -170,8 +169,8 @@ class Queue(val router: LocalRouter, val
     tune_swap = tune_persistent && config.swap.getOrElse(true)
     tune_swap_range_size = config.swap_range_size.getOrElse(10000)
     tune_consumer_buffer = Option(config.consumer_buffer).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(256*1024)
-    tune_catchup_delivery_rate = Option(config.catchup_delivery_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
-    tune_catchup_enqueue_rate = Option(config.catchup_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(tune_catchup_delivery_rate)
+    tune_fast_delivery_rate = Option(config.fast_delivery_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(1024*1024)
+    tune_catchup_enqueue_rate = Option(config.catchup_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
     tune_max_enqueue_rate = Option(config.max_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
 
     tune_quota = Option(config.quota).map(MemoryPropertyEditor.parse(_)).getOrElse(-1)
@@ -229,7 +228,7 @@ class Queue(val router: LocalRouter, val
 
   var producer_counter = 0L
   var consumer_counter = 0L
-  var tail_prefetch = 0L
+  var consumers_keeping_up = true
 
   var individual_swapped_items = 0
 
@@ -550,23 +549,7 @@ class Queue(val router: LocalRouter, val
           entry.dispatch
         }
 
-        if( entry.as_loaded.acquired) {
-          // Enqueued message aquired.
-        } else if( tail_prefetch > 0 ) {
-          // Enqueued message prefeteched.
-          tail_prefetch -= entry.size
-          entry.prefetch_flags = PREFTCH_LOAD_FLAG
-          entry.load(consumer_swapped_in)
-        } else {
-//          val prev = entry.getPrevious
-//          if( (prev.as_loaded!=null && prev.as_loaded.swapping_out) || (prev.as_swapped!=null && !prev.as_swapped.swapping_in) ) {
-//            // Swap it out ASAP
-//            entry.swap(true)
-//            println("Enqueued message swapped.")
-//          } else {
-//            trigger_swap
-//            // Avoid swapping right away..
-//          }
+        if( !(consumers_keeping_up || entry.as_loaded.acquired) ) {
           entry.swap(true)
         }
 
@@ -576,6 +559,10 @@ class Queue(val router: LocalRouter, val
           queueDelivery.uow = null
         }
 
+        
+        if( full ) {
+          trigger_swap
+        }
         true
       }
     }
@@ -642,6 +629,8 @@ class Queue(val router: LocalRouter, val
     }
   }
 
+  var keep_up_delivery_rate = 0L
+  
   def swap_messages:Unit = {
     dispatch_queue.assertExecuting()
 
@@ -679,9 +668,12 @@ class Queue(val router: LocalRouter, val
     }
 
     // Set the prefetch flags
+    val was_keepingup = consumers_keeping_up
+    consumers_keeping_up = false
     all_subscriptions.valuesIterator.foreach{ x=>
       x.refill_prefetch
     }
+    consumers_keeping_up = consumers_keeping_up && delivery_rate > tune_fast_delivery_rate
 
     // swap out messages.
     cur = entries.getHead
@@ -690,8 +682,11 @@ class Queue(val router: LocalRouter, val
       val loaded = cur.as_loaded
       if( loaded!=null ) {
         if( cur.prefetch_flags==0 && !loaded.acquired  ) {
-          val asap = !cur.as_loaded.acquired
-          cur.swap(asap)
+          if( consumers_keeping_up && (loaded.space eq producer_swapped_in)) {
+            // don't move out. keeps producer mem maxed to slow down the producer
+          } else {
+            cur.swap(true)
+          }
         } else {
           cur.load(consumer_swapped_in)
         }
@@ -770,7 +765,7 @@ class Queue(val router: LocalRouter, val
 
       // Figure out what the max enqueue rate should be.
       max_enqueue_rate = Int.MaxValue
-      if( tune_catchup_delivery_rate>=0 && tune_catchup_enqueue_rate>=0 && delivery_rate>tune_catchup_delivery_rate && swapped_out_size > 0 && stall_ratio < 1.0 ) {
+      if( tune_fast_delivery_rate>=0 && tune_catchup_enqueue_rate>=0 && delivery_rate>tune_fast_delivery_rate && swapped_out_size > 0 && stall_ratio < 1.0 ) {
         max_enqueue_rate = tune_catchup_enqueue_rate
       }
       if(tune_max_enqueue_rate >=0 ) {
@@ -1904,7 +1899,6 @@ class Subscription(val queue:Queue, val 
       consumer.release
 
       queue.check_idle
-      queue.tail_prefetch = 0
       queue.trigger_swap
     } else {}
   }
@@ -1920,9 +1914,6 @@ class Subscription(val queue:Queue, val 
     if( tail_parked ) {
         if(session.consumer.close_on_drain) {
           close
-        } else {
-          var remaining = queue.tune_consumer_buffer - acquired_size;
-          queue.tail_prefetch = queue.tail_prefetch.max(remaining)
         }
     }
   }
@@ -2023,12 +2014,11 @@ class Subscription(val queue:Queue, val 
       cursor = next
     }
     
-    // If we hit the tail.. credit it so that we avoid swapping too soon.
-    if( cursor == null ) {
-      queue.tail_prefetch = queue.tail_prefetch.max(((enqueue_size_per_interval/2) - remaining).max(remaining))
+    // If we hit the tail or the producer swap in area.. let the queue know we are keeping up.
+    if( !queue.consumers_keeping_up && (cursor == null || (cursor.as_loaded!=null && (cursor.as_loaded.space eq queue.producer_swapped_in))) ) {
+      queue.consumers_keeping_up = true
     }
 
-
   }
 
   class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1212001&r1=1212000&r2=1212001&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Thu Dec  8 17:41:19 2011
@@ -87,15 +87,22 @@ public class QueueDTO extends StringIdDT
 
     /**
      *  The message delivery rate (in bytes/sec) at which
-     *  the queue enables a producer rate throttle
-     *  to allow consumers to catchup with producers.
+     *  the queue considers the consumers fast and
+     *  may start slowing down producers to match the consumption
+     *  rate if the consumers are at the tail of the queue.
      */
-    @XmlAttribute(name="catchup_delivery_rate")
-    public String catchup_delivery_rate;
+    @XmlAttribute(name="fast_delivery_rate")
+    public String fast_delivery_rate;
 
     /**
-     *  The rate at which to throttle enqueues when
-     *  consumers are catching up.
+     * If set, and the the current delivery
+     * rate is exceeding the configured value
+     * of fast_delivery_rate and the consumers
+     * are spending more time loading from
+     * the store than delivering, then the
+     * enqueue rate will be throttled to the
+     * specified value so that the consumers
+     * can catch up and reach the tail of the queue.
      */
     @XmlAttribute(name="catchup_enqueue_rate")
     public String catchup_enqueue_rate;
@@ -122,7 +129,7 @@ public class QueueDTO extends StringIdDT
 
         if (auto_delete_after != null ? !auto_delete_after.equals(queueDTO.auto_delete_after) : queueDTO.auto_delete_after != null)
             return false;
-        if (catchup_delivery_rate != null ? !catchup_delivery_rate.equals(queueDTO.catchup_delivery_rate) : queueDTO.catchup_delivery_rate != null)
+        if (fast_delivery_rate != null ? !fast_delivery_rate.equals(queueDTO.fast_delivery_rate) : queueDTO.fast_delivery_rate != null)
             return false;
         if (catchup_enqueue_rate != null ? !catchup_enqueue_rate.equals(queueDTO.catchup_enqueue_rate) : queueDTO.catchup_enqueue_rate != null)
             return false;
@@ -151,7 +158,7 @@ public class QueueDTO extends StringIdDT
         result = 31 * result + (swap != null ? swap.hashCode() : 0);
         result = 31 * result + (swap_range_size != null ? swap_range_size.hashCode() : 0);
         result = 31 * result + (quota != null ? quota.hashCode() : 0);
-        result = 31 * result + (catchup_delivery_rate != null ? catchup_delivery_rate.hashCode() : 0);
+        result = 31 * result + (fast_delivery_rate != null ? fast_delivery_rate.hashCode() : 0);
         result = 31 * result + (catchup_enqueue_rate != null ? catchup_enqueue_rate.hashCode() : 0);
         result = 31 * result + (max_enqueue_rate != null ? max_enqueue_rate.hashCode() : 0);
         result = 31 * result + (other != null ? other.hashCode() : 0);

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1212001&r1=1212000&r2=1212001&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Thu Dec  8 17:41:19 2011
@@ -269,19 +269,22 @@ memory.  Defaults to true.
   delete once there have been no consumers, producers or messages on it
   for the configured number of seconds.  Defaults to 300 if not set.
   
+* `fast_delivery_rate`: The message delivery rate (in bytes/sec) at which             
+  the queue considers the consumers fast enough to start slowing down enqueue
+  rate to match the consumption rate if the consumers are at the 
+  tail of the queue.           
+  
+* `catchup_enqueue_rate`:  If set, and the the current delivery
+   rate is exceeding the configured value of `fast_delivery_rate` and 
+   the consumers are spending more time loading from the store than 
+   delivering, then the enqueue rate will be throttled to the
+   specified value so that the consumers can catch up and reach the 
+   tail of the queue.
+
 * `max_enqueue_rate`: The maximum enqueue rate of the queue.  Producers
   will be flow controlled once this enqueue rate is reached.  If not set
   then it is disabled
 
-* `catchup_delivery_rate`: The message delivery rate (in bytes/sec) at which
-  the queue enables a producer rate throttle to allow consumers to catchup 
-  with producers.  The the consumers must also be stalling on message
-  loading longer than they stall to deliver.  If not set then the
-  feature is disabled.
-  
-* `catchup_enqueue_rate`:  The rate at which to throttle enqueues when
-  consumers are catching up.  Defaults to the rate configured in
-  `catchup_delivery_rate` 
 
 ##### Topics