You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/08 18:41:20 UTC
svn commit: r1212001 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
apollo-website/src/documentation/user-manual.md
Author: chirino
Date: Thu Dec 8 17:41:19 2011
New Revision: 1212001
URL: http://svn.apache.org/viewvc?rev=1212001&view=rev
Log:
More tweaks to get APLO-96 more perfect. We now doing swaps when fast consumers are attached.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1212001&r1=1212000&r2=1212001&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Dec 8 17:41:19 2011
@@ -37,7 +37,6 @@ object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
val PREFTCH_LOAD_FLAG = 1.toByte
- val PREFTCH_HOLD_FLAG = 2.toByte
class MemorySpace {
var items = 0
@@ -151,7 +150,7 @@ class Queue(val router: LocalRouter, val
* the queue enables a enqueue rate throttle
* to allow consumers to catchup with producers.
*/
- var tune_catchup_delivery_rate = 0
+ var tune_fast_delivery_rate = 0
/**
* The rate at which to throttle producers when
@@ -170,8 +169,8 @@ class Queue(val router: LocalRouter, val
tune_swap = tune_persistent && config.swap.getOrElse(true)
tune_swap_range_size = config.swap_range_size.getOrElse(10000)
tune_consumer_buffer = Option(config.consumer_buffer).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(256*1024)
- tune_catchup_delivery_rate = Option(config.catchup_delivery_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
- tune_catchup_enqueue_rate = Option(config.catchup_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(tune_catchup_delivery_rate)
+ tune_fast_delivery_rate = Option(config.fast_delivery_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(1024*1024)
+ tune_catchup_enqueue_rate = Option(config.catchup_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
tune_max_enqueue_rate = Option(config.max_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
tune_quota = Option(config.quota).map(MemoryPropertyEditor.parse(_)).getOrElse(-1)
@@ -229,7 +228,7 @@ class Queue(val router: LocalRouter, val
var producer_counter = 0L
var consumer_counter = 0L
- var tail_prefetch = 0L
+ var consumers_keeping_up = true
var individual_swapped_items = 0
@@ -550,23 +549,7 @@ class Queue(val router: LocalRouter, val
entry.dispatch
}
- if( entry.as_loaded.acquired) {
- // Enqueued message aquired.
- } else if( tail_prefetch > 0 ) {
- // Enqueued message prefeteched.
- tail_prefetch -= entry.size
- entry.prefetch_flags = PREFTCH_LOAD_FLAG
- entry.load(consumer_swapped_in)
- } else {
-// val prev = entry.getPrevious
-// if( (prev.as_loaded!=null && prev.as_loaded.swapping_out) || (prev.as_swapped!=null && !prev.as_swapped.swapping_in) ) {
-// // Swap it out ASAP
-// entry.swap(true)
-// println("Enqueued message swapped.")
-// } else {
-// trigger_swap
-// // Avoid swapping right away..
-// }
+ if( !(consumers_keeping_up || entry.as_loaded.acquired) ) {
entry.swap(true)
}
@@ -576,6 +559,10 @@ class Queue(val router: LocalRouter, val
queueDelivery.uow = null
}
+
+ if( full ) {
+ trigger_swap
+ }
true
}
}
@@ -642,6 +629,8 @@ class Queue(val router: LocalRouter, val
}
}
+ var keep_up_delivery_rate = 0L
+
def swap_messages:Unit = {
dispatch_queue.assertExecuting()
@@ -679,9 +668,12 @@ class Queue(val router: LocalRouter, val
}
// Set the prefetch flags
+ val was_keepingup = consumers_keeping_up
+ consumers_keeping_up = false
all_subscriptions.valuesIterator.foreach{ x=>
x.refill_prefetch
}
+ consumers_keeping_up = consumers_keeping_up && delivery_rate > tune_fast_delivery_rate
// swap out messages.
cur = entries.getHead
@@ -690,8 +682,11 @@ class Queue(val router: LocalRouter, val
val loaded = cur.as_loaded
if( loaded!=null ) {
if( cur.prefetch_flags==0 && !loaded.acquired ) {
- val asap = !cur.as_loaded.acquired
- cur.swap(asap)
+ if( consumers_keeping_up && (loaded.space eq producer_swapped_in)) {
+ // don't move out. keeps producer mem maxed to slow down the producer
+ } else {
+ cur.swap(true)
+ }
} else {
cur.load(consumer_swapped_in)
}
@@ -770,7 +765,7 @@ class Queue(val router: LocalRouter, val
// Figure out what the max enqueue rate should be.
max_enqueue_rate = Int.MaxValue
- if( tune_catchup_delivery_rate>=0 && tune_catchup_enqueue_rate>=0 && delivery_rate>tune_catchup_delivery_rate && swapped_out_size > 0 && stall_ratio < 1.0 ) {
+ if( tune_fast_delivery_rate>=0 && tune_catchup_enqueue_rate>=0 && delivery_rate>tune_fast_delivery_rate && swapped_out_size > 0 && stall_ratio < 1.0 ) {
max_enqueue_rate = tune_catchup_enqueue_rate
}
if(tune_max_enqueue_rate >=0 ) {
@@ -1904,7 +1899,6 @@ class Subscription(val queue:Queue, val
consumer.release
queue.check_idle
- queue.tail_prefetch = 0
queue.trigger_swap
} else {}
}
@@ -1920,9 +1914,6 @@ class Subscription(val queue:Queue, val
if( tail_parked ) {
if(session.consumer.close_on_drain) {
close
- } else {
- var remaining = queue.tune_consumer_buffer - acquired_size;
- queue.tail_prefetch = queue.tail_prefetch.max(remaining)
}
}
}
@@ -2023,12 +2014,11 @@ class Subscription(val queue:Queue, val
cursor = next
}
- // If we hit the tail.. credit it so that we avoid swapping too soon.
- if( cursor == null ) {
- queue.tail_prefetch = queue.tail_prefetch.max(((enqueue_size_per_interval/2) - remaining).max(remaining))
+ // If we hit the tail or the producer swap in area.. let the queue know we are keeping up.
+ if( !queue.consumers_keeping_up && (cursor == null || (cursor.as_loaded!=null && (cursor.as_loaded.space eq queue.producer_swapped_in))) ) {
+ queue.consumers_keeping_up = true
}
-
}
class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1212001&r1=1212000&r2=1212001&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Thu Dec 8 17:41:19 2011
@@ -87,15 +87,22 @@ public class QueueDTO extends StringIdDT
/**
* The message delivery rate (in bytes/sec) at which
- * the queue enables a producer rate throttle
- * to allow consumers to catchup with producers.
+ * the queue considers the consumers fast and
+ * may start slowing down producers to match the consumption
+ * rate if the consumers are at the tail of the queue.
*/
- @XmlAttribute(name="catchup_delivery_rate")
- public String catchup_delivery_rate;
+ @XmlAttribute(name="fast_delivery_rate")
+ public String fast_delivery_rate;
/**
- * The rate at which to throttle enqueues when
- * consumers are catching up.
+ * If set, and the the current delivery
+ * rate is exceeding the configured value
+ * of fast_delivery_rate and the consumers
+ * are spending more time loading from
+ * the store than delivering, then the
+ * enqueue rate will be throttled to the
+ * specified value so that the consumers
+ * can catch up and reach the tail of the queue.
*/
@XmlAttribute(name="catchup_enqueue_rate")
public String catchup_enqueue_rate;
@@ -122,7 +129,7 @@ public class QueueDTO extends StringIdDT
if (auto_delete_after != null ? !auto_delete_after.equals(queueDTO.auto_delete_after) : queueDTO.auto_delete_after != null)
return false;
- if (catchup_delivery_rate != null ? !catchup_delivery_rate.equals(queueDTO.catchup_delivery_rate) : queueDTO.catchup_delivery_rate != null)
+ if (fast_delivery_rate != null ? !fast_delivery_rate.equals(queueDTO.fast_delivery_rate) : queueDTO.fast_delivery_rate != null)
return false;
if (catchup_enqueue_rate != null ? !catchup_enqueue_rate.equals(queueDTO.catchup_enqueue_rate) : queueDTO.catchup_enqueue_rate != null)
return false;
@@ -151,7 +158,7 @@ public class QueueDTO extends StringIdDT
result = 31 * result + (swap != null ? swap.hashCode() : 0);
result = 31 * result + (swap_range_size != null ? swap_range_size.hashCode() : 0);
result = 31 * result + (quota != null ? quota.hashCode() : 0);
- result = 31 * result + (catchup_delivery_rate != null ? catchup_delivery_rate.hashCode() : 0);
+ result = 31 * result + (fast_delivery_rate != null ? fast_delivery_rate.hashCode() : 0);
result = 31 * result + (catchup_enqueue_rate != null ? catchup_enqueue_rate.hashCode() : 0);
result = 31 * result + (max_enqueue_rate != null ? max_enqueue_rate.hashCode() : 0);
result = 31 * result + (other != null ? other.hashCode() : 0);
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1212001&r1=1212000&r2=1212001&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Thu Dec 8 17:41:19 2011
@@ -269,19 +269,22 @@ memory. Defaults to true.
delete once there have been no consumers, producers or messages on it
for the configured number of seconds. Defaults to 300 if not set.
+* `fast_delivery_rate`: The message delivery rate (in bytes/sec) at which
+ the queue considers the consumers fast enough to start slowing down enqueue
+ rate to match the consumption rate if the consumers are at the
+ tail of the queue.
+
+* `catchup_enqueue_rate`: If set, and the the current delivery
+ rate is exceeding the configured value of `fast_delivery_rate` and
+ the consumers are spending more time loading from the store than
+ delivering, then the enqueue rate will be throttled to the
+ specified value so that the consumers can catch up and reach the
+ tail of the queue.
+
* `max_enqueue_rate`: The maximum enqueue rate of the queue. Producers
will be flow controlled once this enqueue rate is reached. If not set
then it is disabled
-* `catchup_delivery_rate`: The message delivery rate (in bytes/sec) at which
- the queue enables a producer rate throttle to allow consumers to catchup
- with producers. The the consumers must also be stalling on message
- loading longer than they stall to deliver. If not set then the
- feature is disabled.
-
-* `catchup_enqueue_rate`: The rate at which to throttle enqueues when
- consumers are catching up. Defaults to the rate configured in
- `catchup_delivery_rate`
##### Topics