You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:14:34 UTC
svn commit: r961182 -
/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Author: chirino
Date: Wed Jul 7 04:14:34 2010
New Revision: 961182
URL: http://svn.apache.org/viewvc?rev=961182&view=rev
Log:
Moved all the prefetch related logic into a new PrefetchingSubscription sub class. Should make it easier to maintain that logic.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961182&r1=961181&r2=961182&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:14:34 2010
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit
import java.util.{HashSet, Collections, ArrayList, LinkedList}
import org.apache.activemq.apollo.store.{QueueEntryRange, QueueEntryRecord, MessageRecord}
import collection.mutable.ListBuffer
+import java.util.concurrent.atomic.AtomicInteger
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -52,6 +53,7 @@ trait QueueLifecyleListener {
object Queue extends Log {
+ val subcsription_counter = new AtomicInteger(0)
}
/**
@@ -248,7 +250,7 @@ class Queue(val host: VirtualHost, val d
} else {
// we flush the entry out right away if it looks
// it wont be needed.
- entry.flush
+ entry.flush(true)
}
// release the store batch...
@@ -384,7 +386,7 @@ class Queue(val host: VirtualHost, val d
def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= cur.seq ).isDefined
if( cur.is_loaded && !cur.hasSubs && !cur.is_prefetched && !cur.as_loaded.acquired && !haveQuickConsumer ) {
// then flush out to make space...
- cur.flush
+ cur.flush(true)
cur = cur.getPrevious
} else {
cur = null
@@ -477,7 +479,11 @@ class Queue(val host: VirtualHost, val d
def bind(values: List[DeliveryConsumer]) = retaining(values) {
for (consumer <- values) {
- val subscription = new Subscription(this)
+ val subscription = if( tune_flush_to_store) {
+ new PrefetchingSubscription(this)
+ } else {
+ new Subscription(this)
+ }
subscription.open(consumer)
all_subscriptions += consumer -> subscription
fast_subscriptions ::= subscription
@@ -535,7 +541,7 @@ class Queue(val host: VirtualHost, val d
entry.load
} else {
// flush the the others out of memory.
- entry.flush
+ entry.flush(true)
}
entry = entry.getNext
}
@@ -602,6 +608,9 @@ class QueueEntry(val queue:Queue, val se
def is_prefetched = prefetched>0
+ def <(value:QueueEntry) = this.seq < value.seq
+ def <=(value:QueueEntry) = this.seq <= value.seq
+
def head():QueueEntry = {
state = new Head
this
@@ -708,7 +717,7 @@ class QueueEntry(val queue:Queue, val se
def dispatch() = state.dispatch
// These methods may cause a change in the current state.
- def flush = state.flush
+ def flush(asap:Boolean) = state.flush(asap)
def load = state.load
def remove = state.remove
@@ -752,32 +761,49 @@ class QueueEntry(val queue:Queue, val se
/**
* Triggers the entry to get flushed if it's not already flushed.
*/
- def flush = {}
+ def flush(asap:Boolean) = {}
/**
* Takes the current entry out of the prefetch of all subscriptions
- * which have prefetched the entry. Returns the list of subscriptions which
- * had prefetched it.
+ * which have prefetched the entry. Runs the partial function then
+ * refills the prefetch of those subs that were affected.
*/
- def prefetch_remove = {
- var rc = List[Subscription]()
+ def with_prefetch_droped(func: =>Unit ):Unit = {
if( queue.tune_flush_to_store ) {
- // Update the prefetch counter to reflect that this entry is no longer being prefetched.
- var cur = entry
- while( cur!=null && is_prefetched ) {
- if( cur.hasSubs ) {
- (cur.parked).foreach { sub =>
- if( sub.is_prefetched(entry) ) {
- sub.remove_from_prefetch(entry)
- rc ::= sub
+
+ // drop the prefetch
+ val expected = prefetched
+ var prefechingSubs = List[Subscription]()
+ if( queue.tune_flush_to_store ) {
+ // Update the prefetch counter to reflect that this entry is no longer being prefetched.
+ var cur = entry
+ while( cur!=null && is_prefetched ) {
+ if( cur.hasSubs ) {
+ (cur.parked).foreach { case sub:PrefetchingSubscription =>
+ if( sub.is_prefetched(entry) ) {
+ sub.remove_from_prefetch(entry)
+ prefechingSubs ::= sub
+ }
}
}
+ cur = cur.getPrevious
}
- cur = cur.getPrevious
}
- assert(!is_prefetched, "entry should not be prefetched.")
+ if(prefetched!=0) {
+ assert(prefetched==0, "entry should not be prefetched.")
+ }
+ assert(expected == prefechingSubs.size, "should get all the subs")
+
+ func
+
+ // refill the prefetch
+ prefechingSubs.foreach{ case sub =>
+ sub.refill_prefetch
+ }
+
+ } else {
+ func
}
- rc
}
/**
@@ -785,16 +811,16 @@ class QueueEntry(val queue:Queue, val se
* as a result of an aquired ack.
*/
def remove = {
- // take us out of subscription prefetches..
- var refill_preftch_list = prefetch_remove
- // advance subscriptions that were on this entry..
- parked.foreach(_.advance(next))
- nextOrTail :::= parked
- parked = Nil
- // take the entry of the entries list..
- unlink
- // refill the subscription prefetches..
- refill_preftch_list.foreach( _.refill_prefetch )
+ with_prefetch_droped {
+
+ // advance subscriptions that were on this entry..
+ advance(parked)
+ parked = Nil
+
+ // take the entry of the entries list..
+ unlink
+
+ }
}
/**
@@ -835,7 +861,7 @@ class QueueEntry(val queue:Queue, val se
override def remove = throw new AssertionError("Head entry cannot be removed")
override def load = throw new AssertionError("Head entry cannot be loaded")
- override def flush = throw new AssertionError("Head entry cannot be flushed")
+ override def flush(asap:Boolean) = throw new AssertionError("Head entry cannot be flushed")
}
/**
@@ -850,7 +876,7 @@ class QueueEntry(val queue:Queue, val se
override def remove = throw new AssertionError("Tail entry cannot be removed")
override def load = throw new AssertionError("Tail entry cannot be loaded")
- override def flush = throw new AssertionError("Tail entry cannot be flushed")
+ override def flush(asap:Boolean) = throw new AssertionError("Tail entry cannot be flushed")
}
@@ -883,7 +909,7 @@ class QueueEntry(val queue:Queue, val se
})
}
- override def flush() = {
+ override def flush(asap:Boolean) = {
if( queue.tune_flush_to_store ) {
if( stored ) {
flushing=true
@@ -895,7 +921,9 @@ class QueueEntry(val queue:Queue, val se
// The storeBatch is only set when called from the messages.offer method
if( delivery.uow!=null ) {
- delivery.uow.completeASAP
+ if( asap ) {
+ delivery.uow.completeASAP
+ }
} else {
// Are swapping out a non-persistent message?
@@ -905,14 +933,18 @@ class QueueEntry(val queue:Queue, val se
val uow = delivery.uow
delivery.storeKey = uow.store(delivery.createMessageRecord)
store
- uow.completeASAP
+ if( asap ) {
+ uow.completeASAP
+ }
uow.release
delivery.uow = null
} else {
-
- queue.host.store.flushMessage(messageKey) {
- queue.store_flush_source.merge(this)
+
+ if( asap ) {
+ queue.host.store.flushMessage(messageKey) {
+ queue.store_flush_source.merge(this)
+ }
}
}
@@ -987,7 +1019,7 @@ class QueueEntry(val queue:Queue, val se
} else {
// Is the sub flow controlled?
- if( sub.full || (sub.prefetchFull && !sub.is_prefetched(entry) ) ) {
+ if( sub.full ) {
// hold back: flow controlled
heldBack += sub
} else {
@@ -1028,7 +1060,7 @@ class QueueEntry(val queue:Queue, val se
def haveQuickConsumer = queue.fast_subscriptions.find( sub=> sub.pos.seq <= seq ).isDefined
if( !hasSubs && !is_prefetched && !acquired && !haveQuickConsumer ) {
// then flush out to make space...
- flush
+ flush(false)
}
return true
}
@@ -1161,17 +1193,18 @@ class QueueEntry(val queue:Queue, val se
queue.enqueue_size_counter += size_delta
}
- var refill_preftch_list = prefetch_remove
+ with_prefetch_droped {
+
+ linkAfter(tmpList)
+ val next = getNext
- linkAfter(tmpList)
- val next = getNext
+ // move the subs to the first entry that we just loaded.
+ parked.foreach(_.advance(next))
+ next :::= parked
- // move the subs to the first entry that we just loaded.
- parked.foreach(_.advance(next))
- next :::= parked
+ unlink
- unlink
- refill_preftch_list.foreach( _.refill_prefetch )
+ }
}
}
}
@@ -1185,12 +1218,18 @@ class QueueEntry(val queue:Queue, val se
}
-
+/**
+ * Interfaces a DispatchConsumer with a Queue. Tracks current position of the consumer
+ * on the queue, and the delivery rate so that slow consumers can be detected. It also
+ * tracks the entries which the consumer has acquired.
+ *
+ */
class Subscription(queue:Queue) extends DeliveryProducer with DispatchLogging {
override protected def log = Queue
def dispatchQueue = queue.dispatchQueue
+ val id = Queue.subcsription_counter.incrementAndGet
var acquired = new LinkedNodeList[AcquiredQueueEntry]
var session: DeliverySession = null
var pos:QueueEntry = null
@@ -1204,13 +1243,11 @@ class Subscription(queue:Queue) extends
def slow = slow_intervals > queue.tune_max_slow_intervals
- var prefetch_tail:QueueEntry = null
- var prefetched_size = 0
var acquired_size = 0L
override def toString = {
def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
- "{ acquired_size: "+acquired_size+", prefetch_size: "+prefetched_size+", pos: "+seq(pos)+" prefetch_tail: "+seq(prefetch_tail)+", tail_parkings: "+tail_parkings+"}"
+ "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+", tail_parkings: "+tail_parkings+"}"
}
def browser = session.consumer.browser
@@ -1230,10 +1267,8 @@ class Subscription(queue:Queue) extends
def close() = {
pos -= this
-
- invalidate_prefetch
-
pos = null
+
session.refiller = null
session.close
session = null
@@ -1252,21 +1287,16 @@ class Subscription(queue:Queue) extends
* queue entry.
*/
def advance(value:QueueEntry):Unit = {
- assert(value!=null)
- assert(pos!=null)
-
- // Remove the previous pos from the prefetch counters.
- if( prefetch_tail!=null && !pos.is_head) {
- remove_from_prefetch(pos)
- }
+ assert(value!=null)
+ assert(pos!=null)
advanced_size += pos.size
pos = value
session.refiller = pos
- refill_prefetch()
+ refill_prefetch
if( tail_parked ) {
tail_parkings += 1
}
@@ -1278,90 +1308,20 @@ class Subscription(queue:Queue) extends
*/
def rewind(value:QueueEntry):Unit = {
assert(value!=null)
- invalidate_prefetch
pos = value
session.refiller = pos
queue.dispatchQueue << value // queue up the entry to get dispatched..
}
- def invalidate_prefetch: Unit = {
- if (prefetch_tail != null) {
- // release the prefetch counters...
- var cur = pos
- while (cur.seq <= prefetch_tail.seq) {
- if (!cur.is_head) {
- prefetched_size -= cur.size
- cur.prefetched -= 1
- }
- cur = cur.nextOrTail
- }
- assert(prefetched_size == 0, "inconsistent prefetch size.")
- }
- }
-
-
- /**
- * Is the specified queue entry prefeteched by this subscription?
- */
- def is_prefetched(value:QueueEntry) = {
- prefetch_tail!=null && value!=null && pos.seq <= value.seq && value.seq <= prefetch_tail.seq
- }
-
-
- def add_to_prefetch(entry:QueueEntry):Unit = {
- assert( !entry.is_head, "tombstones should not be prefetched..")
- prefetched_size += entry.size
- entry.prefetched += 1
- entry.load
- prefetch_tail = entry
- }
-
- def remove_from_prefetch(entry:QueueEntry):Unit = {
- prefetched_size -= entry.size
- entry.prefetched -= 1
-
- if( entry == prefetch_tail ) {
- prefetch_tail = prefetch_tail.getPrevious;
- if( prefetch_tail==null || prefetch_tail.seq < pos.seq ) {
- prefetch_tail = null
- assert( prefetched_size == 0 , "inconsistent prefetch size.")
- }
- } else {
- assert( prefetched_size >= 0 , "inconsistent prefetch size.")
- }
- }
-
- def refill_prefetch() = {
- if( queue.tune_flush_to_store ) {
- def next_prefetch_pos = if(prefetch_tail==null) {
- if( !pos.is_tail ) {
- pos
- } else {
- null
- }
- } else {
- prefetch_tail.getNext
- }
-
- // attempts to fill the prefetch...
- var next = next_prefetch_pos
- while( !prefetchFull && next!=null ) {
- if( !next.is_head ) {
- add_to_prefetch(next)
- }
- next = next.getNext
- }
- }
- }
-
- def prefetchFull() = acquired_size + prefetched_size >= queue.tune_consumer_buffer
-
def tail_parked = pos eq queue.tail_entry
def matches(entry:Delivery) = session.consumer.matches(entry)
def full = session.full
def offer(delivery:Delivery) = session.offer(delivery)
+ def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
+
+ def refill_prefetch = {}
class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {
@@ -1420,7 +1380,117 @@ class Subscription(queue:Queue) extends
}
}
+}
- def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
+/**
+ * A subscription which issues message load requests so that messages are prefetched from
+ * the store before they are needed for dispatching purposes.
+ */
+class PrefetchingSubscription(queue:Queue) extends Subscription(queue) {
-}
+ var prefetch_head:QueueEntry = null
+ var prefetch_tail:QueueEntry = null
+ var prefetched_size = 0
+
+ override def toString = {
+ def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
+ "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+", prefetch_size: "+prefetched_size+", prefetch_head: "+seq(prefetch_head)+", prefetch_tail: "+seq(prefetch_tail)+", tail_parkings: "+tail_parkings+", prefetchFull: "+prefetch_full+"}"
+ }
+
+
+ override def advance(value:QueueEntry):Unit = {
+ super.advance(value)
+ refill_prefetch // update the prefetch window.
+ }
+
+
+ override def rewind(value: QueueEntry) = {
+ invalidate_prefetch
+ super.rewind(value)
+ }
+
+
+ override def close() = {
+ invalidate_prefetch
+ super.close
+ }
+
+ def prefetch_full = acquired_size + prefetched_size >= queue.tune_consumer_buffer
+
+ override def refill_prefetch() = {
+
+ // first lets reclaim prefetch space
+ while( prefetch_head!=null && prefetch_head < pos ) {
+ remove_from_prefetch(prefetch_head)
+ }
+
+ // now lets fill the prefetch if it has capacity.
+ if( !prefetch_full ) {
+
+ var next = if(prefetch_tail==null) {
+ if( pos.is_tail ) {
+ null // can't prefetch the tail..
+ } else if( pos.is_head ) {
+ pos.getNext // can't prefetch the head.
+ } else {
+ pos // start prefetching from the current position.
+ }
+ } else {
+ prefetch_tail.getNext // continue prefetching from the last prefetch tail
+ }
+
+ while( !prefetch_full && next!=null ) {
+
+ prefetched_size += next.size
+ next.prefetched += 1
+ next.load
+
+ if( prefetch_head==null ) {
+ prefetch_head = next
+ }
+ prefetch_tail = next
+
+ next = next.getNext
+ }
+ }
+ }
+
+
+
+ /**
+ * Is the specified queue entry prefeteched by this subscription?
+ */
+ def is_prefetched(value:QueueEntry) = {
+ assert(value!=null)
+ prefetch_head!=null && prefetch_head <= value && value <= prefetch_tail
+ }
+
+ def remove_from_prefetch(entry:QueueEntry):Unit = {
+ prefetched_size -= entry.size
+ entry.prefetched -= 1
+
+ if( entry == prefetch_head ) {
+ if( entry == prefetch_tail ) {
+ prefetch_head = null
+ prefetch_tail = null
+ assert( prefetched_size == 0 , "inconsistent prefetch size.")
+ } else {
+ prefetch_head = prefetch_head.getNext
+ assert( prefetched_size != 0 , "inconsistent prefetch size.")
+ }
+ } else {
+ if( entry == prefetch_tail ) {
+ prefetch_tail = prefetch_tail.getPrevious
+ }
+ assert( prefetched_size != 0 , "inconsistent prefetch size.")
+ }
+ }
+
+ def invalidate_prefetch: Unit = {
+ while (prefetch_head !=null ) {
+ remove_from_prefetch(prefetch_head)
+ }
+ assert(prefetched_size == 0, "inconsistent prefetch size.")
+ }
+
+}
\ No newline at end of file