You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:10:41 UTC
svn commit: r961145 - in
/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker:
Queue.scala VirtualHost.scala
Author: chirino
Date: Wed Jul 7 04:10:40 2010
New Revision: 961145
URL: http://svn.apache.org/viewvc?rev=961145&view=rev
Log:
making queue consumers more decoupled.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961145&r1=961144&r2=961145&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:10:40 2010
@@ -63,6 +63,7 @@ class Queue(val host: VirtualHost, val d
import Queue._
var consumerSubs = Map[DeliveryConsumer, Subscription]()
+ var fastSubs = List[Subscription]()
override val dispatchQueue: DispatchQueue = createQueue(destination.toString);
dispatchQueue.setTargetQueue(getRandomThreadQueue)
@@ -93,8 +94,8 @@ class Queue(val host: VirtualHost, val d
// sequence numbers.. used to track what's in the store.
var message_seq_counter = 1L
- val headEntry = new QueueEntry(this).tombstone
- var tailEntry = new QueueEntry(this)
+ val headEntry = new QueueEntry(this, 0L).tombstone
+ var tailEntry = new QueueEntry(this, next_message_seq)
var counter = 0
val entries = new LinkedNodeList[QueueEntry]()
@@ -149,7 +150,7 @@ class Queue(val host: VirtualHost, val d
this.storeId = storeId
if( !records.isEmpty ) {
records.foreach { qer =>
- val entry = new QueueEntry(Queue.this).flushed(qer)
+ val entry = new QueueEntry(Queue.this,qer.queueSeq).flushed(qer)
entries.addLast(entry)
}
@@ -180,9 +181,9 @@ class Queue(val host: VirtualHost, val d
} else {
val entry = tailEntry
- tailEntry = new QueueEntry(Queue.this)
+ tailEntry = new QueueEntry(Queue.this, next_message_seq)
val queueDelivery = delivery.copy
- entry.created(next_message_seq, queueDelivery)
+ entry.created(queueDelivery)
queueDelivery.storeBatch = delivery.storeBatch
size += entry.size
@@ -196,42 +197,26 @@ class Queue(val host: VirtualHost, val d
queueDelivery.storeBatch.enqueue(entry.createQueueEntryRecord)
}
- // do we have at least 1 subscription that is keeping up with the producers
- // and is interested in this message?
-// val hold = consumerSubs.valuesIterator.find( sub=> !sub.slow && sub.matches(delivery) ).isDefined
- def haveQuickConsumer = consumerSubs.valuesIterator.find( sub=> !sub.slow ).isDefined
+// var haveQuickConsumer = false
+// fastSubs.foreach{ sub=>
+// if( sub.pos.seq < entry.seq ) {
+// haveQuickConsumer = true
+// }
+// }
+
+ def haveQuickConsumer = fastSubs.find( sub=> !sub.slow && sub.pos.seq <= entry.seq ).isDefined
var dispatched = false
if( entry.prefetched > 0 || haveQuickConsumer ) {
// try to dispatch it directly...
-// println("hold: "+delivery.message.getProperty("color"))
entry.dispatch
-
} else {
// println("flush: "+delivery.message.getProperty("color"))
// we flush the entry out right away if it looks
// it wont be needed.
entry.flush
- if( full ) {
- println("full... waiting for flushes");
- }
-
- // just make it hit the disk quick.. but keep it in memory.
- // delivery.storeBatch.eagerFlush(^{})
}
-// // Does it look like we need to start swapping to make room
-// // for more messages?
-// if( !dispatched && host.store!=null && full ) {
-// val wasAt = dequeue_size
-// dispatchQueue.dispatchAfter(tune_slow_check_interval, TimeUnit.MILLISECONDS, ^{
-// // start swapping if was still blocked after a short delay
-// if( dequeue_size == wasAt && full ) {
-// swap
-// }
-// })
-// }
-
// release the store batch...
if (queueDelivery.storeBatch != null) {
queueDelivery.storeBatch.release
@@ -253,6 +238,21 @@ class Queue(val host: VirtualHost, val d
var idleConsumerCount = 0
+
+// if( consumerSubs.isEmpty ) {
+// println("using "+size+" out of "+capacity+" buffer space.");
+// var cur = entries.getHead
+// while( cur!=null ) {
+// if( cur.asLoaded!=null ) {
+// println(" => "+cur)
+// }
+// cur = cur.getNext
+// }
+// println("tail: "+tailEntry)
+// }
+
+ fastSubs = Nil
+
consumerSubs.foreach{ case (consumer, sub)=>
// Skip over new consumers...
@@ -285,6 +285,10 @@ class Queue(val host: VirtualHost, val d
sub.tailParkings = 0
}
+
+ if( !sub.slow ) {
+ fastSubs ::= sub
+ }
}
// Trigger a swap if we have slow consumers and we are full..
@@ -399,6 +403,7 @@ class Queue(val host: VirtualHost, val d
val subscription = new Subscription(this)
subscription.connect(consumer)
consumerSubs += consumer -> subscription
+ fastSubs ::= subscription
addCapacity( tune_subscription_buffer )
}
} >>: dispatchQueue
@@ -409,9 +414,11 @@ class Queue(val host: VirtualHost, val d
case Some(cs) =>
cs.close
consumerSubs -= consumer
+ fastSubs = fastSubs.filterNot(_ eq cs)
addCapacity( -tune_subscription_buffer )
case None =>
}
+
}
} >>: dispatchQueue
@@ -441,16 +448,19 @@ class Queue(val host: VirtualHost, val d
}
debug("swapping...")
+
var entry = entries.getHead
while( entry!=null ) {
- println(entries)
if( entry.asTombstone == null ) {
- // only keep prefetch entries around..
- if( entry.prefetched == 0 ) {
- entry.flush
- } else {
+ val loaded = entry.asLoaded
+
+ // Keep around prefetched and loaded entries.
+ if( entry.prefetched < 0 || (loaded!=null && loaded.aquired)) {
entry.load
+ } else {
+ // flush the others out of memory.
+ entry.flush
}
}
@@ -494,7 +504,6 @@ class Queue(val host: VirtualHost, val d
entry.flushed
}
}
- println("flushes done... full: "+messages.full);
messages.refiller.run
}
@@ -506,15 +515,14 @@ object QueueEntry extends Sizer[QueueEnt
def size(value: QueueEntry): Int = value.size
}
-class QueueEntry(val queue:Queue) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
+class QueueEntry(val queue:Queue, val seq:Long) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
import QueueEntry._
- var seq: Long = 0L
var competing:List[Subscription] = Nil
var browsing:List[Subscription] = Nil
var prefetched = 0
- var value:EntryType = null
+ var value:EntryType = new Tail
override def toString = {
"{seq: "+seq+", prefetched: "+prefetched+", value: "+value+", competing: "+competing+", browsing: "+browsing+"}"
@@ -533,11 +541,9 @@ class QueueEntry(val queue:Queue) extend
(seq - o.seq).toInt
}
- def created(seq:Long, delivery:Delivery) = {
- this.seq = seq
- this.value = new Loaded(delivery)
- (browsing:::competing).foreach { sub => sub.addPrefetch(this) }
+ def created(delivery:Delivery) = {
+ this.value = new Loaded(delivery)
this
}
@@ -553,7 +559,6 @@ class QueueEntry(val queue:Queue) extend
}
def flushed(qer:QueueEntryRecord) = {
- this.seq = qer.queueSeq
this.value = new Flushed(qer.messageKey, qer.size)
this
}
@@ -569,23 +574,23 @@ class QueueEntry(val queue:Queue) extend
cur = cur.getPrevious
}
-
this.value = new Tombstone()
if( seq != 0L ) {
- def merge(lv:QueueEntry, rv:QueueEntry):Boolean = {
+ def merge(lv:QueueEntry, rv:QueueEntry):Unit = {
if( lv==null || rv==null) {
- return false
+ return
}
val lts = lv.value.asTombstone
val rts = rv.value.asTombstone
if( lts==null || rts==null ) {
- return false
+ return
}
if( lv.seq + lts.count == rv.seq ) {
+
lts.count += rts.count
rts.count = 0
@@ -596,19 +601,13 @@ class QueueEntry(val queue:Queue) extend
rv.competing = Nil
}
- return true
- } else {
- return false
+ rv.unlink
}
}
// Merge adjacent tombstones
- if( merge(this, getNext) ) {
- getNext.unlink
- }
- if( merge(getPrevious, this) ) {
- this.unlink
- }
+ merge(this, getNext)
+ merge(getPrevious, this)
}
this
}
@@ -642,7 +641,7 @@ class QueueEntry(val queue:Queue) extend
competing = competing.filterNot(_ == s)
}
- def nextEntry():QueueEntry = {
+ def nextOrTail():QueueEntry = {
var entry = getNext
if (entry == null) {
entry = queue.tailEntry
@@ -658,32 +657,34 @@ class QueueEntry(val queue:Queue) extend
def asTombstone = this.value.asTombstone
def asFlushed = this.value.asFlushed
def asLoaded = this.value.asLoaded
+ def asTail = this.value.asTail
def isFlushedOrFlushing = value.isFlushedOrFlushing
- def dispatch():QueueEntry = {
- if( value == null ) {
- // tail entry can't be dispatched.
- null
- } else {
- value.dispatch
- }
- }
-
+ def dispatch():QueueEntry = value.dispatch
trait EntryType {
def size:Int
def dispatch():QueueEntry
def ref:Long
- def asTombstone:Tombstone = null
- def asFlushed:Flushed = null
+ def asTail:Tail = null
def asLoaded:Loaded = null
+ def asFlushed:Flushed = null
+ def asTombstone:Tombstone = null
def flush = {}
def load = {}
def isFlushedOrFlushing = false
}
+ class Tail extends EntryType {
+ override def asTail:Tail = this
+ def size = 0
+ def ref = -1
+
+ def dispatch():QueueEntry = null
+ }
+
class Tombstone extends EntryType {
var count = 1L
@@ -694,7 +695,7 @@ class QueueEntry(val queue:Queue) extend
override def asTombstone = this
def dispatch():QueueEntry = {
- val p = nextEntry
+ val p = nextOrTail
p.addBrowsing(browsing)
p.addCompeting(competing)
browsing = Nil
@@ -779,7 +780,7 @@ class QueueEntry(val queue:Queue) extend
}
override def flush():Unit = {
- if( !flushing ) {
+ if( queue.host.store!=null && !flushing ) {
flushing=true
queue.flushingSize+=size
@@ -867,13 +868,13 @@ class QueueEntry(val queue:Queue) extend
// the fast subs move on to the next entry...
if ( browsingFastSubs!=null && competingFastSubs!=null) {
- val p = nextEntry
+ val p = nextOrTail
p.addBrowsing(browsingFastSubs)
p.addCompeting(competingFastSubs)
-
- // if we are no longer needed and we are under pressure to make room and the previous was flushed....
- if( !hasSubs && prefetched==0 && !aquired && queue.messages.full && getPrevious.isFlushedOrFlushing ) {
+ // flush this entry out if it's not going to be needed soon.
+ def haveQuickConsumer = queue.fastSubs.find( sub=> !sub.slow && sub.pos.seq <= seq ).isDefined
+ if( !hasSubs && prefetched==0 && !aquired && !haveQuickConsumer ) {
// then flush out to make space...
flush
}
@@ -891,7 +892,8 @@ class QueueEntry(val queue:Queue) extend
class LinkedQueueEntry(val value:QueueEntry) extends LinkedNode[LinkedQueueEntry]
-class Subscription(queue:Queue) extends DeliveryProducer {
+class Subscription(queue:Queue) extends DeliveryProducer with DispatchLogging {
+ override protected def log = Queue
def dispatchQueue = queue.dispatchQueue
@@ -908,36 +910,28 @@ class Subscription(queue:Queue) extends
def slow = slowIntervals > queue.tune_max_slow_intervals
- var lastPrefetchPos:QueueEntry = null
+ var nextPrefetchPos:QueueEntry = null
var prefetchSize = 0
- override def toString = "{ prefetchSize: "+prefetchSize+", pos: "+(if(pos==null) null else pos.seq)+" lastPrefetchPos: "+(if(lastPrefetchPos==null) null else lastPrefetchPos.seq)+" }"
+ override def toString = "{ prefetchSize: "+prefetchSize+", pos: "+(if(pos==null) null else pos.seq)+" nextPrefetchPos: "+(if(nextPrefetchPos==null) null else nextPrefetchPos.seq)+" }"
def position(value:QueueEntry):Unit = {
if( value!=null ) {
// setting a new position..
- if( pos!=null && pos.value!=null ) {
+ if( pos!=null ) {
// Remove the previous pos from the prefetch counters.
removePrefetch(pos)
cursoredCounter += pos.size
}
} else {
// setting null pos, happens when the sub is closed.
- if( lastPrefetchPos!=null ) {
- var cur = pos
+ var cur = pos
- // clean up it's prefetch counters on the entries..
- while( cur!=null && cur.value!=null ) {
- cur.prefetched -= 1
- cur = if( cur == lastPrefetchPos ) {
- null
- } else {
- cur.nextEntry
- }
- }
- lastPrefetchPos = null
- prefetchSize=0
+ // clean up its prefetch counters on the entries..
+ while( cur!=nextPrefetchPos ) {
+ cur.prefetched -= 1
+ cur = cur.nextOrTail
}
}
pos = value
@@ -949,11 +943,11 @@ class Subscription(queue:Queue) extends
def prefetched(value:QueueEntry) = {
- pos.seq <= value.seq && value.seq <= lastPrefetchPos.seq
+ pos.seq <= value.seq && value.seq < nextPrefetchPos.seq
}
def removePrefetch(value:QueueEntry):Unit = {
-// println("prefetch rm: "+value.seq)
+// trace("prefetch rm: "+value.seq)
value.prefetched -= 1
prefetchSize -= value.size
fillPrefetch()
@@ -961,27 +955,25 @@ class Subscription(queue:Queue) extends
def fillPrefetch() = {
// attempts to fill the prefetch...
- var next = lastPrefetchPos.getNext
- while(prefetchSize < queue.tune_subscription_buffer && next!=null && next.value!=null ) {
- next.load
- addPrefetch(next)
- next = next.getNext
+ while(prefetchSize < queue.tune_subscription_buffer && nextPrefetchPos.asTail==null ) {
+ addPrefetch(nextPrefetchPos)
}
}
def addPrefetch(value:QueueEntry):Unit = {
-// println("prefetch add: "+value.seq)
+// trace("prefetch add: "+value.seq)
prefetchSize += value.size
- lastPrefetchPos = value
value.prefetched += 1
+ value.load
+ nextPrefetchPos = value.nextOrTail
}
def tailParked = pos eq queue.tailEntry
def connect(consumer: DeliveryConsumer) = {
session = consumer.connect(this)
- queue.headEntry.addCompeting(this :: Nil)
addPrefetch(queue.headEntry)
+ queue.headEntry.addCompeting(this :: Nil)
queue.dispatchQueue << queue.headEntry
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961145&r1=961144&r2=961145&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:10:40 2010
@@ -126,9 +126,8 @@ class VirtualHost(val broker: Broker) ex
override protected def _start(onCompleted:Runnable):Unit = {
val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
store = StoreFactory.create(config.store)
-
- store.configure(config.store, this)
if( store!=null ) {
+ store.configure(config.store, this)
val task = tracker.task("store startup")
store.start(^{
if( config.purgeOnStartup ) {