You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/07 23:27:42 UTC
svn commit: r1211679 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-w...
Author: chirino
Date: Wed Dec 7 22:27:41 2011
New Revision: 1211679
URL: http://svn.apache.org/viewvc?rev=1211679&view=rev
Log:
Fixes APLO-96 : Topic queue on a durable subscription throttles producer
Also added support for throttling the maximum enqueue rate of a queue, and an option to apply a throttle only when you have fast consumers which are being slowed down due to message persistence.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1211679&r1=1211678&r2=1211679&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Dec 7 22:27:41 2011
@@ -38,6 +38,23 @@ object Queue extends Log {
val PREFTCH_LOAD_FLAG = 1.toByte
val PREFTCH_HOLD_FLAG = 2.toByte
+
+ class MemorySpace {
+ var items = 0
+ var size = 0
+ var size_max = 0
+
+ def +=(delivery:Delivery) = {
+ items += 1
+ size += delivery.size
+ }
+
+ def -=(delivery:Delivery) = {
+ items -= 1
+ size -= delivery.size
+ }
+ }
+
}
import Queue._
@@ -96,11 +113,6 @@ class Queue(val router: LocalRouter, val
// In-frequently accessed tuning configuration.
//
- /**
- * The amount of memory buffer space for the queue..
- */
- def tune_queue_buffer = config.queue_buffer.getOrElse(32*1024)
-
//
// Frequently accessed tuning configuration.
//
@@ -133,13 +145,34 @@ class Queue(val router: LocalRouter, val
* The max memory to allow this queue to grow to.
*/
var tune_quota = -1L
+
+ /**
+ * The message delivery rate (in bytes/sec) at which
+ * the queue enables an enqueue rate throttle
+ * to allow consumers to catch up with producers.
+ */
+ var tune_catchup_delivery_rate = 0
+
+ /**
+ * The rate at which to throttle producers when
+ * consumers are catching up.
+ */
+ var tune_catchup_enqueue_rate = 0
+
+ /**
+ * The rate at which producers are throttled.
+ */
+ var tune_max_enqueue_rate = 0
def configure(c:QueueDTO) = {
config = c
tune_persistent = virtual_host.store !=null && config.persistent.getOrElse(true)
tune_swap = tune_persistent && config.swap.getOrElse(true)
tune_swap_range_size = config.swap_range_size.getOrElse(10000)
- tune_consumer_buffer = config.consumer_buffer.getOrElse(256*1024)
+ tune_consumer_buffer = Option(config.consumer_buffer).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(256*1024)
+ tune_catchup_delivery_rate = Option(config.catchup_delivery_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
+ tune_catchup_enqueue_rate = Option(config.catchup_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(tune_catchup_delivery_rate)
+ tune_max_enqueue_rate = Option(config.max_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
tune_quota = Option(config.quota).map(MemoryPropertyEditor.parse(_)).getOrElse(-1)
@@ -185,10 +218,8 @@ class Queue(val router: LocalRouter, val
var swapping_in_size = 0
var swapping_out_size = 0
- var swapped_in_items = 0
- var swapped_in_size = 0
-
- var swapped_in_size_max = 0
+ val producer_swapped_in = new MemorySpace
+ val consumer_swapped_in = new MemorySpace
var swap_out_item_counter = 0L
var swap_out_size_counter = 0L
@@ -198,6 +229,7 @@ class Queue(val router: LocalRouter, val
var producer_counter = 0L
var consumer_counter = 0L
+ var tail_prefetch = 0L
var individual_swapped_items = 0
@@ -210,6 +242,10 @@ class Queue(val router: LocalRouter, val
var auto_delete_after = 0
var idled_at = 0L
+ def swapped_in_items = this.producer_swapped_in.items + this.consumer_swapped_in.items
+ def swapped_in_size = this.producer_swapped_in.size + this.consumer_swapped_in.size
+ def swapped_in_size_max = this.producer_swapped_in.size_max + this.consumer_swapped_in.size_max
+
def get_queue_metrics:DestMetricsDTO = {
dispatch_queue.assertExecuting()
val rc = new DestMetricsDTO
@@ -241,10 +277,9 @@ class Queue(val router: LocalRouter, val
rc.swapping_in_size = this.swapping_in_size
rc.swapping_out_size = this.swapping_out_size
- rc.swapped_in_items = this.swapped_in_items
- rc.swapped_in_size = this.swapped_in_size
-
- rc.swapped_in_size_max = this.swapped_in_size_max
+ rc.swapped_in_items = swapped_in_items
+ rc.swapped_in_size = swapped_in_size
+ rc.swapped_in_size_max = swapped_in_size_max
rc.producer_counter = this.producer_counter
rc.consumer_counter = this.consumer_counter
@@ -262,7 +297,7 @@ class Queue(val router: LocalRouter, val
rc.binding = this.binding.binding_dto
rc.config = this.config
rc.metrics = this.get_queue_metrics
- rc.metrics.current_time = System.currentTimeMillis()
+ rc.metrics.current_time = now
if( entries ) {
var cur = this.head_entry
@@ -340,7 +375,6 @@ class Queue(val router: LocalRouter, val
val prev_persistent = tune_persistent
val prev_consumer_size = tune_consumer_buffer
- val prev_queue_buffer = tune_queue_buffer
configure(binding.config(virtual_host))
@@ -351,13 +385,11 @@ class Queue(val router: LocalRouter, val
// open session
if( sub.session!=null ) {
// change the queue capacity, by the change in consumer buffer change.
- addCapacity(consumer_buffer_change)
+ change_consumer_capacity(consumer_buffer_change)
}
}
}
- swapped_in_size_max += (tune_queue_buffer-prev_queue_buffer)
-
restore_from_store {
check_idle
trigger_swap
@@ -368,7 +400,6 @@ class Queue(val router: LocalRouter, val
def check_idle {
if (producers.isEmpty && all_subscriptions.isEmpty && queue_items==0 ) {
if (idled_at==0) {
- now = System.currentTimeMillis()
idled_at = now
if( auto_delete_after!=0 ) {
dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
@@ -413,8 +444,6 @@ class Queue(val router: LocalRouter, val
}
protected def _start(on_completed: Runnable) = {
- swapped_in_size_max += tune_queue_buffer;
-
restore_from_store {
@@ -444,7 +473,6 @@ class Queue(val router: LocalRouter, val
sub.close()
}
- swapped_in_size_max -= tune_queue_buffer;
trigger_swap
destination_dto match {
@@ -458,19 +486,29 @@ class Queue(val router: LocalRouter, val
on_completed.run
}
- def addCapacity(amount:Int) = {
+ def might_unfill[T](func: =>T):T = {
val was_full = messages.full
- swapped_in_size_max += amount
- if( was_full && !messages.full ) {
- messages.refiller.run
+ try {
+ func
+ } finally {
+ if( was_full && !messages.full ) {
+ messages.refiller.run
+ }
}
}
+ def change_producer_capacity(amount:Int) = might_unfill {
+ producer_swapped_in.size_max += amount
+ }
+ def change_consumer_capacity(amount:Int) = might_unfill {
+ consumer_swapped_in.size_max += amount
+ }
+
object messages extends Sink[Delivery] {
var refiller: Runnable = null
- def full = (swapped_in_size >= swapped_in_size_max) || !service_state.is_started || (tune_quota >=0 && queue_size > tune_quota)
+ def full = (producer_swapped_in.size >= producer_swapped_in.size_max) || is_enqueue_throttled || !service_state.is_started || (tune_quota >=0 && queue_size > tune_quota)
def offer(delivery: Delivery): Boolean = {
if (full) {
@@ -499,24 +537,36 @@ class Queue(val router: LocalRouter, val
enqueue_size_counter += entry.size
enqueue_ts = now;
+ // To decrease the enqueue throttle.
+ enqueue_remaining_take(entry.size)
// Do we need to do a persistent enqueue???
if (queueDelivery.uow != null) {
entry.as_loaded.store
}
- var dispatched = false
if( entry.hasSubs ) {
// try to dispatch it directly...
entry.dispatch
}
- val prev = entry.getPrevious
-
- if( (prev.as_loaded!=null && prev.as_loaded.swapping_out ) || (prev.as_swapped!=null && !prev.as_swapped.swapping_in) ) {
- entry.swap(!entry.as_loaded.acquired)
+ if( entry.as_loaded.acquired) {
+ // Enqueued message aquired.
+ } else if( tail_prefetch > 0 ) {
+ // Enqueued message prefeteched.
+ tail_prefetch -= entry.size
+ entry.prefetch_flags = PREFTCH_LOAD_FLAG
} else {
- trigger_swap
+// val prev = entry.getPrevious
+// if( (prev.as_loaded!=null && prev.as_loaded.swapping_out) || (prev.as_swapped!=null && !prev.as_swapped.swapping_in) ) {
+// // Swap it out ASAP
+// entry.swap(true)
+// println("Enqueued message swapped.")
+// } else {
+// trigger_swap
+// // Avoid swapping right away..
+// }
+ entry.swap(true)
}
// release the store batch...
@@ -538,10 +588,11 @@ class Queue(val router: LocalRouter, val
def expired(entry:QueueEntry, dequeue:Boolean=true):Unit = {
if(dequeue) {
- dequeue_item_counter += 1
- dequeue_size_counter += entry.size
- dequeue_ts = now
- messages.refiller.run
+ might_unfill {
+ dequeue_item_counter += 1
+ dequeue_size_counter += entry.size
+ dequeue_ts = now
+ }
}
expired_ts = now
@@ -592,7 +643,6 @@ class Queue(val router: LocalRouter, val
def swap_messages:Unit = {
dispatch_queue.assertExecuting()
- now = System.currentTimeMillis()
if( !service_state.is_started )
return
@@ -609,7 +659,7 @@ class Queue(val router: LocalRouter, val
cur.state match {
case x:QueueEntry#SwappedRange =>
// load the range to expire the messages in it.
- cur.load
+ cur.load(null)
case x:QueueEntry#Swapped =>
// remove the expired swapped message.
expired(cur)
@@ -628,7 +678,9 @@ class Queue(val router: LocalRouter, val
}
// Set the prefetch flags
- all_subscriptions.valuesIterator.foreach( _.refill_prefetch )
+ all_subscriptions.valuesIterator.foreach{ x=>
+ x.refill_prefetch
+ }
// swap out messages.
cur = entries.getHead
@@ -640,11 +692,11 @@ class Queue(val router: LocalRouter, val
val asap = !cur.as_loaded.acquired
cur.swap(asap)
} else {
- cur.load // just in case it's getting swapped.
+ cur.load(consumer_swapped_in)
}
}
cur = next
- }
+ }
// Combine swapped items into swapped ranges
@@ -681,41 +733,93 @@ class Queue(val router: LocalRouter, val
}
debug("combined %d entries", combine_counter)
}
+
+ if(!messages.full) {
+ messages.refiller.run
+ }
}
+ var delivery_rate = 0L
+ def swapped_out_size = queue_size - (producer_swapped_in.size + consumer_swapped_in.size)
+
def schedule_periodic_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
if( service_state.is_started ) {
- now = System.currentTimeMillis
-
- // target tune_min_subscription_rate / sec
- all_subscriptions.foreach{ case (consumer, sub)=>
+ var elapsed = System.currentTimeMillis-now
+ now += elapsed
- if ( sub.tail_parkings < 0 ) {
+ delivery_rate = 0L
- // re-calc the avg_advanced_size
- sub.advanced_sizes += sub.advanced_size
- while( sub.advanced_sizes.size > 5 ) {
- sub.advanced_sizes = sub.advanced_sizes.drop(1)
- }
- sub.avg_advanced_size = sub.advanced_sizes.foldLeft(0)(_ + _) / sub.advanced_sizes.size
+ var consumer_stall_ms = 0L
+ var load_stall_ms = 0L
+ all_subscriptions.values.foreach{ sub=>
+ val (cs, ls) = sub.adjust_prefetch_size
+ consumer_stall_ms += cs
+ load_stall_ms += ls
+ if(!sub.browser) {
+ delivery_rate += sub.enqueue_size_per_interval
}
+ }
+
+ val rate_adjustment = elapsed.toFloat / 1000.toFloat
+ delivery_rate = (delivery_rate / rate_adjustment).toLong
- sub.total_advanced_size += sub.advanced_size
- sub.advanced_size = 0
- sub.tail_parkings = 0
+ val stall_ratio = ((consumer_stall_ms*100)+1).toFloat / ((load_stall_ms*100)+1).toFloat
+ // Figure out what the max enqueue rate should be.
+ max_enqueue_rate = Int.MaxValue
+ if( tune_catchup_delivery_rate>=0 && tune_catchup_enqueue_rate>=0 && delivery_rate>tune_catchup_delivery_rate && swapped_out_size > 0 && stall_ratio < 1.0 ) {
+ max_enqueue_rate = tune_catchup_enqueue_rate
+ }
+ if(tune_max_enqueue_rate >=0 ) {
+ max_enqueue_rate = max_enqueue_rate.min(tune_max_enqueue_rate)
+ }
+ if( max_enqueue_rate < Int.MaxValue ) {
+ if(enqueues_remaining==null) {
+ enqueues_remaining = new LongCounter()
+ enqueue_throttle_release(enqueues_remaining)
+ }
+ } else {
+ if(enqueues_remaining!=null) {
+ enqueues_remaining = null
+ }
}
swap_messages
schedule_periodic_maintenance
}
}
+
+ var max_enqueue_rate = Int.MaxValue
+ var enqueues_remaining:LongCounter = _
+
+ def is_enqueue_throttled = enqueues_remaining!=null && enqueues_remaining.get() <= 0
+ def enqueue_remaining_take(amount:Int) = {
+ if(enqueues_remaining!=null) {
+ enqueues_remaining.addAndGet(-amount)
+ }
+ }
+
+ def enqueue_throttle_release(throttle:LongCounter):Unit = {
+ if( enqueues_remaining==throttle ) {
+ might_unfill {
+ val amount = max_enqueue_rate / 10
+ val remaining = throttle.get
+// if(remaining < 0) {
+// throttle.addAndGet(amount)
+// } else {
+ throttle.set(amount)
+// }
+ }
+ dispatch_queue.after(100, TimeUnit.MILLISECONDS) {
+ enqueue_throttle_release(throttle)
+ }
+ }
+ }
-
- def drain_acks = {
+ def drain_acks = might_unfill {
ack_source.getData.foreach {
case (entry, consumed, uow) =>
consumed match {
@@ -737,12 +841,8 @@ class Queue(val router: LocalRouter, val
uow.release()
}
}
- messages.refiller.run
}
-
-
-
/////////////////////////////////////////////////////////////////////
//
// Implementation of the DeliveryConsumer trait. Allows this queue
@@ -765,13 +865,13 @@ class Queue(val router: LocalRouter, val
dispatch_queue {
inbound_sessions += this
- addCapacity( session_max )
+ change_producer_capacity( session_max )
}
def close = {
session_manager.close(downstream)
dispatch_queue {
- addCapacity( -session_max )
+ change_producer_capacity( -session_max )
inbound_sessions -= this
}
release
@@ -880,8 +980,6 @@ class Queue(val router: LocalRouter, val
override def connection:Option[BrokerConnection] = None
- override def send_buffer_size = tune_queue_buffer
-
/////////////////////////////////////////////////////////////////////
//
// Implementation methods.
@@ -899,13 +997,11 @@ class Queue(val router: LocalRouter, val
swap_out_completes_source.setEventHandler(^ {drain_swap_out_completes});
swap_out_completes_source.resume
- def drain_swap_out_completes() = {
+ def drain_swap_out_completes() = might_unfill {
val data = swap_out_completes_source.getData
data.foreach { loaded =>
loaded.swapped_out
}
- messages.refiller.run
-
}
val store_load_source = createSource(new ListEventAggregator[(QueueEntry#Swapped, MessageRecord)](), dispatch_queue)
@@ -961,9 +1057,8 @@ class QueueEntry(val queue:Queue, val se
}
def init(delivery:Delivery):QueueEntry = {
- state = new Loaded(delivery, false)
- queue.swapped_in_size += size
- queue.swapped_in_items += 1
+ queue.producer_swapped_in += delivery
+ state = new Loaded(delivery, false, queue.producer_swapped_in)
this
}
@@ -1059,6 +1154,7 @@ class QueueEntry(val queue:Queue, val se
def is_loaded = as_loaded!=null
def is_swapped = as_swapped!=null
def is_swapped_range = as_swapped_range!=null
+ def is_swapped_or_swapped_range = is_swapped || is_swapped_range
// These should not change the current state.
def count = state.count
@@ -1072,7 +1168,7 @@ class QueueEntry(val queue:Queue, val se
// These methods may cause a change in the current state.
def swap(asap:Boolean) = state.swap_out(asap)
- def load = state.swap_in
+ def load(space:MemorySpace) = state.swap_in(space)
def remove = state.remove
def swapped_range = state.swap_range
@@ -1146,7 +1242,7 @@ class QueueEntry(val queue:Queue, val se
/**
* Triggers the entry to get swapped in if it's not already swapped in.
*/
- def swap_in = {}
+ def swap_in(space:MemorySpace) = {}
/**
* Triggers the entry to get swapped out if it's not already swapped.
@@ -1207,7 +1303,7 @@ class QueueEntry(val queue:Queue, val se
}
override def remove = throw new AssertionError("Head entry cannot be removed")
- override def swap_in = throw new AssertionError("Head entry cannot be loaded")
+ override def swap_in(space:MemorySpace) = throw new AssertionError("Head entry cannot be loaded")
override def swap_out(asap:Boolean) = throw new AssertionError("Head entry cannot be swapped")
}
@@ -1223,7 +1319,7 @@ class QueueEntry(val queue:Queue, val se
override def as_tail:Tail = this
override def remove = throw new AssertionError("Tail entry cannot be removed")
- override def swap_in = throw new AssertionError("Tail entry cannot be loaded")
+ override def swap_in(space:MemorySpace) = throw new AssertionError("Tail entry cannot be loaded")
override def swap_out(asap:Boolean) = throw new AssertionError("Tail entry cannot be swapped")
}
@@ -1232,7 +1328,7 @@ class QueueEntry(val queue:Queue, val se
* The entry is in this state while a message is loaded in memory. A message must be in this state
* before it can be dispatched to a subscription.
*/
- class Loaded(val delivery: Delivery, var stored:Boolean) extends EntryState {
+ class Loaded(val delivery: Delivery, var stored:Boolean, var space:MemorySpace) extends EntryState {
assert( delivery!=null, "delivery cannot be null")
@@ -1329,9 +1425,8 @@ class QueueEntry(val queue:Queue, val se
delivery.uow = null
if( swapping_out ) {
swapping_out = false
+ space -= delivery
queue.swapping_out_size-=size
- queue.swapped_in_size -= size
- queue.swapped_in_items -= 1
queue.swap_out_size_counter += size
queue.swap_out_item_counter += 1
@@ -1346,14 +1441,18 @@ class QueueEntry(val queue:Queue, val se
} else {
if( remove_pending ) {
delivery.message.release
- queue.swapped_in_size -= size
- queue.swapped_in_items -= 1
+ space -= delivery
super.remove
}
}
}
- override def swap_in() = {
+ override def swap_in(space:MemorySpace) = {
+ if(space ne this.space) {
+ this.space -= delivery
+ this.space = space
+ this.space += delivery
+ }
if( swapping_out ) {
swapping_out = false
queue.swapping_out_size-=size
@@ -1365,8 +1464,7 @@ class QueueEntry(val queue:Queue, val se
remove_pending = true
} else {
delivery.message.release
- queue.swapped_in_size -= size
- queue.swapped_in_items -= 1
+ space -= delivery
super.remove
}
}
@@ -1488,7 +1586,7 @@ class QueueEntry(val queue:Queue, val se
queue.individual_swapped_items += 1
- var swapping_in = false
+ var swap_in_space:MemorySpace = _
override def redeliveries = _redeliveries
override def redelivered = _redeliveries = ((_redeliveries+1).min(Short.MaxValue)).toShort
@@ -1501,18 +1599,18 @@ class QueueEntry(val queue:Queue, val se
def label = {
var rc = "swapped"
- if( swapping_in ) {
+ if( swap_in_space!=null ) {
rc += "|swapping in"
}
rc
}
- override def toString = { "swapped:{ swapping_in: "+swapping_in+", size:"+size+"}" }
+ override def toString = { "swapped:{ swapping_in: "+swap_in_space+", size:"+size+"}" }
- override def swap_in() = {
- if( !swapping_in ) {
+ override def swap_in(space:MemorySpace) = {
+ if( swap_in_space==null ) {
// trace("Start entry load of message seq: %s", seq)
// start swapping in...
- swapping_in = true
+ swap_in_space = space
queue.swapping_in_size += size
queue.virtual_host.store.load_message(message_key, message_locator) { delivery =>
// pass off to a source so it can aggregate multiple
@@ -1534,9 +1632,8 @@ class QueueEntry(val queue:Queue, val se
}
def swapped_in(messageRecord:MessageRecord) = {
- if( swapping_in ) {
+ if( swap_in_space!=null ) {
// debug("Loaded message seq: ", seq )
- swapping_in = false
queue.swapping_in_size -= size
val delivery = new Delivery()
@@ -1546,14 +1643,14 @@ class QueueEntry(val queue:Queue, val se
delivery.storeLocator = messageRecord.locator
delivery.redeliveries = redeliveries
- queue.swapped_in_size += delivery.size
- queue.swapped_in_items += 1
+ swap_in_space += delivery
queue.swap_in_size_counter += size
queue.swap_in_item_counter += 1
queue.individual_swapped_items -= 1
- state = new Loaded(delivery, true)
+ state = new Loaded(delivery, true, swap_in_space)
+ swap_in_space = null
} else {
// debug("Ignoring store load of: ", messageKey)
}
@@ -1561,8 +1658,8 @@ class QueueEntry(val queue:Queue, val se
override def remove = {
- if( swapping_in ) {
- swapping_in = false
+ if( swap_in_space!=null ) {
+ swap_in_space = null
queue.swapping_in_size -= size
}
queue.individual_swapped_items -= 1
@@ -1570,8 +1667,8 @@ class QueueEntry(val queue:Queue, val se
}
override def swap_range = {
- if( swapping_in ) {
- swapping_in = false
+ if( swap_in_space!=null ) {
+ swap_in_space = null
queue.swapping_in_size -= size
}
queue.individual_swapped_items -= 1
@@ -1618,7 +1715,7 @@ class QueueEntry(val queue:Queue, val se
}
override def toString = { "swapped_range:{ swapping_in: "+swapping_in+", count: "+count+", size: "+size+"}" }
- override def swap_in() = {
+ override def swap_in(space:MemorySpace) = {
if( !swapping_in ) {
swapping_in = true
queue.virtual_host.store.list_queue_entries(queue.store_id, seq, last) { records =>
@@ -1718,12 +1815,14 @@ class Subscription(val queue:Queue, val
var acquired_size = 0L
def acquired_count = acquired.size()
- var total_advanced_size = 0L
- var advanced_size = 0
- var advanced_sizes = ListBuffer[Int]() // use circular buffer instead.
+ var enqueue_size_per_interval = 0L
+ var enqueue_size_at_last_interval = 0L
- var avg_advanced_size = queue.tune_consumer_buffer
- var tail_parkings = 1
+ var consumer_stall_ms = 0L
+ var load_stall_ms = 0L
+
+ var consumer_stall_start = 0L
+ var load_stall_start = 0L
var total_ack_count = 0L
var total_nack_count = 0L
@@ -1749,6 +1848,7 @@ class Subscription(val queue:Queue, val
session = consumer.connect(this)
session.refiller = dispatch_queue.runnable {
+ check_consumer_stall
if( pos!=null ) {
pos.run
}
@@ -1757,7 +1857,7 @@ class Subscription(val queue:Queue, val
queue.all_subscriptions += consumer -> this
queue.consumer_counter += 1
- queue.addCapacity( queue.tune_consumer_buffer )
+ queue.change_consumer_capacity( queue.tune_consumer_buffer )
if( exclusive ) {
queue.exclusive_subscriptions.append(this)
@@ -1778,7 +1878,7 @@ class Subscription(val queue:Queue, val
queue.exclusive_subscriptions = queue.exclusive_subscriptions.filterNot( _ == this )
queue.all_subscriptions -= consumer
- queue.addCapacity( - queue.tune_consumer_buffer )
+ queue.change_consumer_capacity( - queue.tune_consumer_buffer )
// nack all the acquired entries.
@@ -1801,6 +1901,7 @@ class Subscription(val queue:Queue, val
consumer.release
queue.check_idle
+ queue.tail_prefetch = 0
queue.trigger_swap
} else {}
}
@@ -1810,18 +1911,16 @@ class Subscription(val queue:Queue, val
* queue entry.
*/
def advance(value:QueueEntry):Unit = {
-
assert(value!=null)
-
- advanced_size += pos.size
-
pos = value
-
+ check_load_stall
if( tail_parked ) {
- tail_parkings += 0
- if( session.consumer.close_on_drain ) {
- close
- }
+ if(session.consumer.close_on_drain) {
+ close
+ } else {
+ var remaining = queue.tune_consumer_buffer - acquired_size;
+ queue.tail_prefetch = queue.tail_prefetch.max(remaining)
+ }
}
}
@@ -1834,6 +1933,7 @@ class Subscription(val queue:Queue, val
pos -= this
value ::= this
pos = value
+ check_load_stall
queue.dispatch_queue << value // queue up the entry to get dispatched..
}
@@ -1842,10 +1942,63 @@ class Subscription(val queue:Queue, val
def matches(entry:Delivery) = session.consumer.matches(entry)
def full = session.full
- def offer(delivery:Delivery) = session.offer(delivery)
+ def offer(delivery:Delivery) = try {
+ session.offer(delivery)
+ } finally {
+ check_consumer_stall
+ }
def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
+ def check_load_stall = {
+ if ( pos.is_swapped_or_swapped_range ) {
+ if(load_stall_start==0) {
+ load_stall_start = queue.virtual_host.broker.now
+ }
+ } else {
+ if(load_stall_start!=0) {
+ load_stall_ms += queue.virtual_host.broker.now - load_stall_start
+ load_stall_start = 0
+ }
+ }
+ }
+
+ def check_consumer_stall = {
+ if ( full ) {
+ if(consumer_stall_start==0) {
+ consumer_stall_start = queue.virtual_host.broker.now
+ }
+ } else {
+ if( consumer_stall_start!=0 ) {
+ consumer_stall_ms += queue.virtual_host.broker.now - consumer_stall_start
+ consumer_stall_start = 0
+ }
+ }
+ }
+
+ def adjust_prefetch_size = {
+
+ enqueue_size_per_interval = session.enqueue_size_counter - enqueue_size_at_last_interval
+ enqueue_size_at_last_interval = session.enqueue_size_counter
+
+ if(consumer_stall_start !=0) {
+ val now = queue.virtual_host.broker.now
+ consumer_stall_ms += now - consumer_stall_start
+ consumer_stall_start = now
+ }
+
+ if(load_stall_start !=0) {
+ val now = queue.virtual_host.broker.now
+ load_stall_ms += now - load_stall_start
+ load_stall_start = now
+ }
+
+ val rc = (consumer_stall_ms, load_stall_ms)
+ consumer_stall_ms = 0
+ load_stall_ms = 0
+ rc
+ }
+
def refill_prefetch = {
var cursor = if( pos.is_tail ) {
@@ -1856,22 +2009,23 @@ class Subscription(val queue:Queue, val
pos // start prefetching from the current position.
}
- var remaining = queue.tune_consumer_buffer - acquired_size
+ var remaining = queue.tune_consumer_buffer - acquired_size; // 3/4 of the prefetch is triggers loading
while( remaining>0 && cursor!=null ) {
val next = cursor.getNext
- remaining -= cursor.size
- cursor.prefetch_flags = (cursor.prefetch_flags | PREFTCH_LOAD_FLAG).toByte
- cursor.load
+ if( (cursor.prefetch_flags & PREFTCH_LOAD_FLAG) == 0 ) {
+ remaining -= cursor.size
+ cursor.prefetch_flags = (cursor.prefetch_flags | PREFTCH_LOAD_FLAG).toByte
+ cursor.load(queue.consumer_swapped_in)
+ }
cursor = next
}
-
- remaining = avg_advanced_size
- while( remaining>0 && cursor!=null ) {
- remaining -= cursor.size
- cursor.prefetch_flags = (cursor.prefetch_flags | PREFTCH_HOLD_FLAG).toByte
- cursor = cursor.getNext
+
+ // If we hit the tail.. credit it so that we avoid swapping too soon.
+ if( cursor == null ) {
+ queue.tail_prefetch = queue.tail_prefetch.max(((enqueue_size_per_interval/2) - remaining).max(remaining))
}
+
}
class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala?rev=1211679&r1=1211678&r2=1211679&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala Wed Dec 7 22:27:41 2011
@@ -51,29 +51,6 @@ class DestinationConfigurationTest exten
val router = broker.default_virtual_host.router.asInstanceOf[LocalRouter]
- def check_tune_queue_buffer(expected:Int)(dto:DestinationDTO) = {
- var actual=0
- reset {
- var q = router.get_or_create_destination(dto, null).success.asInstanceOf[Queue]
- actual = q.tune_queue_buffer
- }
- expect(expected) {actual}
- }
-
- check_tune_queue_buffer(333) {
- var p = new QueueDestinationDTO()
- p.path.add("unified")
- p.path.add("a")
- p
- }
-
- check_tune_queue_buffer(111) {
- var p = new QueueDestinationDTO()
- p.path.add("notunified")
- p.path.add("other")
- p
- }
-
ServiceControl.stop(broker, "broker")
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1211679&r1=1211678&r2=1211679&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Wed Dec 7 22:27:41 2011
@@ -50,16 +50,10 @@ public class QueueDTO extends StringIdDT
public Boolean unified;
/**
- * The amount of memory buffer space for the queue..
- */
- @XmlAttribute(name="queue_buffer")
- public Integer queue_buffer;
-
- /**
* The amount of memory buffer space to use per consumer.
*/
@XmlAttribute(name="consumer_buffer")
- public Integer consumer_buffer;
+ public String consumer_buffer;
/**
* Should this queue persistently store it's entries?
@@ -92,6 +86,27 @@ public class QueueDTO extends StringIdDT
public String quota;
/**
+ * The message delivery rate (in bytes/sec) at which
+ * the queue enables a producer rate throttle
+ * to allow consumers to catchup with producers.
+ */
+ @XmlAttribute(name="catchup_delivery_rate")
+ public String catchup_delivery_rate;
+
+ /**
+ * The rate at which to throttle enqueues when
+ * consumers are catching up.
+ */
+ @XmlAttribute(name="catchup_enqueue_rate")
+ public String catchup_enqueue_rate;
+
+ /**
+ * The maximum enqueue rate of the queue
+ */
+ @XmlAttribute(name="max_enqueue_rate")
+ public String max_enqueue_rate;
+
+ /**
* To hold any other non-matching XML elements
*/
@XmlAnyElement(lax=true)
@@ -107,12 +122,17 @@ public class QueueDTO extends StringIdDT
if (auto_delete_after != null ? !auto_delete_after.equals(queueDTO.auto_delete_after) : queueDTO.auto_delete_after != null)
return false;
+ if (catchup_delivery_rate != null ? !catchup_delivery_rate.equals(queueDTO.catchup_delivery_rate) : queueDTO.catchup_delivery_rate != null)
+ return false;
+ if (catchup_enqueue_rate != null ? !catchup_enqueue_rate.equals(queueDTO.catchup_enqueue_rate) : queueDTO.catchup_enqueue_rate != null)
+ return false;
if (consumer_buffer != null ? !consumer_buffer.equals(queueDTO.consumer_buffer) : queueDTO.consumer_buffer != null)
return false;
+ if (max_enqueue_rate != null ? !max_enqueue_rate.equals(queueDTO.max_enqueue_rate) : queueDTO.max_enqueue_rate != null)
+ return false;
if (other != null ? !other.equals(queueDTO.other) : queueDTO.other != null) return false;
if (persistent != null ? !persistent.equals(queueDTO.persistent) : queueDTO.persistent != null) return false;
- if (queue_buffer != null ? !queue_buffer.equals(queueDTO.queue_buffer) : queueDTO.queue_buffer != null)
- return false;
+ if (quota != null ? !quota.equals(queueDTO.quota) : queueDTO.quota != null) return false;
if (swap != null ? !swap.equals(queueDTO.swap) : queueDTO.swap != null) return false;
if (swap_range_size != null ? !swap_range_size.equals(queueDTO.swap_range_size) : queueDTO.swap_range_size != null)
return false;
@@ -126,11 +146,14 @@ public class QueueDTO extends StringIdDT
int result = super.hashCode();
result = 31 * result + (auto_delete_after != null ? auto_delete_after.hashCode() : 0);
result = 31 * result + (unified != null ? unified.hashCode() : 0);
- result = 31 * result + (queue_buffer != null ? queue_buffer.hashCode() : 0);
result = 31 * result + (consumer_buffer != null ? consumer_buffer.hashCode() : 0);
result = 31 * result + (persistent != null ? persistent.hashCode() : 0);
result = 31 * result + (swap != null ? swap.hashCode() : 0);
result = 31 * result + (swap_range_size != null ? swap_range_size.hashCode() : 0);
+ result = 31 * result + (quota != null ? quota.hashCode() : 0);
+ result = 31 * result + (catchup_delivery_rate != null ? catchup_delivery_rate.hashCode() : 0);
+ result = 31 * result + (catchup_enqueue_rate != null ? catchup_enqueue_rate.hashCode() : 0);
+ result = 31 * result + (max_enqueue_rate != null ? max_enqueue_rate.hashCode() : 0);
result = 31 * result + (other != null ? other.hashCode() : 0);
return result;
}
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1211679&r1=1211678&r2=1211679&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Wed Dec 7 22:27:41 2011
@@ -245,9 +245,6 @@ A `queue` element may be configured with
[Unified Destinations](#Unified_Destinations) documentation for more
details. Defaults to false.
-* `queue_buffer` : The amount of memory buffer space allocated for each queue.
- Defaults to 32k.
-
* `consumer_buffer` : The amount of memory buffer space allocated to each
subscription for receiving messages. Defaults to 256k.
@@ -271,6 +268,20 @@ memory. Defaults to true.
* `auto_delete_after`: If not set to `0` then the queue will automatically
delete once there have been no consumers, producers or messages on it
for the configured number of seconds. Defaults to 300 if not set.
+
+* `max_enqueue_rate`: The maximum enqueue rate of the queue (in bytes/sec).
+ Producers will be flow controlled once this enqueue rate is reached. If not
+ set then it is disabled.
+
+* `catchup_delivery_rate`: The message delivery rate (in bytes/sec) at which
+ the queue enables a producer rate throttle to allow consumers to catch up
+ with producers. The consumers must also be stalling on message
+ loading longer than they stall on delivery. If not set then the
+ feature is disabled.
+
+* `catchup_enqueue_rate`: The rate at which to throttle enqueues when
+ consumers are catching up. Defaults to the rate configured in
+ `catchup_delivery_rate`.
##### Topics