Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:10:36 UTC
svn commit: r961144 -
/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Author: chirino
Date: Wed Jul 7 04:10:35 2010
New Revision: 961144
URL: http://svn.apache.org/viewvc?rev=961144&view=rev
Log:
working towards better consumer decoupling on a single queue, and better message prefetching from storage
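
This revision replaces the fixed swap-delay heuristic with a rate-based slow-consumer check driven by three new tuning knobs: tune_slow_subscription_rate, tune_slow_check_interval, and tune_max_slow_intervals. The standalone Scala sketch below only illustrates that arithmetic with the default values from the diff; the object name and the simplified reset logic are hypothetical (the committed check also accounts for tail parking) and are not part of the revision.

    object SlowConsumerSketch {
      // default values from the diff
      val tune_slow_subscription_rate = 1000 * 1024 // bytes per second
      val tune_slow_check_interval    = 100L        // milliseconds between checks
      val tune_max_slow_intervals     = 10

      // bytes a subscription must cursor per check interval to be "keeping up"
      val slowCursorDelta: Int =
        ((tune_slow_subscription_rate * tune_slow_check_interval) / 1000).toInt

      def main(args: Array[String]): Unit = {
        // pretend these byte counts were cursored over successive check intervals
        val observedDeltas = Seq.fill(12)(50 * 1024)
        var slowIntervals = 0
        for (delta <- observedDeltas) {
          if (delta >= slowCursorDelta) slowIntervals = 0 else slowIntervals += 1
        }
        // same flag condition as Subscription.slow in the diff
        val slow = slowIntervals > tune_max_slow_intervals
        println("threshold=" + slowCursorDelta + " bytes/interval, slow=" + slow)
      }
    }
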
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961144&r1=961143&r2=961144&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:10:35 2010
@@ -104,21 +104,47 @@ class Queue(val host: VirtualHost, val d
var flushingSize = 0
var storeId: Long = -1L
+ //
+ // Tuning options.
+ //
+
+ /**
+ * The amount of memory buffer space for receiving messages.
+ */
+ var tune_inbound_buffer = 1024 * 32
+
+ /**
+ * The amount of memory buffer space to use per subscription.
+ */
+ var tune_subscription_buffer = 1024*32
+
/**
- * Tunning options.
+ * Subscribers that consume slower than this rate per second will be considered
+ * slow.
*/
- var tune_max_size = 1024 * 256
- var tune_subscription_prefetch = 1024*32
- var tune_max_outbound_size = 1024 * 1204 * 5
- var tune_swap_delay = 100L
+ var tune_slow_subscription_rate = 1000*1024
+
+ /**
+ * The number of milliseconds between slow consumer checks.
+ */
+ var tune_slow_check_interval = 100L
+
+ /**
+ * The number of intervals that a consumer must fail to meet the subscription rate before it is
+ * flagged as a slow consumer.
+ */
+ var tune_max_slow_intervals = 10
var enqueue_counter = 0L
var dequeue_counter = 0L
var enqueue_size = 0L
var dequeue_size = 0L
+ private var capacity = tune_inbound_buffer
private var size = 0
+ schedualSlowConsumerCheck
+
def restore(storeId: Long, records:Seq[QueueEntryRecord]) = ^{
this.storeId = storeId
if( !records.isEmpty ) {
@@ -135,11 +161,15 @@ class Queue(val host: VirtualHost, val d
}
} >>: dispatchQueue
+ def addCapacity(amount:Int) = {
+ capacity += amount
+ }
+
object messages extends Sink[Delivery] {
var refiller: Runnable = null
- def full = if(size >= tune_max_size)
+ def full = if(size >= capacity)
true
else
false
@@ -166,54 +196,42 @@ class Queue(val host: VirtualHost, val d
queueDelivery.storeBatch.enqueue(entry.createQueueEntryRecord)
}
- var swap_check = false
- if( !entry.hasSubs ) {
+ // do we have at least 1 subscription that is keeping up with the producers
+ // and is interested in this message?
+// val hold = consumerSubs.valuesIterator.find( sub=> !sub.slow && sub.matches(delivery) ).isDefined
+ def haveQuickConsumer = consumerSubs.valuesIterator.find( sub=> !sub.slow ).isDefined
+
+ var dispatched = false
+ if( entry.prefetched > 0 || haveQuickConsumer ) {
+ // try to dispatch it directly...
+// println("hold: "+delivery.message.getProperty("color"))
+ entry.dispatch
+
+ } else {
+// println("flush: "+delivery.message.getProperty("color"))
// we flush the entry out right away if it looks
// like it won't be needed.
-
- if( entry.getPrevious.isFlushedOrFlushing ) {
- // in this case take it out of memory too...
- flushingSize += entry.flush
- } else {
- if( slow_consumers ) {
- if( delivery.storeBatch!=null ) {
- // just make it hit the disk quick.. but keep it in memory.
- delivery.storeBatch.eagerFlush(^{})
- }
- } else {
- if( !checking_for_slow_consumers ) {
- checking_for_slow_consumers=true
- val tail_consumer_counter_copy = tail_consumer_counter
- dispatchQueue.dispatchAfter(tune_swap_delay, TimeUnit.MILLISECONDS, ^{
- if( tail_consumer_counter_copy == tail_consumer_counter ) {
- slow_consumers = true
- }
- checking_for_slow_consumers = false
- })
- }
- }
- swap_check=true
+ entry.flush
+ if( full ) {
+ println("full... waiting for flushes");
}
- } else {
- slow_consumers = false
- tail_consumer_counter += 1
- // entry.dispatch==null if the entry was fully dispatched
- swap_check = entry.dispatch!=null
- }
- // Does it look like we need to start swapping to make room
- // for more messages?
- if( swap_check && host.store!=null && full ) {
- val wasAt = dequeue_size
- dispatchQueue.dispatchAfter(tune_swap_delay, TimeUnit.MILLISECONDS, ^{
- // start swapping if was still blocked after a short delay
- if( dequeue_size == wasAt && full ) {
- println("swapping...")
- swap
- }
- })
+ // just make it hit the disk quick.. but keep it in memory.
+ // delivery.storeBatch.eagerFlush(^{})
}
+// // Does it look like we need to start swapping to make room
+// // for more messages?
+// if( !dispatched && host.store!=null && full ) {
+// val wasAt = dequeue_size
+// dispatchQueue.dispatchAfter(tune_slow_check_interval, TimeUnit.MILLISECONDS, ^{
+// // start swapping if was still blocked after a short delay
+// if( dequeue_size == wasAt && full ) {
+// swap
+// }
+// })
+// }
+
// release the store batch...
if (queueDelivery.storeBatch != null) {
queueDelivery.storeBatch.release
@@ -225,9 +243,62 @@ class Queue(val host: VirtualHost, val d
}
}
- var tail_consumer_counter = 0L
- var checking_for_slow_consumers = false
- var slow_consumers = false
+ def schedualSlowConsumerCheck:Unit = {
+
+ def slowConsumerCheck = {
+ if( retained > 0 ) {
+
+ // target tune_min_subscription_rate / sec
+ val slowCursorDelta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
+
+ var idleConsumerCount = 0
+
+ consumerSubs.foreach{ case (consumer, sub)=>
+
+ // Skip over new consumers...
+ if( sub.cursoredCounter != 0 ) {
+
+ val cursorDelta = sub.cursoredCounter - sub.prevCursoredCounter
+ sub.prevCursoredCounter = sub.cursoredCounter
+
+ // The subscription is NOT slow if it's been tail parked or
+ // it's been parking and cursoring through the data at the tune_slow_subscription_rate
+ if( (sub.tailParked && sub.tailParkings==0) || ( sub.tailParkings > 0 && cursorDelta >= slowCursorDelta ) ) {
+ if( sub.slow ) {
+ debug("consumer is no longer slow: %s", consumer)
+ sub.slowIntervals = 0
+ }
+ } else {
+ if( !sub.slow ) {
+ debug("slow interval: %d, %d, %d", sub.slowIntervals, sub.tailParkings, cursorDelta)
+ sub.slowIntervals += 1
+ if( sub.slow ) {
+ debug("consumer is slow: %s", consumer)
+ }
+ }
+ }
+
+ // has the consumer been stuck at the tail?
+ if( sub.tailParked && sub.tailParkings==0 ) {
+ idleConsumerCount += 1;
+ }
+
+ sub.tailParkings = 0
+ }
+ }
+
+ // Trigger a swap if we have slow consumers and we are full..
+ if( idleConsumerCount > 0 && messages.full && flushingSize==0 ) {
+ swap
+ }
+ schedualSlowConsumerCheck
+ }
+ }
+
+ dispatchQueue.dispatchAfter(tune_slow_check_interval, TimeUnit.MILLISECONDS, ^{
+ slowConsumerCheck
+ })
+ }
def ack(entry: QueueEntry, sb:StoreBatch) = {
if (entry.ref != -1) {
@@ -250,8 +321,6 @@ class Queue(val host: VirtualHost, val d
dequeue_size += entry.size
size -= entry.size
entry.tombstone
-
- messages.refiller.run
}
@@ -266,6 +335,9 @@ class Queue(val host: VirtualHost, val d
entry.unlink
ack(entry.value, tx)
}
+
+// println("acked... full: "+messages.full)
+ messages.refiller.run
}
/////////////////////////////////////////////////////////////////////
@@ -327,6 +399,7 @@ class Queue(val host: VirtualHost, val d
val subscription = new Subscription(this)
subscription.connect(consumer)
consumerSubs += consumer -> subscription
+ addCapacity( tune_subscription_buffer )
}
} >>: dispatchQueue
@@ -336,6 +409,7 @@ class Queue(val host: VirtualHost, val d
case Some(cs) =>
cs.close
consumerSubs -= consumer
+ addCapacity( -tune_subscription_buffer )
case None =>
}
}
@@ -362,78 +436,25 @@ class Queue(val host: VirtualHost, val d
* messages from the producer.
*/
def swap():Unit = {
-
if( !host.serviceState.isStarted ) {
return
}
-
- class Prio(val entry:QueueEntry) extends Comparable[Prio] {
- var value = 0
- def compareTo(o: Prio) = o.value - value
- }
-
- val prios = new ArrayList[Prio](entries.size())
-
+
+ debug("swapping...")
var entry = entries.getHead
while( entry!=null ) {
+ println(entries)
if( entry.asTombstone == null ) {
- prios.add(new Prio(entry))
- }
- entry = entry.getNext
- }
-
-
- /**
- * adds keep priority to the range of entries starting at x
- * and spanning the size provided.
- */
- def prioritize(i:Int, size:Int, p:Int):Unit = {
- val prio = prios.get(i)
- prio.value += p
- val remainingSize = size - prio.entry.size
- if( remainingSize > 0 ) {
- val next = i + 1
- if( next < prios.size ) {
- prioritize(next, remainingSize, p-1)
- }
- }
- }
- // Prioritize the entries so that higher priority entries are swapped in,
- // and lower priority entries are swapped out.
- var i = 0
- while( i < prios.size ) {
- val prio = prios.get(i)
- if( prio.entry.hasSubs ) {
-
- var credits =0;
- if( prio.entry.competing != Nil) {
- credits += prio.entry.competing.size * tune_subscription_prefetch
- } else{
- if( prio.entry.browsing != Nil ) {
- credits += tune_subscription_prefetch
- }
+ // only keep prefetch entries around..
+ if( entry.prefetched == 0 ) {
+ entry.flush
+ } else {
+ entry.load
}
- prioritize(i, credits, 1000)
-
}
- i += 1
- }
- Collections.sort(prios)
-
- var remaining = tune_max_size / 2
- i = 0
- while( i < prios.size ) {
- val prio = prios.get(i)
- val entry = prio.entry
- if( remaining > 0 ) {
- loadingSize += entry.load
- remaining -= entry.size
- } else {
- flushingSize += entry.flush
- }
- i += 1
+ entry = entry.getNext
}
}
@@ -473,7 +494,7 @@ class Queue(val host: VirtualHost, val d
entry.flushed
}
}
-
+ println("flushes done... full: "+messages.full);
messages.refiller.run
}
@@ -488,11 +509,17 @@ object QueueEntry extends Sizer[QueueEnt
class QueueEntry(val queue:Queue) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
import QueueEntry._
- var seq: Long = -1L
+ var seq: Long = 0L
var competing:List[Subscription] = Nil
var browsing:List[Subscription] = Nil
+ var prefetched = 0
+
var value:EntryType = null
+ override def toString = {
+ "{seq: "+seq+", prefetched: "+prefetched+", value: "+value+", competing: "+competing+", browsing: "+browsing+"}"
+ }
+
def createQueueEntryRecord = {
val qer = new QueueEntryRecord
qer.queueKey = queue.storeId
@@ -509,6 +536,8 @@ class QueueEntry(val queue:Queue) extend
def created(seq:Long, delivery:Delivery) = {
this.seq = seq
this.value = new Loaded(delivery)
+
+ (browsing:::competing).foreach { sub => sub.addPrefetch(this) }
this
}
@@ -530,8 +559,19 @@ class QueueEntry(val queue:Queue) extend
}
def tombstone = {
+
+ // remove from prefetch counters..
+ var cur = this;
+ while( prefetched > 0 ) {
+ if( cur.hasSubs ) {
+ (cur.browsing ::: cur.competing).foreach { sub => if( sub.prefetched(cur) ) { sub.removePrefetch(cur) } }
+ }
+ cur = cur.getPrevious
+ }
+
+
this.value = new Tombstone()
- if( seq != -1L ) {
+ if( seq != 0L ) {
def merge(lv:QueueEntry, rv:QueueEntry):Boolean = {
if( lv==null || rv==null) {
@@ -639,8 +679,8 @@ class QueueEntry(val queue:Queue) extend
def asFlushed:Flushed = null
def asLoaded:Loaded = null
- def flush:Int = 0
- def load:Int = 0
+ def flush = {}
+ def load = {}
def isFlushedOrFlushing = false
}
@@ -661,7 +701,9 @@ class QueueEntry(val queue:Queue) extend
competing = Nil
p
}
-
+
+ override def toString = { "ts:{ count: "+count+"}" }
+
}
class Flushed(val ref:Long, val size:Int) extends EntryType {
@@ -672,11 +714,13 @@ class QueueEntry(val queue:Queue) extend
override def isFlushedOrFlushing = true
+ override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
+
// Flushed entries can't be dispatched until
// they get loaded.
def dispatch():QueueEntry = {
if( !loading ) {
- var remaining = queue.tune_subscription_prefetch - size
+ var remaining = queue.tune_subscription_buffer - size
load
// make sure the next few entries are loaded too..
@@ -694,12 +738,11 @@ class QueueEntry(val queue:Queue) extend
null
}
- override def load():Int = {
- if( loading ) {
- 0
- } else {
+ override def load() = {
+ if( !loading ) {
// start loading it back...
loading = true
+ queue.loadingSize += size
queue.host.store.loadMessage(ref) { delivery =>
// pass off to a source so it can aggregate multiple
// loads to reduce cross thread synchronization
@@ -707,7 +750,6 @@ class QueueEntry(val queue:Queue) extend
queue.store_load_source.merge((QueueEntry.this, delivery.get))
}
}
- size
}
}
}
@@ -719,6 +761,8 @@ class QueueEntry(val queue:Queue) extend
def size = delivery.size
var flushing = false
+ override def toString = { "loaded:{ flushing: "+flushing+", aquired: "+aquired+", size:"+size+"}" }
+
override def isFlushedOrFlushing = {
flushing
}
@@ -734,11 +778,10 @@ class QueueEntry(val queue:Queue) extend
}
}
- override def flush():Int = {
- if( flushing ) {
- 0
- } else {
+ override def flush():Unit = {
+ if( !flushing ) {
flushing=true
+ queue.flushingSize+=size
if( delivery.storeBatch!=null ) {
delivery.storeBatch.eagerFlush(^{
@@ -750,14 +793,12 @@ class QueueEntry(val queue:Queue) extend
queue.store_flush_source.merge(QueueEntry.this)
}
}
-
- size
}
}
def dispatch():QueueEntry = {
if( delivery==null ) {
- // can't dispatch untill the delivery is set.
+ // can't dispatch until the delivery is set.
null
} else {
@@ -829,6 +870,14 @@ class QueueEntry(val queue:Queue) extend
val p = nextEntry
p.addBrowsing(browsingFastSubs)
p.addCompeting(competingFastSubs)
+
+
+ // if we are no longer needed and we are under pressure to make room and the previous was flushed....
+ if( !hasSubs && prefetched==0 && !aquired && queue.messages.full && getPrevious.isFlushedOrFlushing ) {
+ // then flush out to make space...
+ flush
+ }
+
p
} else {
null
@@ -850,14 +899,89 @@ class Subscription(queue:Queue) extends
var session: DeliverySession = null
var pos:QueueEntry = null
+ var cursoredCounter = 0L
+
+ // Vars used to detect slow consumers.
+ var prevCursoredCounter = 0L
+ var tailParkings = 0
+ var slowIntervals = 0
+
+ def slow = slowIntervals > queue.tune_max_slow_intervals
+
+ var lastPrefetchPos:QueueEntry = null
+ var prefetchSize = 0
+
+
+ override def toString = "{ prefetchSize: "+prefetchSize+", pos: "+(if(pos==null) null else pos.seq)+" lastPrefetchPos: "+(if(lastPrefetchPos==null) null else lastPrefetchPos.seq)+" }"
+
def position(value:QueueEntry):Unit = {
+ if( value!=null ) {
+ // setting a new position..
+ if( pos!=null && pos.value!=null ) {
+ // Remove the previous pos from the prefetch counters.
+ removePrefetch(pos)
+ cursoredCounter += pos.size
+ }
+ } else {
+ // setting null pos, happens when the sub is closed.
+ if( lastPrefetchPos!=null ) {
+ var cur = pos
+
+ // clean up it's prefetch counters on the entries..
+ while( cur!=null && cur.value!=null ) {
+ cur.prefetched -= 1
+ cur = if( cur == lastPrefetchPos ) {
+ null
+ } else {
+ cur.nextEntry
+ }
+ }
+ lastPrefetchPos = null
+ prefetchSize=0
+ }
+ }
pos = value
session.refiller = pos
+ if( tailParked ) {
+ tailParkings += 1
+ }
+ }
+
+
+ def prefetched(value:QueueEntry) = {
+ pos.seq <= value.seq && value.seq <= lastPrefetchPos.seq
}
+ def removePrefetch(value:QueueEntry):Unit = {
+// println("prefetch rm: "+value.seq)
+ value.prefetched -= 1
+ prefetchSize -= value.size
+ fillPrefetch()
+ }
+
+ def fillPrefetch() = {
+ // attempts to fill the prefetch...
+ var next = lastPrefetchPos.getNext
+ while(prefetchSize < queue.tune_subscription_buffer && next!=null && next.value!=null ) {
+ next.load
+ addPrefetch(next)
+ next = next.getNext
+ }
+ }
+
+ def addPrefetch(value:QueueEntry):Unit = {
+// println("prefetch add: "+value.seq)
+ prefetchSize += value.size
+ lastPrefetchPos = value
+ value.prefetched += 1
+ }
+
+ def tailParked = pos eq queue.tailEntry
+
def connect(consumer: DeliveryConsumer) = {
session = consumer.connect(this)
queue.headEntry.addCompeting(this :: Nil)
+ addPrefetch(queue.headEntry)
queue.dispatchQueue << queue.headEntry
}
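
As a rough companion to the Subscription prefetch changes above, the sketch below shows the sliding-window idea in isolation: entries are pulled in until about tune_subscription_buffer bytes are prefetched, and when the cursor consumes the head of the window the following entries are loaded to refill it. The names and the flat list model are hypothetical, not the committed data structures.

    object PrefetchWindowSketch {
      val tune_subscription_buffer = 1024 * 32

      case class Entry(seq: Long, size: Int)

      def main(args: Array[String]): Unit = {
        // hypothetical queue tail made of 8 KB messages
        val entries = (1L to 20L).map(seq => Entry(seq, 8 * 1024)).toList

        var window     = List.empty[Entry] // entries currently prefetched
        var windowSize = 0                 // bytes currently prefetched
        var nextIndex  = 0

        // pull entries in until the buffer target is reached (like fillPrefetch)
        def fill(): Unit = {
          while (windowSize < tune_subscription_buffer && nextIndex < entries.length) {
            val e = entries(nextIndex)
            window :+= e
            windowSize += e.size
            nextIndex += 1
          }
        }

        fill()
        // the cursor passes the head of the window (like removePrefetch), then refills
        val consumed = window.head
        window = window.tail
        windowSize -= consumed.size
        fill()

        println("prefetched seqs: " + window.map(_.seq) + ", bytes: " + windowSize)
      }
    }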