You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/07 23:27:42 UTC

svn commit: r1211679 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-w...

Author: chirino
Date: Wed Dec  7 22:27:41 2011
New Revision: 1211679

URL: http://svn.apache.org/viewvc?rev=1211679&view=rev
Log:
Fixes APLO-96 : Topic queue on a durable subscription throttles producer

Also added support to throttle the maximum enqueue rate of a queue and an option to only apply a throttle when you have fast consumers which being slowed down due to message persistence.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1211679&r1=1211678&r2=1211679&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Dec  7 22:27:41 2011
@@ -38,6 +38,23 @@ object Queue extends Log {
 
   val PREFTCH_LOAD_FLAG = 1.toByte
   val PREFTCH_HOLD_FLAG = 2.toByte
+
+  class MemorySpace {
+    var items = 0
+    var size = 0
+    var size_max = 0
+
+    def +=(delivery:Delivery) = {
+      items += 1
+      size += delivery.size
+    }
+
+    def -=(delivery:Delivery) = {
+      items -= 1
+      size -= delivery.size
+    }
+  }
+
 }
 
 import Queue._
@@ -96,11 +113,6 @@ class Queue(val router: LocalRouter, val
   // In-frequently accessed tuning configuration.
   //
 
-  /**
-   *  The amount of memory buffer space for the queue..
-   */
-  def tune_queue_buffer = config.queue_buffer.getOrElse(32*1024)
-
   //
   // Frequently accessed tuning configuration.
   //
@@ -133,13 +145,34 @@ class Queue(val router: LocalRouter, val
    *  The max memory to allow this queue to grow to.
    */
   var tune_quota = -1L
+  
+  /**
+   *  The message delivery rate (in bytes/sec) at which
+   *  the queue enables a enqueue rate throttle
+   *  to allow consumers to catchup with producers.
+   */
+  var tune_catchup_delivery_rate = 0
+  
+  /**
+   *  The rate at which to throttle producers when
+   *  consumers are catching up.  
+   */
+  var tune_catchup_enqueue_rate = 0
+
+  /**
+   *  Tthe rate at which producers are throttled at.
+   */
+  var tune_max_enqueue_rate = 0
 
   def configure(c:QueueDTO) = {
     config = c
     tune_persistent = virtual_host.store !=null && config.persistent.getOrElse(true)
     tune_swap = tune_persistent && config.swap.getOrElse(true)
     tune_swap_range_size = config.swap_range_size.getOrElse(10000)
-    tune_consumer_buffer = config.consumer_buffer.getOrElse(256*1024)
+    tune_consumer_buffer = Option(config.consumer_buffer).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(256*1024)
+    tune_catchup_delivery_rate = Option(config.catchup_delivery_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
+    tune_catchup_enqueue_rate = Option(config.catchup_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(tune_catchup_delivery_rate)
+    tune_max_enqueue_rate = Option(config.max_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
 
     tune_quota = Option(config.quota).map(MemoryPropertyEditor.parse(_)).getOrElse(-1)
 
@@ -185,10 +218,8 @@ class Queue(val router: LocalRouter, val
   var swapping_in_size = 0
   var swapping_out_size = 0
 
-  var swapped_in_items = 0
-  var swapped_in_size = 0
-
-  var swapped_in_size_max = 0
+  val producer_swapped_in = new MemorySpace
+  val consumer_swapped_in = new MemorySpace
 
   var swap_out_item_counter = 0L
   var swap_out_size_counter = 0L
@@ -198,6 +229,7 @@ class Queue(val router: LocalRouter, val
 
   var producer_counter = 0L
   var consumer_counter = 0L
+  var tail_prefetch = 0L
 
   var individual_swapped_items = 0
 
@@ -210,6 +242,10 @@ class Queue(val router: LocalRouter, val
   var auto_delete_after = 0
   var idled_at = 0L
 
+  def swapped_in_items = this.producer_swapped_in.items + this.consumer_swapped_in.items
+  def swapped_in_size = this.producer_swapped_in.size + this.consumer_swapped_in.size
+  def swapped_in_size_max = this.producer_swapped_in.size_max + this.consumer_swapped_in.size_max
+
   def get_queue_metrics:DestMetricsDTO = {
     dispatch_queue.assertExecuting()
     val rc = new DestMetricsDTO
@@ -241,10 +277,9 @@ class Queue(val router: LocalRouter, val
     rc.swapping_in_size = this.swapping_in_size
     rc.swapping_out_size = this.swapping_out_size
 
-    rc.swapped_in_items = this.swapped_in_items
-    rc.swapped_in_size = this.swapped_in_size
-
-    rc.swapped_in_size_max = this.swapped_in_size_max
+    rc.swapped_in_items = swapped_in_items
+    rc.swapped_in_size = swapped_in_size
+    rc.swapped_in_size_max = swapped_in_size_max
 
     rc.producer_counter = this.producer_counter
     rc.consumer_counter = this.consumer_counter
@@ -262,7 +297,7 @@ class Queue(val router: LocalRouter, val
     rc.binding = this.binding.binding_dto
     rc.config = this.config
     rc.metrics = this.get_queue_metrics
-    rc.metrics.current_time = System.currentTimeMillis()
+    rc.metrics.current_time = now
 
     if( entries ) {
       var cur = this.head_entry
@@ -340,7 +375,6 @@ class Queue(val router: LocalRouter, val
 
     val prev_persistent = tune_persistent
     val prev_consumer_size = tune_consumer_buffer
-    val prev_queue_buffer = tune_queue_buffer
 
     configure(binding.config(virtual_host))
 
@@ -351,13 +385,11 @@ class Queue(val router: LocalRouter, val
         // open session
         if( sub.session!=null ) {
           // change the queue capacity, by the change in consumer buffer change.
-          addCapacity(consumer_buffer_change)
+          change_consumer_capacity(consumer_buffer_change)
         }
       }
     }
 
-    swapped_in_size_max += (tune_queue_buffer-prev_queue_buffer)
-
     restore_from_store {
       check_idle
       trigger_swap
@@ -368,7 +400,6 @@ class Queue(val router: LocalRouter, val
   def check_idle {
     if (producers.isEmpty && all_subscriptions.isEmpty && queue_items==0 ) {
       if (idled_at==0) {
-        now = System.currentTimeMillis()
         idled_at = now
         if( auto_delete_after!=0 ) {
           dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
@@ -413,8 +444,6 @@ class Queue(val router: LocalRouter, val
   }
 
   protected def _start(on_completed: Runnable) = {
-    swapped_in_size_max += tune_queue_buffer;
-
     restore_from_store {
 
 
@@ -444,7 +473,6 @@ class Queue(val router: LocalRouter, val
       sub.close()
     }
 
-    swapped_in_size_max -= tune_queue_buffer;
     trigger_swap
 
     destination_dto match {
@@ -458,19 +486,29 @@ class Queue(val router: LocalRouter, val
     on_completed.run
   }
 
-  def addCapacity(amount:Int) = {
+  def might_unfill[T](func: =>T):T = {
     val was_full = messages.full
-    swapped_in_size_max += amount
-    if( was_full && !messages.full ) {
-      messages.refiller.run
+    try {
+      func
+    } finally {
+      if( was_full && !messages.full ) {
+        messages.refiller.run
+      }
     }
   }
 
+  def change_producer_capacity(amount:Int) = might_unfill {
+    producer_swapped_in.size_max += amount
+  }
+  def change_consumer_capacity(amount:Int) = might_unfill {
+    consumer_swapped_in.size_max += amount
+  }
+
   object messages extends Sink[Delivery] {
 
     var refiller: Runnable = null
 
-    def full = (swapped_in_size >= swapped_in_size_max) || !service_state.is_started || (tune_quota >=0 && queue_size > tune_quota)
+    def full = (producer_swapped_in.size >= producer_swapped_in.size_max) || is_enqueue_throttled || !service_state.is_started || (tune_quota >=0 && queue_size > tune_quota)
 
     def offer(delivery: Delivery): Boolean = {
       if (full) {
@@ -499,24 +537,36 @@ class Queue(val router: LocalRouter, val
         enqueue_size_counter += entry.size
         enqueue_ts = now;
 
+        // To decrease the enqueue throttle.
+        enqueue_remaining_take(entry.size)
 
         // Do we need to do a persistent enqueue???
         if (queueDelivery.uow != null) {
           entry.as_loaded.store
         }
 
-        var dispatched = false
         if( entry.hasSubs ) {
           // try to dispatch it directly...
           entry.dispatch
         }
 
-        val prev = entry.getPrevious
-
-        if( (prev.as_loaded!=null && prev.as_loaded.swapping_out ) || (prev.as_swapped!=null && !prev.as_swapped.swapping_in) ) {
-          entry.swap(!entry.as_loaded.acquired)
+        if( entry.as_loaded.acquired) {
+          // Enqueued message aquired.
+        } else if( tail_prefetch > 0 ) {
+          // Enqueued message prefeteched.
+          tail_prefetch -= entry.size
+          entry.prefetch_flags = PREFTCH_LOAD_FLAG
         } else {
-          trigger_swap
+//          val prev = entry.getPrevious
+//          if( (prev.as_loaded!=null && prev.as_loaded.swapping_out) || (prev.as_swapped!=null && !prev.as_swapped.swapping_in) ) {
+//            // Swap it out ASAP
+//            entry.swap(true)
+//            println("Enqueued message swapped.")
+//          } else {
+//            trigger_swap
+//            // Avoid swapping right away..
+//          }
+          entry.swap(true)
         }
 
         // release the store batch...
@@ -538,10 +588,11 @@ class Queue(val router: LocalRouter, val
 
   def expired(entry:QueueEntry, dequeue:Boolean=true):Unit = {
     if(dequeue) {
-      dequeue_item_counter += 1
-      dequeue_size_counter += entry.size
-      dequeue_ts = now
-      messages.refiller.run
+      might_unfill {
+        dequeue_item_counter += 1
+        dequeue_size_counter += entry.size
+        dequeue_ts = now
+      }
     }
 
     expired_ts = now
@@ -592,7 +643,6 @@ class Queue(val router: LocalRouter, val
 
   def swap_messages:Unit = {
     dispatch_queue.assertExecuting()
-    now = System.currentTimeMillis()
 
     if( !service_state.is_started )
       return
@@ -609,7 +659,7 @@ class Queue(val router: LocalRouter, val
         cur.state match {
           case x:QueueEntry#SwappedRange =>
             // load the range to expire the messages in it.
-            cur.load
+            cur.load(null)
           case x:QueueEntry#Swapped =>
             // remove the expired swapped message.
             expired(cur)
@@ -628,7 +678,9 @@ class Queue(val router: LocalRouter, val
     }
 
     // Set the prefetch flags
-    all_subscriptions.valuesIterator.foreach( _.refill_prefetch )
+    all_subscriptions.valuesIterator.foreach{ x=>
+      x.refill_prefetch
+    }
 
     // swap out messages.
     cur = entries.getHead
@@ -640,11 +692,11 @@ class Queue(val router: LocalRouter, val
           val asap = !cur.as_loaded.acquired
           cur.swap(asap)
         } else {
-          cur.load // just in case it's getting swapped.
+          cur.load(consumer_swapped_in)
         }
       }
       cur = next
-    }
+    }                               
 
 
     // Combine swapped items into swapped ranges
@@ -681,41 +733,93 @@ class Queue(val router: LocalRouter, val
       }
       debug("combined %d entries", combine_counter)
     }
+    
+    if(!messages.full) {
+      messages.refiller.run
+    }
 
   }
 
+  var delivery_rate = 0L
+  def swapped_out_size = queue_size - (producer_swapped_in.size + consumer_swapped_in.size)
+
   def schedule_periodic_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
     if( service_state.is_started ) {
-      now = System.currentTimeMillis
-
-      // target tune_min_subscription_rate / sec
-      all_subscriptions.foreach{ case (consumer, sub)=>
+      var elapsed = System.currentTimeMillis-now
+      now += elapsed
 
-        if ( sub.tail_parkings < 0 ) {
+      delivery_rate = 0L
 
-          // re-calc the avg_advanced_size
-          sub.advanced_sizes += sub.advanced_size
-          while( sub.advanced_sizes.size > 5 ) {
-            sub.advanced_sizes = sub.advanced_sizes.drop(1)
-          }
-          sub.avg_advanced_size = sub.advanced_sizes.foldLeft(0)(_ + _) /  sub.advanced_sizes.size
+      var consumer_stall_ms = 0L
+      var load_stall_ms = 0L
 
+      all_subscriptions.values.foreach{ sub=>
+        val (cs, ls) = sub.adjust_prefetch_size
+        consumer_stall_ms += cs
+        load_stall_ms += ls
+        if(!sub.browser) {
+          delivery_rate += sub.enqueue_size_per_interval
         }
+      }
+      
+      val rate_adjustment = elapsed.toFloat / 1000.toFloat
+      delivery_rate  = (delivery_rate / rate_adjustment).toLong
 
-        sub.total_advanced_size += sub.advanced_size
-        sub.advanced_size = 0
-        sub.tail_parkings = 0
+      val stall_ratio = ((consumer_stall_ms*100)+1).toFloat / ((load_stall_ms*100)+1).toFloat
 
+      // Figure out what the max enqueue rate should be.
+      max_enqueue_rate = Int.MaxValue
+      if( tune_catchup_delivery_rate>=0 && tune_catchup_enqueue_rate>=0 && delivery_rate>tune_catchup_delivery_rate && swapped_out_size > 0 && stall_ratio < 1.0 ) {
+        max_enqueue_rate = tune_catchup_enqueue_rate
+      }
+      if(tune_max_enqueue_rate >=0 ) {
+        max_enqueue_rate = max_enqueue_rate.min(tune_max_enqueue_rate)
+      }
+      if( max_enqueue_rate < Int.MaxValue ) {
+        if(enqueues_remaining==null) {
+          enqueues_remaining = new LongCounter()
+          enqueue_throttle_release(enqueues_remaining)
+        }
+      } else {
+        if(enqueues_remaining!=null) {
+          enqueues_remaining = null
+        }
       }
 
       swap_messages
       schedule_periodic_maintenance
     }
   }
+    
+  var max_enqueue_rate = Int.MaxValue
+  var enqueues_remaining:LongCounter = _
+  
+  def is_enqueue_throttled = enqueues_remaining!=null && enqueues_remaining.get() <= 0
 
+  def enqueue_remaining_take(amount:Int) = {
+    if(enqueues_remaining!=null) {
+      enqueues_remaining.addAndGet(-amount)
+    }
+  }
+  
+  def enqueue_throttle_release(throttle:LongCounter):Unit = {
+    if( enqueues_remaining==throttle ) {
+      might_unfill {
+        val amount = max_enqueue_rate / 10
+        val remaining = throttle.get
+//        if(remaining < 0) {
+//          throttle.addAndGet(amount)
+//        } else {
+          throttle.set(amount)
+//        }
+      }
+      dispatch_queue.after(100, TimeUnit.MILLISECONDS) {
+        enqueue_throttle_release(throttle)
+      }
+    }
+  }
 
-
-  def drain_acks = {
+  def drain_acks = might_unfill {
     ack_source.getData.foreach {
       case (entry, consumed, uow) =>
         consumed match {
@@ -737,12 +841,8 @@ class Queue(val router: LocalRouter, val
           uow.release()
         }
     }
-    messages.refiller.run
   }
 
-
-  
-
   /////////////////////////////////////////////////////////////////////
   //
   // Implementation of the DeliveryConsumer trait.  Allows this queue
@@ -765,13 +865,13 @@ class Queue(val router: LocalRouter, val
 
     dispatch_queue {
       inbound_sessions += this
-      addCapacity( session_max )
+      change_producer_capacity( session_max )
     }
 
     def close = {
       session_manager.close(downstream)
       dispatch_queue {
-        addCapacity( -session_max )
+        change_producer_capacity( -session_max )
         inbound_sessions -= this
       }
       release
@@ -880,8 +980,6 @@ class Queue(val router: LocalRouter, val
 
   override def connection:Option[BrokerConnection] = None
 
-  override def send_buffer_size = tune_queue_buffer
-
   /////////////////////////////////////////////////////////////////////
   //
   // Implementation methods.
@@ -899,13 +997,11 @@ class Queue(val router: LocalRouter, val
   swap_out_completes_source.setEventHandler(^ {drain_swap_out_completes});
   swap_out_completes_source.resume
 
-  def drain_swap_out_completes() = {
+  def drain_swap_out_completes() = might_unfill {
     val data = swap_out_completes_source.getData
     data.foreach { loaded =>
       loaded.swapped_out
     }
-    messages.refiller.run
-
   }
 
   val store_load_source = createSource(new ListEventAggregator[(QueueEntry#Swapped, MessageRecord)](), dispatch_queue)
@@ -961,9 +1057,8 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def init(delivery:Delivery):QueueEntry = {
-    state = new Loaded(delivery, false)
-    queue.swapped_in_size += size
-    queue.swapped_in_items += 1
+    queue.producer_swapped_in += delivery
+    state = new Loaded(delivery, false, queue.producer_swapped_in)
     this
   }
 
@@ -1059,6 +1154,7 @@ class QueueEntry(val queue:Queue, val se
   def is_loaded = as_loaded!=null
   def is_swapped = as_swapped!=null
   def is_swapped_range = as_swapped_range!=null
+  def is_swapped_or_swapped_range = is_swapped || is_swapped_range
 
   // These should not change the current state.
   def count = state.count
@@ -1072,7 +1168,7 @@ class QueueEntry(val queue:Queue, val se
 
   // These methods may cause a change in the current state.
   def swap(asap:Boolean) = state.swap_out(asap)
-  def load = state.swap_in
+  def load(space:MemorySpace) = state.swap_in(space)
   def remove = state.remove
 
   def swapped_range = state.swap_range
@@ -1146,7 +1242,7 @@ class QueueEntry(val queue:Queue, val se
     /**
      * Triggers the entry to get swapped in if it's not already swapped in.
      */
-    def swap_in = {}
+    def swap_in(space:MemorySpace) = {}
 
     /**
      * Triggers the entry to get swapped out if it's not already swapped.
@@ -1207,7 +1303,7 @@ class QueueEntry(val queue:Queue, val se
     }
 
     override def remove = throw new AssertionError("Head entry cannot be removed")
-    override def swap_in = throw new AssertionError("Head entry cannot be loaded")
+    override def swap_in(space:MemorySpace) = throw new AssertionError("Head entry cannot be loaded")
     override def swap_out(asap:Boolean) = throw new AssertionError("Head entry cannot be swapped")
   }
 
@@ -1223,7 +1319,7 @@ class QueueEntry(val queue:Queue, val se
     override def as_tail:Tail = this
 
     override def remove = throw new AssertionError("Tail entry cannot be removed")
-    override def swap_in = throw new AssertionError("Tail entry cannot be loaded")
+    override def swap_in(space:MemorySpace) = throw new AssertionError("Tail entry cannot be loaded")
     override def swap_out(asap:Boolean) = throw new AssertionError("Tail entry cannot be swapped")
 
   }
@@ -1232,7 +1328,7 @@ class QueueEntry(val queue:Queue, val se
    * The entry is in this state while a message is loaded in memory.  A message must be in this state
    * before it can be dispatched to a subscription.
    */
-  class Loaded(val delivery: Delivery, var stored:Boolean) extends EntryState {
+  class Loaded(val delivery: Delivery, var stored:Boolean, var space:MemorySpace) extends EntryState {
 
     assert( delivery!=null, "delivery cannot be null")
 
@@ -1329,9 +1425,8 @@ class QueueEntry(val queue:Queue, val se
       delivery.uow = null
       if( swapping_out ) {
         swapping_out = false
+        space -= delivery
         queue.swapping_out_size-=size
-        queue.swapped_in_size -= size
-        queue.swapped_in_items -= 1
 
         queue.swap_out_size_counter += size
         queue.swap_out_item_counter += 1
@@ -1346,14 +1441,18 @@ class QueueEntry(val queue:Queue, val se
       } else {
         if( remove_pending ) {
           delivery.message.release
-          queue.swapped_in_size -= size
-          queue.swapped_in_items -= 1
+          space -= delivery
           super.remove
         }
       }
     }
 
-    override def swap_in() = {
+    override def swap_in(space:MemorySpace) = {
+      if(space ne this.space) {
+        this.space -= delivery
+        this.space = space
+        this.space += delivery
+      }
       if( swapping_out ) {
         swapping_out = false
         queue.swapping_out_size-=size
@@ -1365,8 +1464,7 @@ class QueueEntry(val queue:Queue, val se
         remove_pending = true
       } else {
         delivery.message.release
-        queue.swapped_in_size -= size
-        queue.swapped_in_items -= 1
+        space -= delivery
         super.remove
       }
     }
@@ -1488,7 +1586,7 @@ class QueueEntry(val queue:Queue, val se
 
     queue.individual_swapped_items += 1
 
-    var swapping_in = false
+    var swap_in_space:MemorySpace = _
 
     override def redeliveries = _redeliveries
     override def redelivered = _redeliveries = ((_redeliveries+1).min(Short.MaxValue)).toShort
@@ -1501,18 +1599,18 @@ class QueueEntry(val queue:Queue, val se
 
     def label = {
       var rc = "swapped"
-      if( swapping_in ) {
+      if( swap_in_space!=null ) {
         rc += "|swapping in"
       }
       rc
     }
-    override def toString = { "swapped:{ swapping_in: "+swapping_in+", size:"+size+"}" }
+    override def toString = { "swapped:{ swapping_in: "+swap_in_space+", size:"+size+"}" }
 
-    override def swap_in() = {
-      if( !swapping_in ) {
+    override def swap_in(space:MemorySpace) = {
+      if( swap_in_space==null ) {
 //        trace("Start entry load of message seq: %s", seq)
         // start swapping in...
-        swapping_in = true
+        swap_in_space = space
         queue.swapping_in_size += size
         queue.virtual_host.store.load_message(message_key, message_locator) { delivery =>
           // pass off to a source so it can aggregate multiple
@@ -1534,9 +1632,8 @@ class QueueEntry(val queue:Queue, val se
     }
 
     def swapped_in(messageRecord:MessageRecord) = {
-      if( swapping_in ) {
+      if( swap_in_space!=null ) {
 //        debug("Loaded message seq: ", seq )
-        swapping_in = false
         queue.swapping_in_size -= size
 
         val delivery = new Delivery()
@@ -1546,14 +1643,14 @@ class QueueEntry(val queue:Queue, val se
         delivery.storeLocator = messageRecord.locator
         delivery.redeliveries = redeliveries
 
-        queue.swapped_in_size += delivery.size
-        queue.swapped_in_items += 1
+        swap_in_space += delivery
 
         queue.swap_in_size_counter += size
         queue.swap_in_item_counter += 1
 
         queue.individual_swapped_items -= 1
-        state = new Loaded(delivery, true)
+        state = new Loaded(delivery, true, swap_in_space)
+        swap_in_space = null
       } else {
 //        debug("Ignoring store load of: ", messageKey)
       }
@@ -1561,8 +1658,8 @@ class QueueEntry(val queue:Queue, val se
 
 
     override def remove = {
-      if( swapping_in ) {
-        swapping_in = false
+      if( swap_in_space!=null ) {
+        swap_in_space = null
         queue.swapping_in_size -= size
       }
       queue.individual_swapped_items -= 1
@@ -1570,8 +1667,8 @@ class QueueEntry(val queue:Queue, val se
     }
 
     override def swap_range = {
-      if( swapping_in ) {
-        swapping_in = false
+      if( swap_in_space!=null ) {
+        swap_in_space = null
         queue.swapping_in_size -= size
       }
       queue.individual_swapped_items -= 1
@@ -1618,7 +1715,7 @@ class QueueEntry(val queue:Queue, val se
     }
     override def toString = { "swapped_range:{ swapping_in: "+swapping_in+", count: "+count+", size: "+size+"}" }
 
-    override def swap_in() = {
+    override def swap_in(space:MemorySpace) = {
       if( !swapping_in ) {
         swapping_in = true
         queue.virtual_host.store.list_queue_entries(queue.store_id, seq, last) { records =>
@@ -1718,12 +1815,14 @@ class Subscription(val queue:Queue, val 
   var acquired_size = 0L
   def acquired_count = acquired.size()
 
-  var total_advanced_size = 0L
-  var advanced_size = 0
-  var advanced_sizes = ListBuffer[Int]() // use circular buffer instead.
+  var enqueue_size_per_interval = 0L
+  var enqueue_size_at_last_interval = 0L
 
-  var avg_advanced_size = queue.tune_consumer_buffer
-  var tail_parkings = 1
+  var consumer_stall_ms = 0L
+  var load_stall_ms = 0L
+
+  var consumer_stall_start = 0L
+  var load_stall_start = 0L
 
   var total_ack_count = 0L
   var total_nack_count = 0L
@@ -1749,6 +1848,7 @@ class Subscription(val queue:Queue, val 
 
     session = consumer.connect(this)
     session.refiller = dispatch_queue.runnable {
+      check_consumer_stall
       if( pos!=null ) {
         pos.run
       }
@@ -1757,7 +1857,7 @@ class Subscription(val queue:Queue, val 
 
     queue.all_subscriptions += consumer -> this
     queue.consumer_counter += 1
-    queue.addCapacity( queue.tune_consumer_buffer )
+    queue.change_consumer_capacity( queue.tune_consumer_buffer )
 
     if( exclusive ) {
       queue.exclusive_subscriptions.append(this)
@@ -1778,7 +1878,7 @@ class Subscription(val queue:Queue, val 
 
       queue.exclusive_subscriptions = queue.exclusive_subscriptions.filterNot( _ == this )
       queue.all_subscriptions -= consumer
-      queue.addCapacity( - queue.tune_consumer_buffer )
+      queue.change_consumer_capacity( - queue.tune_consumer_buffer )
 
 
       // nack all the acquired entries.
@@ -1801,6 +1901,7 @@ class Subscription(val queue:Queue, val 
       consumer.release
 
       queue.check_idle
+      queue.tail_prefetch = 0
       queue.trigger_swap
     } else {}
   }
@@ -1810,18 +1911,16 @@ class Subscription(val queue:Queue, val 
    * queue entry.
    */
   def advance(value:QueueEntry):Unit = {
-
     assert(value!=null)
-
-    advanced_size += pos.size
-
     pos = value
-
+    check_load_stall
     if( tail_parked ) {
-      tail_parkings += 0
-      if( session.consumer.close_on_drain ) {
-        close
-      }
+        if(session.consumer.close_on_drain) {
+          close
+        } else {
+          var remaining = queue.tune_consumer_buffer - acquired_size;
+          queue.tail_prefetch = queue.tail_prefetch.max(remaining)
+        }
     }
   }
 
@@ -1834,6 +1933,7 @@ class Subscription(val queue:Queue, val 
     pos -= this
     value ::= this
     pos = value
+    check_load_stall
     queue.dispatch_queue << value // queue up the entry to get dispatched..
   }
 
@@ -1842,10 +1942,63 @@ class Subscription(val queue:Queue, val 
   def matches(entry:Delivery) = session.consumer.matches(entry)
   def full = session.full
 
-  def offer(delivery:Delivery) = session.offer(delivery)
+  def offer(delivery:Delivery) = try {
+    session.offer(delivery)
+  } finally {
+    check_consumer_stall
+  }
 
   def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
 
+  def check_load_stall = {
+    if ( pos.is_swapped_or_swapped_range ) {
+      if(load_stall_start==0) {
+        load_stall_start = queue.virtual_host.broker.now
+      }
+    } else {
+      if(load_stall_start!=0) {
+        load_stall_ms += queue.virtual_host.broker.now - load_stall_start
+        load_stall_start = 0
+      }
+    }
+  }
+
+  def check_consumer_stall = {
+    if ( full ) {
+      if(consumer_stall_start==0) {
+        consumer_stall_start = queue.virtual_host.broker.now
+      }
+    } else {
+      if( consumer_stall_start!=0 ) {
+        consumer_stall_ms += queue.virtual_host.broker.now - consumer_stall_start
+        consumer_stall_start = 0
+      }
+    }
+  }
+
+  def adjust_prefetch_size = {
+
+    enqueue_size_per_interval = session.enqueue_size_counter - enqueue_size_at_last_interval
+    enqueue_size_at_last_interval = session.enqueue_size_counter
+
+    if(consumer_stall_start !=0) {
+      val now = queue.virtual_host.broker.now
+      consumer_stall_ms += now - consumer_stall_start
+      consumer_stall_start = now
+    }
+
+    if(load_stall_start !=0) {
+      val now = queue.virtual_host.broker.now
+      load_stall_ms += now - load_stall_start
+      load_stall_start = now
+    }
+
+    val rc = (consumer_stall_ms, load_stall_ms)
+    consumer_stall_ms = 0
+    load_stall_ms = 0
+    rc
+  }
+
   def refill_prefetch = {
 
     var cursor = if( pos.is_tail ) {
@@ -1856,22 +2009,23 @@ class Subscription(val queue:Queue, val 
       pos // start prefetching from the current position.
     }
 
-    var remaining = queue.tune_consumer_buffer - acquired_size
+    var remaining = queue.tune_consumer_buffer - acquired_size; // 3/4 of the prefetch is triggers loading
     while( remaining>0 && cursor!=null ) {
       val next = cursor.getNext
-      remaining -= cursor.size
-      cursor.prefetch_flags = (cursor.prefetch_flags | PREFTCH_LOAD_FLAG).toByte
-      cursor.load
+      if( (cursor.prefetch_flags & PREFTCH_LOAD_FLAG) == 0 ) {
+        remaining -= cursor.size
+        cursor.prefetch_flags = (cursor.prefetch_flags | PREFTCH_LOAD_FLAG).toByte
+        cursor.load(queue.consumer_swapped_in)
+      }
       cursor = next
     }
-
-    remaining = avg_advanced_size
-    while( remaining>0 && cursor!=null ) {
-      remaining -= cursor.size
-      cursor.prefetch_flags = (cursor.prefetch_flags | PREFTCH_HOLD_FLAG).toByte
-      cursor = cursor.getNext
+    
+    // If we hit the tail.. credit it so that we avoid swapping too soon.
+    if( cursor == null ) {
+      queue.tail_prefetch = queue.tail_prefetch.max(((enqueue_size_per_interval/2) - remaining).max(remaining))
     }
 
+
   }
 
   class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala?rev=1211679&r1=1211678&r2=1211679&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala Wed Dec  7 22:27:41 2011
@@ -51,29 +51,6 @@ class DestinationConfigurationTest exten
 
     val router = broker.default_virtual_host.router.asInstanceOf[LocalRouter]
 
-    def check_tune_queue_buffer(expected:Int)(dto:DestinationDTO) = {
-      var actual=0
-      reset {
-        var q = router.get_or_create_destination(dto, null).success.asInstanceOf[Queue]
-        actual = q.tune_queue_buffer
-      }
-      expect(expected) {actual}
-    }
-
-    check_tune_queue_buffer(333) {
-      var p = new QueueDestinationDTO()
-      p.path.add("unified")
-      p.path.add("a")
-      p
-    }
-
-    check_tune_queue_buffer(111) {
-      var p = new QueueDestinationDTO()
-      p.path.add("notunified")
-      p.path.add("other")
-      p
-    }
-
     ServiceControl.stop(broker, "broker")
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1211679&r1=1211678&r2=1211679&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Wed Dec  7 22:27:41 2011
@@ -50,16 +50,10 @@ public class QueueDTO extends StringIdDT
     public Boolean unified;
 
     /**
-     *  The amount of memory buffer space for the queue..
-     */
-    @XmlAttribute(name="queue_buffer")
-    public Integer queue_buffer;
-
-    /**
      *  The amount of memory buffer space to use per consumer.
      */
     @XmlAttribute(name="consumer_buffer")
-    public Integer consumer_buffer;
+    public String consumer_buffer;
 
     /**
      * Should this queue persistently store it's entries?
@@ -92,6 +86,27 @@ public class QueueDTO extends StringIdDT
     public String quota;
 
     /**
+     *  The message delivery rate (in bytes/sec) at which
+     *  the queue enables a producer rate throttle
+     *  to allow consumers to catchup with producers.
+     */
+    @XmlAttribute(name="catchup_delivery_rate")
+    public String catchup_delivery_rate;
+
+    /**
+     *  The rate at which to throttle enqueues when
+     *  consumers are catching up.
+     */
+    @XmlAttribute(name="catchup_enqueue_rate")
+    public String catchup_enqueue_rate;
+
+    /**
+     *  The maximum enqueue rate of the queue
+     */
+    @XmlAttribute(name="max_enqueue_rate")
+    public String max_enqueue_rate;
+
+    /**
      * To hold any other non-matching XML elements
      */
     @XmlAnyElement(lax=true)
@@ -107,12 +122,17 @@ public class QueueDTO extends StringIdDT
 
         if (auto_delete_after != null ? !auto_delete_after.equals(queueDTO.auto_delete_after) : queueDTO.auto_delete_after != null)
             return false;
+        if (catchup_delivery_rate != null ? !catchup_delivery_rate.equals(queueDTO.catchup_delivery_rate) : queueDTO.catchup_delivery_rate != null)
+            return false;
+        if (catchup_enqueue_rate != null ? !catchup_enqueue_rate.equals(queueDTO.catchup_enqueue_rate) : queueDTO.catchup_enqueue_rate != null)
+            return false;
         if (consumer_buffer != null ? !consumer_buffer.equals(queueDTO.consumer_buffer) : queueDTO.consumer_buffer != null)
             return false;
+        if (max_enqueue_rate != null ? !max_enqueue_rate.equals(queueDTO.max_enqueue_rate) : queueDTO.max_enqueue_rate != null)
+            return false;
         if (other != null ? !other.equals(queueDTO.other) : queueDTO.other != null) return false;
         if (persistent != null ? !persistent.equals(queueDTO.persistent) : queueDTO.persistent != null) return false;
-        if (queue_buffer != null ? !queue_buffer.equals(queueDTO.queue_buffer) : queueDTO.queue_buffer != null)
-            return false;
+        if (quota != null ? !quota.equals(queueDTO.quota) : queueDTO.quota != null) return false;
         if (swap != null ? !swap.equals(queueDTO.swap) : queueDTO.swap != null) return false;
         if (swap_range_size != null ? !swap_range_size.equals(queueDTO.swap_range_size) : queueDTO.swap_range_size != null)
             return false;
@@ -126,11 +146,14 @@ public class QueueDTO extends StringIdDT
         int result = super.hashCode();
         result = 31 * result + (auto_delete_after != null ? auto_delete_after.hashCode() : 0);
         result = 31 * result + (unified != null ? unified.hashCode() : 0);
-        result = 31 * result + (queue_buffer != null ? queue_buffer.hashCode() : 0);
         result = 31 * result + (consumer_buffer != null ? consumer_buffer.hashCode() : 0);
         result = 31 * result + (persistent != null ? persistent.hashCode() : 0);
         result = 31 * result + (swap != null ? swap.hashCode() : 0);
         result = 31 * result + (swap_range_size != null ? swap_range_size.hashCode() : 0);
+        result = 31 * result + (quota != null ? quota.hashCode() : 0);
+        result = 31 * result + (catchup_delivery_rate != null ? catchup_delivery_rate.hashCode() : 0);
+        result = 31 * result + (catchup_enqueue_rate != null ? catchup_enqueue_rate.hashCode() : 0);
+        result = 31 * result + (max_enqueue_rate != null ? max_enqueue_rate.hashCode() : 0);
         result = 31 * result + (other != null ? other.hashCode() : 0);
         return result;
     }

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1211679&r1=1211678&r2=1211679&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Wed Dec  7 22:27:41 2011
@@ -245,9 +245,6 @@ A `queue` element may be configured with
   [Unified Destinations](#Unified_Destinations) documentation for more 
   details.  Defaults to false.
 
-* `queue_buffer` : The amount of memory buffer space allocated for each queue.
-   Defaults to 32k.
-
 * `consumer_buffer` : The amount of memory buffer space allocated to each
 subscription for receiving messages.  Defaults to 256k.
 
@@ -271,6 +268,20 @@ memory.  Defaults to true.
 * `auto_delete_after`: If not set to `0` then the queue will automatically
   delete once there have been no consumers, producers or messages on it
   for the configured number of seconds.  Defaults to 300 if not set.
+  
+* `max_enqueue_rate`: The maximum enqueue rate of the queue.  Producers
+  will be flow controlled once this enqueue rate is reached.  If not set
+  then it is disabled
+
+* `catchup_delivery_rate`: The message delivery rate (in bytes/sec) at which
+  the queue enables a producer rate throttle to allow consumers to catchup 
+  with producers.  The the consumers must also be stalling on message
+  loading longer than they stall to deliver.  If not set then the
+  feature is disabled.
+  
+* `catchup_enqueue_rate`:  The rate at which to throttle enqueues when
+  consumers are catching up.  Defaults to the rate configured in
+  `catchup_delivery_rate` 
 
 ##### Topics