You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:11:16 UTC
svn commit: r961151 -
/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Author: chirino
Date: Wed Jul 7 04:11:16 2010
New Revision: 961151
URL: http://svn.apache.org/viewvc?rev=961151&view=rev
Log:
Improving the state management of the queue entries.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961151&r1=961150&r2=961151&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:11:16 2010
@@ -1,5 +1,5 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
+ * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
@@ -81,25 +81,19 @@ class Queue(val host: VirtualHost, val d
ack_source.setEventHandler(^ {drain_acks});
ack_source.resume
- val store_load_source = createSource(new ListEventAggregator[(QueueEntry, MessageRecord)](), dispatchQueue)
- store_load_source.setEventHandler(^ {drain_store_loads});
- store_load_source.resume
-
- val store_flush_source = createSource(new ListEventAggregator[QueueEntry](), dispatchQueue)
- store_flush_source.setEventHandler(^ {drain_store_flushes});
- store_flush_source.resume
-
val session_manager = new SinkMux[Delivery](messages, dispatchQueue, Delivery)
// sequence numbers.. used to track what's in the store.
var message_seq_counter = 1L
- val headEntry = new QueueEntry(this, 0L).tombstone
- var tailEntry = new QueueEntry(this, next_message_seq)
-
var counter = 0
+
val entries = new LinkedNodeList[QueueEntry]()
+ val headEntry = new QueueEntry(this, 0L);
+ var tailEntry = new QueueEntry(this, next_message_seq)
+
entries.addFirst(headEntry)
+ headEntry.tombstone
var loadingSize = 0
var flushingSize = 0
@@ -142,7 +136,7 @@ class Queue(val host: VirtualHost, val d
var dequeue_size = 0L
private var capacity = tune_inbound_buffer
- private var size = 0
+ var size = 0
schedualSlowConsumerCheck
@@ -150,7 +144,7 @@ class Queue(val host: VirtualHost, val d
this.storeId = storeId
if( !records.isEmpty ) {
records.foreach { qer =>
- val entry = new QueueEntry(Queue.this,qer.queueSeq).flushed(qer)
+ val entry = new QueueEntry(Queue.this,qer.queueSeq).init(qer)
entries.addLast(entry)
}
@@ -183,10 +177,9 @@ class Queue(val host: VirtualHost, val d
val entry = tailEntry
tailEntry = new QueueEntry(Queue.this, next_message_seq)
val queueDelivery = delivery.copy
- entry.created(queueDelivery)
+ entry.init(queueDelivery)
queueDelivery.storeBatch = delivery.storeBatch
- size += entry.size
entries.addLast(entry)
counter += 1;
enqueue_counter += 1
@@ -194,14 +187,14 @@ class Queue(val host: VirtualHost, val d
// Do we need to do a persistent enqueue???
if (queueDelivery.storeBatch != null) {
- queueDelivery.storeBatch.enqueue(entry.createQueueEntryRecord)
+ queueDelivery.storeBatch.enqueue(entry.toQueueEntryRecord)
}
def haveQuickConsumer = fastSubs.find( sub=> sub.pos.seq <= entry.seq ).isDefined
var dispatched = false
- if( entry.prefetched > 0 || haveQuickConsumer ) {
+ if( entry.hasSubs || haveQuickConsumer ) {
// try to dispatch it directly...
entry.dispatch
} else {
@@ -227,15 +220,9 @@ class Queue(val host: VirtualHost, val d
def slowConsumerCheck = {
if( retained > 0 ) {
- checkCounter += 1
-
- // target tune_min_subscription_rate / sec
- val slowCursorDelta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
- var idleConsumerCount = 0
-
-// Handy for periodically looking at the dispatch state...
-//
+ // Handy for periodically looking at the dispatch state...
+// checkCounter += 1
// if( !consumerSubs.isEmpty && (checkCounter%100)==0 ) {
// println("using "+size+" out of "+capacity+" buffer space.");
// var cur = entries.getHead
@@ -248,6 +235,9 @@ class Queue(val host: VirtualHost, val d
// println("tail: "+tailEntry)
// }
+ // target tune_min_subscription_rate / sec
+ val slowCursorDelta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
+ var idleConsumerCount = 0
fastSubs = Nil
consumerSubs.foreach{ case (consumer, sub)=>
@@ -316,13 +306,13 @@ class Queue(val host: VirtualHost, val d
}
def ack(entry: QueueEntry, sb:StoreBatch) = {
- if (entry.ref != -1) {
+ if (entry.messageKey != -1) {
val storeBatch = if( sb == null ) {
host.store.createStoreBatch
} else {
sb
}
- storeBatch.dequeue(entry.createQueueEntryRecord)
+ storeBatch.dequeue(entry.toQueueEntryRecord)
if( sb == null ) {
storeBatch.release
}
@@ -334,7 +324,6 @@ class Queue(val host: VirtualHost, val d
dequeue_counter += 1
counter -= 1
dequeue_size += entry.size
- size -= entry.size
entry.tombstone
}
@@ -477,158 +466,78 @@ class Queue(val host: VirtualHost, val d
}
}
- def drain_store_loads() = {
- val data = store_load_source.getData
- data.foreach { case (entry,flushed) =>
- loadingSize -= entry.size
+ val store_flush_source = createSource(new ListEventAggregator[QueueEntry#Loaded](), dispatchQueue)
+ store_flush_source.setEventHandler(^ {drain_store_flushes});
+ store_flush_source.resume
- val delivery = new Delivery()
- delivery.message = ProtocolFactory.get(flushed.protocol).decode(flushed.value)
- delivery.size = flushed.size
- delivery.storeKey = flushed.key
+ def drain_store_flushes() = {
+ val data = store_flush_source.getData
+ data.foreach { loaded =>
+ loaded.flushed
+ }
+ messages.refiller.run
- entry.loaded(delivery)
+ }
- size += entry.size
+ val store_load_source = createSource(new ListEventAggregator[(QueueEntry#Flushed, MessageRecord)](), dispatchQueue)
+ store_load_source.setEventHandler(^ {drain_store_loads});
+ store_load_source.resume
- }
- data.foreach { case (entry,_) =>
- if( entry.hasSubs ) {
- entry.run
- }
+ def drain_store_loads() = {
+ val data = store_load_source.getData
+ data.foreach { case (flushed,messageRecord) =>
+ flushed.loaded(messageRecord)
}
- }
- def drain_store_flushes() = {
- val data = store_flush_source.getData
- data.foreach { entry =>
- flushingSize -= entry.size
-
- // by the time we get called back, subs my be interested in the entry
- // or it may have been acked.
- if( !entry.hasSubs && entry.asLoaded!=null ) {
- size -= entry.size
- entry.flushed
+ data.foreach { case (flushed,_) =>
+ if( flushed.entry.hasSubs ) {
+ flushed.entry.run
}
}
- messages.refiller.run
-
}
}
-
object QueueEntry extends Sizer[QueueEntry] {
-
def size(value: QueueEntry): Int = value.size
}
class QueueEntry(val queue:Queue, val seq:Long) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable with DispatchLogging {
override protected def log = Queue
-
import QueueEntry._
+ // Competing subscriptions try to exclusively acquire the entry.
var competing:List[Subscription] = Nil
+ // These are subscriptions which will not be exclusively acquiring the entry.
var browsing:List[Subscription] = Nil
+ // The number of subscriptions which have requested this entry to be prefetched (held in memory) so that it's
+ // ready for them to get dispatched.
var prefetched = 0
- var value:EntryType = new Tail
-
- override def toString = {
- "{seq: "+seq+", prefetched: "+prefetched+", value: "+value+", competing: "+competing+", browsing: "+browsing+"}"
- }
+ // The current state of the entry: Tail | Loaded | Flushed | Tombstone
+ var state:EntryState = new Tail
- def createQueueEntryRecord = {
- val qer = new QueueEntryRecord
- qer.queueKey = queue.storeId
- qer.queueSeq = seq
- qer.messageKey = value.ref
- qer.size = value.size
- qer
- }
- def compareTo(o: QueueEntry) = {
- (seq - o.seq).toInt
- }
-
- def created(delivery:Delivery) = {
- this.value = new Loaded(delivery)
+ def init(delivery:Delivery):QueueEntry = {
+ this.state = new Loaded(delivery)
+ queue.size += size
this
}
- def loaded(delivery:Delivery) = {
- this.value = new Loaded(delivery)
- this
- }
-
- def flushed() = {
- val loaded = value.asLoaded
- this.value = new Flushed(loaded.delivery.storeKey, loaded.size)
- this
- }
-
- def flushed(qer:QueueEntryRecord) = {
- this.value = new Flushed(qer.messageKey, qer.size)
- this
- }
-
- def tombstone = {
-
- // remove from prefetch counters..
- var cur = this;
- while( cur!=null && prefetched > 0 ) {
- if( cur.hasSubs ) {
- (cur.browsing ::: cur.competing).foreach { sub => if( sub.prefetched(cur) ) { sub.removePrefetch(cur) } }
- }
- cur = cur.getPrevious
- if( cur == null ) {
- error("illegal prefetch state detected.")
- }
- }
-
- this.value = new Tombstone()
- if( seq != 0L ) {
-
- def merge(lv:QueueEntry, rv:QueueEntry):Unit = {
- if( lv==null || rv==null) {
- return
- }
-
- val lts = lv.value.asTombstone
- val rts = rv.value.asTombstone
-
- if( lts==null || rts==null ) {
- return
- }
-
- if( lv.seq + lts.count == rv.seq ) {
-
- lts.count += rts.count
- rts.count = 0
-
- if( rv.browsing!=Nil || rv.competing!=Nil ){
- lv.addBrowsing(rv.browsing)
- lv.addCompeting(rv.competing)
- rv.browsing = Nil
- rv.competing = Nil
- }
-
- rv.unlink
- }
- }
-
- // Merge adjacent tombstones
- merge(this, getNext)
- merge(getPrevious, this)
- }
+ def init(qer:QueueEntryRecord):QueueEntry = {
+ this.state = new Flushed(qer.messageKey, qer.size)
this
}
def hasSubs = !(competing == Nil && browsing == Nil)
+ /**
+ * Dispatches this entry to the consumers and continues dispatching subsequent
+ * entries if it has subscriptions which accept the dispatch.
+ */
def run() = {
var next = dispatch()
while( next!=null ) {
@@ -664,126 +573,151 @@ class QueueEntry(val queue:Queue, val se
entry
}
- def size = this.value.size
- def flush = this.value.flush
- def load = this.value.load
- def ref = this.value.ref
-
- def asTombstone = this.value.asTombstone
- def asFlushed = this.value.asFlushed
- def asLoaded = this.value.asLoaded
- def asTail = this.value.asTail
- def isFlushedOrFlushing = value.isFlushedOrFlushing
- def dispatch():QueueEntry = value.dispatch
+ def compareTo(o: QueueEntry) = {
+ (seq - o.seq).toInt
+ }
- trait EntryType {
- def size:Int
- def dispatch():QueueEntry
- def ref:Long
+ def toQueueEntryRecord = {
+ val qer = new QueueEntryRecord
+ qer.queueKey = queue.storeId
+ qer.queueSeq = seq
+ qer.messageKey = state.messageKey
+ qer.size = state.size
+ qer
+ }
+
+ override def toString = {
+ "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", competing: "+competing+", browsing: "+browsing+"}"
+ }
+
+ /////////////////////////////////////////////////////
+ //
+ // State delegates..
+ //
+ /////////////////////////////////////////////////////
+
+ // What state is it in?
+ def asTombstone = this.state.asTombstone
+ def asFlushed = this.state.asFlushed
+ def asLoaded = this.state.asLoaded
+ def asTail = this.state.asTail
+
+ // These should not change the current state.
+ def size = this.state.size
+ def messageKey = this.state.messageKey
+ def isFlushedOrFlushing = state.isFlushedOrFlushing
+ def dispatch():QueueEntry = state.dispatch
+
+ // These methods may cause a change in the current state.
+ def flush:QueueEntry = this.state.flush
+ def load:QueueEntry = this.state.load
+ def tombstone:QueueEntry = this.state.tombstone
+
+ trait EntryState {
+
+ final def entry:QueueEntry = QueueEntry.this
def asTail:Tail = null
def asLoaded:Loaded = null
def asFlushed:Flushed = null
def asTombstone:Tombstone = null
- def flush = {}
- def load = {}
+ def size:Int
+ def dispatch():QueueEntry
+ def messageKey:Long
def isFlushedOrFlushing = false
- }
- class Tail extends EntryType {
- override def asTail:Tail = this
- def size = 0
- def ref = -1
+ def load = entry
- def dispatch():QueueEntry = null
- }
+ def flush = entry
- class Tombstone extends EntryType {
+ def tombstone = {
+ queue.size -= size
- var count = 1L
+ // Update the prefetch counter to reflect that this entry is no longer being prefetched.
+ var cur = entry
+ while( cur!=null && prefetched > 0 ) {
+ if( cur.hasSubs ) {
+ (cur.browsing ::: cur.competing).foreach { sub => if( sub.prefetched(entry) ) { sub.removePrefetch(entry) } }
+ }
+ cur = cur.getPrevious
- def size = 0
- def ref = -1
+ // Sanity check.. we should always stop before we get to the last entry in the list.
+ assert( cur != null , "illegal prefetch state detected.")
+ }
- override def asTombstone = this
+ // if rv and lv are both adjacent tombstones, then this merges the rv
+ // tombstone into lv, unlinks rv, and returns lv, otherwise it returns
+ // rv.
+ def merge(lv:QueueEntry, rv:QueueEntry):QueueEntry = {
+ if( lv==null || rv==null) {
+ return rv
+ }
- def dispatch():QueueEntry = {
- val p = nextOrTail
- p.addBrowsing(browsing)
- p.addCompeting(competing)
- browsing = Nil
- competing = Nil
- p
- }
+ val lts = lv.state.asTombstone
+ val rts = rv.state.asTombstone
- override def toString = { "ts:{ count: "+count+"}" }
+ if( lts==null || rts==null ) {
+ return rv
+ }
- }
+ // Sanity check: that the entries are adjacent.. this should
+ // always be the case.
+ if( lv.seq + lts.count != rv.seq ) {
+ throw new AssertionError("entries are not adjacent.")
+ }
- class Flushed(val ref:Long, val size:Int) extends EntryType {
+ lts.count += rts.count
+ rts.count = 0
- var loading = false
+ if( rv.browsing!=Nil || rv.competing!=Nil ){
+ lv.addBrowsing(rv.browsing)
+ lv.addCompeting(rv.competing)
+ rv.browsing = Nil
+ rv.competing = Nil
+ }
+ rv.unlink
+ return lv
+ }
- override def asFlushed = this
+ state = new Tombstone()
+ merge(entry, getNext)
+ merge(getPrevious, entry)
+ }
- override def isFlushedOrFlushing = true
+ }
- override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
+ /**
+ * This state is used on the last entry of the queue. It still has not been initialized
+ * with a message, but it may be holding subscriptions. This state transitions to Loaded
+ * once a message is received.
+ */
+ class Tail extends EntryState {
- // Flushed entries can't be dispatched until
- // they get loaded.
- def dispatch():QueueEntry = {
- if( !loading ) {
- var remaining = queue.tune_subscription_buffer - size
- load
+ override def asTail:Tail = this
+ def size = 0
+ def messageKey = -1
+ def dispatch():QueueEntry = null
- // make sure the next few entries are loaded too..
- var cur = getNext
- while( remaining>0 && cur!=null ) {
- remaining -= cur.size
- val flushed = cur.asFlushed
- if( flushed!=null && !flushed.loading) {
- flushed.load
- }
- cur = getNext
- }
+ override def toString = { "tail" }
- }
- null
- }
+ override def load = throw new AssertionError("Tail entry cannot be loaded")
+ override def flush = throw new AssertionError("Tail entry cannot be flushed")
- override def load() = {
- if( !loading ) {
- // start loading it back...
- loading = true
- queue.loadingSize += size
- queue.host.store.loadMessage(ref) { delivery =>
- // pass off to a source so it can aggregate multiple
- // loads to reduce cross thread synchronization
- if( delivery.isDefined ) {
- queue.store_load_source.merge((QueueEntry.this, delivery.get))
- } else {
- // Looks like someone else removed the message from the store.. lets just
- // tombstone this entry now.
- queue.dispatchQueue {
- debug("Detected store drop of message key: %d", ref)
- tombstone
- }
- }
- }
- }
- }
}
- class Loaded(val delivery: Delivery) extends EntryType {
+ /**
+ * This state is used while a message is loaded in memory. A message must be in this state
+ * before it can be dispatched to a consumer. It can transition to Flushed or Tombstone.
+ */
+ class Loaded(val delivery: Delivery) extends EntryState {
var aquired = false
- def ref = delivery.storeKey
+ def messageKey = delivery.storeKey
def size = delivery.size
var flushing = false
-
+
override def toString = { "loaded:{ flushing: "+flushing+", aquired: "+aquired+", size:"+size+"}" }
override def isFlushedOrFlushing = {
@@ -796,27 +730,51 @@ class QueueEntry(val queue:Queue, val se
if( delivery.storeKey == -1 ) {
val tx = queue.host.store.createStoreBatch
delivery.storeKey = tx.store(delivery.createMessageRecord)
- tx.enqueue(createQueueEntryRecord)
+ tx.enqueue(toQueueEntryRecord)
tx.release
}
}
- override def flush():Unit = {
+ override def flush() = {
if( queue.host.store!=null && !flushing ) {
flushing=true
queue.flushingSize+=size
-
if( delivery.storeBatch!=null ) {
delivery.storeBatch.eagerFlush(^{
- queue.store_flush_source.merge(QueueEntry.this)
+ queue.store_flush_source.merge(this)
})
} else {
store
- queue.host.store.flushMessage(ref) {
- queue.store_flush_source.merge(QueueEntry.this)
+ queue.host.store.flushMessage(messageKey) {
+ queue.store_flush_source.merge(this)
}
}
}
+ entry
+ }
+
+ def flushed() = {
+ if( flushing ) {
+ queue.flushingSize-=size
+ queue.size -= size
+ state = new Flushed(delivery.storeKey, size)
+ }
+ }
+
+ override def load() = {
+ if( flushing ) {
+ flushing = false
+ queue.flushingSize-=size
+ }
+ entry
+ }
+
+ override def tombstone = {
+ if( flushing ) {
+ flushing = false
+ queue.flushingSize-=size
+ }
+ super.tombstone
}
def dispatch():QueueEntry = {
@@ -858,7 +816,7 @@ class QueueEntry(val queue:Queue, val se
competingSlowSubs = competingSlowSubs ::: sub :: Nil
if( !sub.full ) {
- val node = sub.add(QueueEntry.this)
+ val node = sub.add(entry)
val offering = delivery.copy
offering.ack = (tx)=> {
queue.ack_source.merge((node, tx))
@@ -909,6 +867,125 @@ class QueueEntry(val queue:Queue, val se
}
}
+
+ /**
+ * Entries which have been deleted get put into the Tombstone state. Adjacent entries in the
+ * Tombstone state get merged into a single entry.
+ */
+ class Tombstone extends EntryState {
+
+ /** The number of adjacent entries this Tombstone represents. */
+ var count = 1L
+
+ def size = 0
+ def messageKey = -1
+
+ override def asTombstone = this
+
+ /**
+ * Nothing to dispatch in a Tombstone, move the subscriptions to the next entry.
+ */
+ def dispatch():QueueEntry = {
+ val p = nextOrTail
+ p.addBrowsing(browsing)
+ p.addCompeting(competing)
+ browsing = Nil
+ competing = Nil
+ p
+ }
+
+ override def tombstone = throw new AssertionError("Tombstone entry cannot be tombstoned")
+ override def toString = { "tombstone:{ count: "+count+"}" }
+
+ }
+
+ /**
+ * Entries in the Flushed state are not holding the referenced messages in memory anymore.
+ * This state can transition to Loaded or Tombstone.
+ *
+ * TODO: Add a new FlushedList state which can be used to merge multiple
+ * adjacent Flushed entries into a single FlushedList state. This would allow us
+ * to support queues with millions of flushed entries without much memory impact.
+ */
+ class Flushed(val messageKey:Long, val size:Int) extends EntryState {
+
+ var loading = false
+
+ override def asFlushed = this
+
+ override def isFlushedOrFlushing = true
+
+ override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
+
+ // Flushed entries can't be dispatched until
+ // they get loaded.
+ def dispatch():QueueEntry = {
+ if( !loading ) {
+ var remaining = queue.tune_subscription_buffer - size
+ load
+
+ // make sure the next few entries are loaded too..
+ var cur = getNext
+ while( remaining>0 && cur!=null ) {
+ remaining -= cur.size
+ val flushed = cur.asFlushed
+ if( flushed!=null && !flushed.loading) {
+ flushed.load
+ }
+ cur = getNext
+ }
+
+ }
+ null
+ }
+
+ override def load() = {
+ if( !loading ) {
+ // start loading it back...
+ loading = true
+ queue.loadingSize += size
+ queue.host.store.loadMessage(messageKey) { delivery =>
+ // pass off to a source so it can aggregate multiple
+ // loads to reduce cross thread synchronization
+ if( delivery.isDefined ) {
+ queue.store_load_source.merge((this, delivery.get))
+ } else {
+ // Looks like someone else removed the message from the store.. lets just
+ // tombstone this entry now.
+ queue.dispatchQueue {
+ debug("Detected store drop of message key: %d", messageKey)
+ tombstone
+ }
+ }
+ }
+ }
+ entry
+ }
+
+ def loaded(messageRecord:MessageRecord) = {
+ if( loading ) {
+ loading = false
+ queue.loadingSize -= size
+
+ val delivery = new Delivery()
+ delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.value)
+ delivery.size = messageRecord.size
+ delivery.storeKey = messageRecord.key
+
+ queue.size += size
+ state = new Loaded(delivery)
+ }
+ }
+
+
+ override def tombstone = {
+ if( loading ) {
+ loading = false
+ queue.loadingSize -= size
+ }
+ super.tombstone
+ }
+ }
}
@@ -971,7 +1048,6 @@ class Subscription(queue:Queue) extends
def removePrefetch(value:QueueEntry):Unit = {
value.prefetched -= 1
prefetchSize -= value.size
-
fillPrefetch()
}