You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:11:50 UTC
svn commit: r961156 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/
Author: chirino
Date: Wed Jul 7 04:11:49 2010
New Revision: 961156
URL: http://svn.apache.org/viewvc?rev=961156&view=rev
Log:
The Queue now has much more robust prefetch tracking. This has made consumers/producers much more decoupled, i.e. persistence events on one should not affect the other as much.
The hawtdb store now loads messages in batches.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961156&r1=961155&r2=961156&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 04:11:49 2010
@@ -48,6 +48,7 @@ trait DeliveryProducer {
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait DeliveryConsumer extends Retained {
+ def browser = false
def dispatchQueue:DispatchQueue;
def matches(message:Delivery):Boolean
def connect(producer:DeliveryProducer):DeliverySession
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961156&r1=961155&r2=961156&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:11:49 2010
@@ -21,12 +21,12 @@ import org.apache.activemq.util.TreeMap
import collection.{SortedMap}
import org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue, BaseRetained}
import org.apache.activemq.util.TreeMap.TreeEntry
-import java.util.{Collections, ArrayList, LinkedList}
import org.apache.activemq.util.list.{LinkedNodeList, LinkedNode}
import org.apache.activemq.broker.store.{StoreBatch}
import protocol.ProtocolFactory
import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord}
import java.util.concurrent.TimeUnit
+import java.util.{HashSet, Collections, ArrayList, LinkedList}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -62,8 +62,8 @@ class Queue(val host: VirtualHost, val d
import Queue._
- var consumerSubs = Map[DeliveryConsumer, Subscription]()
- var fastSubs = List[Subscription]()
+ var all_subscriptions = Map[DeliveryConsumer, Subscription]()
+ var fast_subscriptions = List[Subscription]()
override val dispatchQueue: DispatchQueue = createQueue(destination.toString);
dispatchQueue.setTargetQueue(getRandomThreadQueue)
@@ -77,7 +77,7 @@ class Queue(val host: VirtualHost, val d
})
- val ack_source = createSource(new ListEventAggregator[(LinkedQueueEntry, StoreBatch)](), dispatchQueue)
+ val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, StoreBatch)](), dispatchQueue)
ack_source.setEventHandler(^ {drain_acks});
ack_source.resume
@@ -86,18 +86,16 @@ class Queue(val host: VirtualHost, val d
// sequence numbers.. used to track what's in the store.
var message_seq_counter = 1L
- var counter = 0
-
val entries = new LinkedNodeList[QueueEntry]()
- val headEntry = new QueueEntry(this, 0L);
- var tailEntry = new QueueEntry(this, next_message_seq)
+ val head_entry = new QueueEntry(this, 0L);
+ var tail_entry = new QueueEntry(this, next_message_seq)
- entries.addFirst(headEntry)
- headEntry.tombstone
+ entries.addFirst(head_entry)
+ head_entry.tombstone
- var loadingSize = 0
- var flushingSize = 0
- var storeId: Long = -1L
+ var loading_size = 0
+ var flushing_size = 0
+ var store_id: Long = -1L
//
// Tuning options.
@@ -106,12 +104,12 @@ class Queue(val host: VirtualHost, val d
/**
* The amount of memory buffer space for receiving messages.
*/
- var tune_inbound_buffer = 1024 * 32
+ var tune_producer_buffer = 1024*32
/**
* The amount of memory buffer space to use per subscription.
*/
- var tune_subscription_buffer = 1024*32
+ var tune_consumer_buffer = 1024*64
/**
* Subscribers that consume slower than this rate per seconds will be considered
@@ -122,7 +120,18 @@ class Queue(val host: VirtualHost, val d
/**
* The number of milliseconds between slow consumer checks.
*/
- var tune_slow_check_interval = 100L
+ var tune_slow_check_interval = 200L
+
+ /**
+ * Should messages be flushed or swapped out of memory if
+ * no consumers need the message?
+ */
+ def tune_flush_to_store = tune_persistent
+
+ /**
+ * Should this queue persistently store its entries?
+ */
+ def tune_persistent = host.store !=null
/**
* The number of intervals that a consumer must not meeting the subscription rate before it is
@@ -130,28 +139,35 @@ class Queue(val host: VirtualHost, val d
*/
var tune_max_slow_intervals = 10
- var enqueue_counter = 0L
- var dequeue_counter = 0L
- var enqueue_size = 0L
- var dequeue_size = 0L
+ var enqueue_item_counter = 0L
+ var dequeue_item_counter = 0L
+ var enqueue_size_counter = 0L
+ var dequeue_size_counter = 0L
+ var nack_item_counter = 0L
+ var nack_size_counter = 0L
+
+ def queue_size = enqueue_size_counter - dequeue_size_counter
+ def queue_items = enqueue_item_counter - dequeue_item_counter
- private var capacity = tune_inbound_buffer
+ private var capacity = tune_producer_buffer
var size = 0
- schedualSlowConsumerCheck
+ schedual_slow_consumer_check
def restore(storeId: Long, records:Seq[QueueEntryRecord]) = ^{
- this.storeId = storeId
+ this.store_id = storeId
if( !records.isEmpty ) {
+
+ // adjust the head tombstone.
+ head_entry.as_tombstone.count = records.head.queueSeq
+
records.foreach { qer =>
val entry = new QueueEntry(Queue.this,qer.queueSeq).init(qer)
entries.addLast(entry)
}
message_seq_counter = records.last.queueSeq+1
-
- counter = records.size
- enqueue_counter += records.size
+ enqueue_item_counter += records.size
debug("restored: "+records.size )
}
} >>: dispatchQueue
@@ -174,16 +190,18 @@ class Queue(val host: VirtualHost, val d
false
} else {
- val entry = tailEntry
- tailEntry = new QueueEntry(Queue.this, next_message_seq)
+ val entry = tail_entry
+ tail_entry = new QueueEntry(Queue.this, next_message_seq)
val queueDelivery = delivery.copy
entry.init(queueDelivery)
- queueDelivery.storeBatch = delivery.storeBatch
+
+ if( tune_persistent ) {
+ queueDelivery.storeBatch = delivery.storeBatch
+ }
entries.addLast(entry)
- counter += 1;
- enqueue_counter += 1
- enqueue_size += entry.size
+ enqueue_item_counter += 1
+ enqueue_size_counter += entry.size
// Do we need to do a persistent enqueue???
if (queueDelivery.storeBatch != null) {
@@ -191,7 +209,7 @@ class Queue(val host: VirtualHost, val d
}
- def haveQuickConsumer = fastSubs.find( sub=> sub.pos.seq <= entry.seq ).isDefined
+ def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= entry.seq ).isDefined
var dispatched = false
if( entry.hasSubs || haveQuickConsumer ) {
@@ -215,74 +233,117 @@ class Queue(val host: VirtualHost, val d
}
- var checkCounter = 0
- def schedualSlowConsumerCheck:Unit = {
+ var check_counter = 0
+ def display_stats: Unit = {
+ info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, size, capacity)
+ info("total messages enqueued %d, dequeues %d ", enqueue_item_counter, dequeue_item_counter)
+ }
+
+ def display_active_entries: Unit = {
+ var cur = entries.getHead
+ var total_items = 0L
+ var total_size = 0L
+ while (cur != null) {
+ if (cur.is_loaded || cur.hasSubs || cur.is_prefetched) {
+ info(" => " + cur)
+ }
+ if (cur.is_flushed || cur.is_loaded) {
+ total_items += 1
+ total_size += cur.size
+ }
+ cur = cur.getNext
+ }
+ info("tail: " + tail_entry)
+
+ // sanity checks..
+ assert(total_items == queue_items)
+ assert(total_size == queue_size)
+ }
+
+ def schedual_slow_consumer_check:Unit = {
def slowConsumerCheck = {
if( retained > 0 ) {
// Handy for periodically looking at the dispatch state...
-// checkCounter += 1
-// if( !consumerSubs.isEmpty && (checkCounter%100)==0 ) {
-// println("using "+size+" out of "+capacity+" buffer space.");
-// var cur = entries.getHead
-// while( cur!=null ) {
-// if( cur.asLoaded!=null || cur.hasSubs || cur.prefetched>0 ) {
-// println(" => "+cur)
-// }
-// cur = cur.getNext
+ check_counter += 1
+
+ if( (check_counter%10)==0 ) {
+ display_stats
+ }
+
+// if( (check_counter%100)==0 ) {
+// if (!all_subscriptions.isEmpty) {
+// display_active_entries
// }
-// println("tail: "+tailEntry)
// }
// target tune_min_subscription_rate / sec
- val slowCursorDelta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
+ val slow_cursor_delta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
var idleConsumerCount = 0
- fastSubs = Nil
- consumerSubs.foreach{ case (consumer, sub)=>
+
+ var startedWithFastSubs = !fast_subscriptions.isEmpty
+ fast_subscriptions = Nil
+
+ all_subscriptions.foreach{ case (consumer, sub)=>
// Skip over new consumers...
- if( sub.cursoredCounter != 0 ) {
+ if( sub.advanced_size != 0 ) {
- val cursorDelta = sub.cursoredCounter - sub.prevCursoredCounter
- sub.prevCursoredCounter = sub.cursoredCounter
+ val cursor_delta = sub.advanced_size - sub.last_cursored_size
+ sub.last_cursored_size = sub.advanced_size
// If the subscription is NOT slow if it's been tail parked or
// it's been parking and cursoring through the data at the tune_slow_subscription_rate
- if( (sub.tailParked && sub.tailParkings==0) || ( sub.tailParkings > 0 && cursorDelta >= slowCursorDelta ) ) {
+ if( (sub.tail_parked && sub.tail_parkings==0) || ( sub.tail_parkings > 0 && cursor_delta >= slow_cursor_delta ) ) {
if( sub.slow ) {
- debug("consumer is no longer slow: %s", consumer)
- sub.slowIntervals = 0
+ info("subscription is now fast: %s", sub)
+ sub.slow_intervals = 0
}
} else {
if( !sub.slow ) {
-// debug("slow interval: %d, %d, %d", sub.slowIntervals, sub.tailParkings, cursorDelta)
- sub.slowIntervals += 1
+ info("slow interval: %d, %d, %d", sub.slow_intervals, sub.tail_parkings, cursor_delta)
+ sub.slow_intervals += 1
if( sub.slow ) {
- debug("consumer is slow: %s", consumer)
+ info("subscription is now slow: %s", sub)
}
}
}
// has the consumer been stuck at the tail?
- if( sub.tailParked && sub.tailParkings==0 ) {
+ if( sub.tail_parked && sub.tail_parkings==0 ) {
idleConsumerCount += 1;
}
- sub.tailParkings = 0
+ sub.tail_parkings = 0
}
if( !sub.slow ) {
- fastSubs ::= sub
+ fast_subscriptions ::= sub
+ }
+ }
+
+
+ // If we no longer have fast subs...
+ if( startedWithFastSubs && fast_subscriptions.isEmpty ) {
+ // Flush out the tail entries..
+ var cur = entries.getTail
+ while( cur!=null ) {
+ if( !cur.hasSubs && !cur.is_prefetched ) {
+ cur
+ }
+ cur = cur.getPrevious
}
+
+
}
// flush tail entries that are still loaded but which have no fast subs that can process them.
var cur = entries.getTail
while( cur!=null ) {
- def haveQuickConsumer = fastSubs.find( sub=> sub.pos.seq <= cur.seq ).isDefined
- if( !cur.hasSubs && cur.prefetched==0 && cur.asFlushed==null && !haveQuickConsumer ) {
+ def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= cur.seq ).isDefined
+ if( cur.is_loaded && !cur.hasSubs && !cur.is_prefetched && !cur.as_loaded.acquired && !haveQuickConsumer ) {
// then flush out to make space...
cur.flush
cur = cur.getPrevious
@@ -293,10 +354,10 @@ class Queue(val host: VirtualHost, val d
// Trigger a swap if we have consumers waiting for messages and we are full..
- if( idleConsumerCount > 0 && messages.full && flushingSize==0 ) {
+ if( idleConsumerCount > 0 && messages.full && flushing_size==0 ) {
swap
}
- schedualSlowConsumerCheck
+ schedual_slow_consumer_check
}
}
@@ -305,39 +366,11 @@ class Queue(val host: VirtualHost, val d
})
}
- def ack(entry: QueueEntry, sb:StoreBatch) = {
- if (entry.messageKey != -1) {
- val storeBatch = if( sb == null ) {
- host.store.createStoreBatch
- } else {
- sb
- }
- storeBatch.dequeue(entry.toQueueEntryRecord)
- if( sb == null ) {
- storeBatch.release
- }
- }
- if( sb != null ) {
- sb.release
- }
-
- dequeue_counter += 1
- counter -= 1
- dequeue_size += entry.size
- entry.tombstone
- }
-
-
- def nack(values: LinkedNodeList[LinkedQueueEntry]) = {
- // TODO:
- }
-
def drain_acks = {
ack_source.getData.foreach {
case (entry, tx) =>
- entry.unlink
- ack(entry.value, tx)
+ entry.ack(tx)
}
messages.refiller.run
}
@@ -354,6 +387,10 @@ class Queue(val host: VirtualHost, val d
def connect(p: DeliveryProducer) = new DeliverySession {
retain
+ dispatchQueue {
+ addCapacity( tune_producer_buffer )
+ }
+
override def consumer = Queue.this
override def producer = p
@@ -362,6 +399,9 @@ class Queue(val host: VirtualHost, val d
def close = {
session_manager.close(session)
+ dispatchQueue {
+ addCapacity( -tune_producer_buffer )
+ }
release
}
@@ -373,7 +413,7 @@ class Queue(val host: VirtualHost, val d
false
} else {
- if( delivery.storeBatch!=null ) {
+ if( tune_persistent && delivery.storeBatch!=null ) {
delivery.storeBatch.retain
}
val rc = session.offer(delivery)
@@ -394,26 +434,26 @@ class Queue(val host: VirtualHost, val d
//
/////////////////////////////////////////////////////////////////////
- def connected(consumers: List[DeliveryConsumer]) = bind(consumers)
+ def connected(values: List[DeliveryConsumer]) = bind(values)
- def bind(consumers: List[DeliveryConsumer]) = retaining(consumers) {
- for (consumer <- consumers) {
+ def bind(values: List[DeliveryConsumer]) = retaining(values) {
+ for (consumer <- values) {
val subscription = new Subscription(this)
- subscription.connect(consumer)
- consumerSubs += consumer -> subscription
- fastSubs ::= subscription
- addCapacity( tune_subscription_buffer )
+ subscription.open(consumer)
+ all_subscriptions += consumer -> subscription
+ fast_subscriptions ::= subscription
+ addCapacity( tune_consumer_buffer )
}
} >>: dispatchQueue
- def unbind(consumers: List[DeliveryConsumer]) = releasing(consumers) {
- for (consumer <- consumers) {
- consumerSubs.get(consumer) match {
- case Some(cs) =>
- cs.close
- consumerSubs -= consumer
- fastSubs = fastSubs.filterNot(_ eq cs)
- addCapacity( -tune_subscription_buffer )
+ def unbind(values: List[DeliveryConsumer]) = releasing(values) {
+ for (consumer <- values) {
+ all_subscriptions.get(consumer) match {
+ case Some(subscription) =>
+ all_subscriptions -= consumer
+ fast_subscriptions = fast_subscriptions.filterNot(_ eq subscription)
+ subscription.close
+ addCapacity( -tune_consumer_buffer )
case None =>
}
@@ -449,12 +489,12 @@ class Queue(val host: VirtualHost, val d
var entry = entries.getHead
while( entry!=null ) {
- if( entry.asTombstone == null ) {
+ if( entry.as_tombstone == null ) {
- val loaded = entry.asLoaded
+ val loaded = entry.as_loaded
// Keep around prefetched and loaded entries.
- if( entry.prefetched < 0 || (loaded!=null && loaded.aquired)) {
+ if( entry.is_prefetched || (loaded!=null && loaded.acquired)) {
entry.load
} else {
// flush the the others out of memory.
@@ -508,18 +548,16 @@ class QueueEntry(val queue:Queue, val se
override protected def log = Queue
import QueueEntry._
- // Competing subscriptions try to exclusivly aquire the entry.
- var competing:List[Subscription] = Nil
- // These are subscriptions which will not be exclusivly aquiring the entry.
- var browsing:List[Subscription] = Nil
- // The number of subscriptions which have requested this entry to be prefetech (held in memory) so that it's
+ // Subscriptions waiting to dispatch this entry.
+ var subscriptions:List[Subscription] = Nil
+ // The number of subscriptions which have requested this entry to be prefetched (held in memory) so that it's
// ready for them to get dispatched.
var prefetched = 0
// The current state of the entry: Tail | Loaded | Flushed | Tombstone
var state:EntryState = new Tail
-
+ def is_prefetched = prefetched>0
def init(delivery:Delivery):QueueEntry = {
this.state = new Loaded(delivery)
@@ -532,7 +570,7 @@ class QueueEntry(val queue:Queue, val se
this
}
- def hasSubs = !(competing == Nil && browsing == Nil)
+ def hasSubs = !(subscriptions == Nil )
/**
* Dispatches this entry to the consumers and continues dispatching subsequent
@@ -545,30 +583,19 @@ class QueueEntry(val queue:Queue, val se
}
}
- def addBrowsing(l:List[Subscription]) = {
- l.foreach(x=>x.position(this))
- browsing :::= l
- }
-
- def addCompeting(l:List[Subscription]) = {
- l.foreach(x=>x.position(this))
- competing :::= l
+ def addSubscriptions(l:List[Subscription]) = {
+ subscriptions :::= l
}
- def removeBrowsing(s:Subscription) = {
- s.position(null)
- browsing = browsing.filterNot(_ == s)
- }
- def removeCompeting(s:Subscription) = {
- s.position(null)
- competing = competing.filterNot(_ == s)
+ def removeSubscriptions(s:Subscription) = {
+ subscriptions = subscriptions.filterNot(_ == s)
}
def nextOrTail():QueueEntry = {
var entry = getNext
if (entry == null) {
- entry = queue.tailEntry
+ entry = queue.tail_entry
}
entry
}
@@ -580,7 +607,7 @@ class QueueEntry(val queue:Queue, val se
def toQueueEntryRecord = {
val qer = new QueueEntryRecord
- qer.queueKey = queue.storeId
+ qer.queueKey = queue.store_id
qer.queueSeq = seq
qer.messageKey = state.messageKey
qer.size = state.size
@@ -588,7 +615,7 @@ class QueueEntry(val queue:Queue, val se
}
override def toString = {
- "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", competing: "+competing+", browsing: "+browsing+"}"
+ "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", subscriptions: "+subscriptions+"}"
}
/////////////////////////////////////////////////////
@@ -598,15 +625,22 @@ class QueueEntry(val queue:Queue, val se
/////////////////////////////////////////////////////
// What state is it in?
- def asTombstone = this.state.asTombstone
- def asFlushed = this.state.asFlushed
- def asLoaded = this.state.asLoaded
- def asTail = this.state.asTail
+ def as_tombstone = this.state.as_tombstone
+ def as_flushed = this.state.as_flushed
+ def as_loaded = this.state.as_loaded
+ def as_tail = this.state.as_tail
+
+ def is_tail = this == queue.tail_entry
+ def is_head = this == queue.head_entry
+
+ def is_loaded = as_loaded!=null
+ def is_flushed = as_flushed!=null
+ def is_tombstone = as_tombstone!=null
// These should not change the current state.
def size = this.state.size
def messageKey = this.state.messageKey
- def isFlushedOrFlushing = state.isFlushedOrFlushing
+ def is_flushed_or_flushing = state.is_flushed_or_flushing
def dispatch():QueueEntry = state.dispatch
// These methods may cause a change in the current state.
@@ -618,15 +652,15 @@ class QueueEntry(val queue:Queue, val se
final def entry:QueueEntry = QueueEntry.this
- def asTail:Tail = null
- def asLoaded:Loaded = null
- def asFlushed:Flushed = null
- def asTombstone:Tombstone = null
+ def as_tail:Tail = null
+ def as_loaded:Loaded = null
+ def as_flushed:Flushed = null
+ def as_tombstone:Tombstone = null
def size:Int
def dispatch():QueueEntry
def messageKey:Long
- def isFlushedOrFlushing = false
+ def is_flushed_or_flushing = false
def load = entry
@@ -634,13 +668,22 @@ class QueueEntry(val queue:Queue, val se
def tombstone = {
- // Update the prefetch counter to reflect that this entry is no longer being prefetched.
- var cur = entry
- while( cur!=null && prefetched > 0 ) {
- if( cur.hasSubs ) {
- (cur.browsing ::: cur.competing).foreach { sub => if( sub.prefetched(entry) ) { sub.removePrefetch(entry) } }
+ var refill_preftch_list = List[Subscription]()
+ if( queue.tune_flush_to_store ) {
+ // Update the prefetch counter to reflect that this entry is no longer being prefetched.
+ var cur = entry
+ while( cur!=null && is_prefetched ) {
+ if( cur.hasSubs ) {
+ (cur.subscriptions).foreach { sub =>
+ if( sub.is_prefetched(entry) ) {
+ sub.remove_from_prefetch(entry)
+ refill_preftch_list ::= sub
+ }
+ }
+ }
+ cur = cur.getPrevious
}
- cur = cur.getPrevious
+ assert(!is_prefetched, "entry should not be prefetched.")
}
// if rv and lv are both adjacent tombstones, then this merges the rv
@@ -651,8 +694,8 @@ class QueueEntry(val queue:Queue, val se
return rv
}
- val lts = lv.state.asTombstone
- val rts = rv.state.asTombstone
+ val lts = lv.state.as_tombstone
+ val rts = rv.state.as_tombstone
if( lts==null || rts==null ) {
return rv
@@ -660,26 +703,22 @@ class QueueEntry(val queue:Queue, val se
// Sanity check: the the entries are adjacent.. this should
// always be the case.
- if( lv.seq + lts.count != rv.seq ) {
- throw new AssertionError("entries are not adjacent.")
- }
+ assert( lv.seq + lts.count == rv.seq , "entries are not adjacent.")
lts.count += rts.count
- rts.count = 0
-
- if( rv.browsing!=Nil || rv.competing!=Nil ){
- lv.addBrowsing(rv.browsing)
- lv.addCompeting(rv.competing)
- rv.browsing = Nil
- rv.competing = Nil
- }
+ rv.dispatch // moves the subs to the next entry.
rv.unlink
return lv
}
state = new Tombstone()
merge(entry, getNext)
- merge(getPrevious, entry)
+ val rc = merge(getPrevious, entry)
+
+ refill_preftch_list.foreach( _.refill_prefetch )
+
+ rc.run // dispatch to move the subs to the next entry..
+ rc
}
}
@@ -691,7 +730,7 @@ class QueueEntry(val queue:Queue, val se
*/
class Tail extends EntryState {
- override def asTail:Tail = this
+ override def as_tail:Tail = this
def size = 0
def messageKey = -1
def dispatch():QueueEntry = null
@@ -709,18 +748,18 @@ class QueueEntry(val queue:Queue, val se
*/
class Loaded(val delivery: Delivery) extends EntryState {
- var aquired = false
+ var acquired = false
def messageKey = delivery.storeKey
def size = delivery.size
var flushing = false
- override def toString = { "loaded:{ flushing: "+flushing+", aquired: "+aquired+", size:"+size+"}" }
+ override def toString = { "loaded:{ flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" }
- override def isFlushedOrFlushing = {
+ override def is_flushed_or_flushing = {
flushing
}
- override def asLoaded = this
+ override def as_loaded = this
def store() = {
if( delivery.storeKey == -1 ) {
@@ -732,9 +771,9 @@ class QueueEntry(val queue:Queue, val se
}
override def flush() = {
- if( queue.host.store!=null && !flushing ) {
+ if( queue.tune_flush_to_store && !flushing ) {
flushing=true
- queue.flushingSize+=size
+ queue.flushing_size+=size
if( delivery.storeBatch!=null ) {
delivery.storeBatch.eagerFlush(^{
queue.store_flush_source.merge(this)
@@ -751,7 +790,7 @@ class QueueEntry(val queue:Queue, val se
def flushed() = {
if( flushing ) {
- queue.flushingSize-=size
+ queue.flushing_size-=size
queue.size -= size
state = new Flushed(delivery.storeKey, size)
}
@@ -760,7 +799,7 @@ class QueueEntry(val queue:Queue, val se
override def load() = {
if( flushing ) {
flushing = false
- queue.flushingSize-=size
+ queue.flushing_size-=size
}
entry
}
@@ -768,99 +807,111 @@ class QueueEntry(val queue:Queue, val se
override def tombstone = {
if( flushing ) {
flushing = false
- queue.flushingSize-=size
+ queue.flushing_size-=size
}
queue.size -= size
super.tombstone
}
def dispatch():QueueEntry = {
+
+ // Nothing to dispatch if we don't have subs..
+ if( subscriptions==Nil ) {
+ // This usually happens when a new consumer connects, it's not marked as slow but
+ // is not at the tail. And this entry is an entry just sent by a producer.
+ return nextOrTail
+ }
+
+ // can't dispatch until the delivery is set.
if( delivery==null ) {
- // can't dispatch until the delivery is set.
- null
- } else {
+ // TODO: check to see if this ever happens
+ return null
+ }
- var browsingSlowSubs:List[Subscription] = Nil
- var browsingFastSubs:List[Subscription] = Nil
- var competingSlowSubs:List[Subscription] = Nil
- var competingFastSubs:List[Subscription] = Nil
-
- if( browsing!=Nil ) {
- val offering = delivery.copy
- browsing.foreach { sub =>
- if (sub.matches(offering)) {
- if (sub.offer(offering)) {
- browsingFastSubs ::= sub
- } else {
- browsingSlowSubs ::= sub
- }
+ var heldBack:List[Subscription] = Nil
+ var advancing:List[Subscription] = Nil
+
+
+ var acquiringSub: Subscription = null
+ subscriptions.foreach{ sub=>
+
+ if( sub.browser ) {
+ if (!sub.matches(delivery)) {
+ // advance: not interested.
+ advancing ::= sub
+ } else {
+ if (sub.offer(delivery)) {
+ // advance: accepted...
+ advancing ::= sub
} else {
- browsingFastSubs ::= sub
+ // hold back: flow controlled
+ heldBack ::= sub
}
}
- }
- if( competing!=Nil ) {
- if (!this.aquired) {
- aquired = true
-
- var picked: Subscription = null
- var remaining = competing
- while( remaining!=Nil && picked == null ) {
- val sub = remaining.head
- remaining = remaining.drop(1)
- if (sub.matches(delivery)) {
- competingSlowSubs = competingSlowSubs ::: sub :: Nil
-
- if( !sub.full ) {
- val node = sub.add(entry)
- val offering = delivery.copy
- offering.ack = (tx)=> {
- queue.ack_source.merge((node, tx))
- }
- if (sub.offer(offering)) {
- picked = sub
- }
+ } else {
+ if( acquired ) {
+ // advance: another sub already acquired this entry..
+ advancing = advancing ::: sub :: Nil
+ } else {
+ if (!sub.matches(delivery)) {
+ // advance: not interested.
+ advancing = advancing ::: sub :: Nil
+ } else {
+ if( sub.full ) {
+ // hold back: flow controlled
+ heldBack = heldBack ::: sub :: Nil
+ } else {
+ // advance: accepted...
+ acquiringSub = sub
+ acquired = true
+
+ val acquiredQueueEntry = sub.acquire(entry)
+ val acquiredDelivery = delivery.copy
+ acquiredDelivery.ack = (tx)=> {
+ queue.ack_source.merge((acquiredQueueEntry, tx))
}
- } else {
- competingFastSubs = competingFastSubs ::: sub :: Nil
+ assert(sub.offer(acquiredDelivery), "sub should have accepted, it had reported not full earlier.")
}
}
-
- if (picked == null) {
- aquired = false
- } else {
- competingFastSubs = remaining ::: competingFastSubs ::: competingSlowSubs
- competingSlowSubs = Nil
- }
- } else {
- competingFastSubs = competing
}
}
+ }
- // The slow subs stay on this entry..
- browsing = browsingSlowSubs
- competing = competingSlowSubs
-
- // the fast subs move on to the next entry...
- if ( browsingFastSubs!=null && competingFastSubs!=null) {
- val p = nextOrTail
- p.addBrowsing(browsingFastSubs)
- p.addCompeting(competingFastSubs)
-
- // flush this entry out if it's not going to be needed soon.
- def haveQuickConsumer = queue.fastSubs.find( sub=> sub.pos.seq <= seq ).isDefined
- if( !hasSubs && prefetched==0 && !aquired && !haveQuickConsumer ) {
- // then flush out to make space...
- flush
- }
-
- p
+ // The acquiring sub is added last to the list so that
+ // the other competing subs get first dibs at the next entry.
+ if( acquiringSub != null ) {
+
+ // Advancing may need to be held back because the sub's prefetch is full.
+ if( acquiringSub.prefetchFull ) {
+ advancing = advancing ::: acquiringSub :: Nil
} else {
- null
+ heldBack = heldBack ::: acquiringSub :: Nil
}
}
+
+ // The held back subs stay on this entry..
+ subscriptions = heldBack
+
+ // the advancing subs move on to the next entry...
+ if ( advancing!=Nil ) {
+
+ val next = nextOrTail
+ next.addSubscriptions(advancing)
+ advancing.foreach(_.advance(next))
+
+ // flush this entry out if it's not going to be needed soon.
+ def haveQuickConsumer = queue.fast_subscriptions.find( sub=> sub.pos.seq <= seq ).isDefined
+ if( !hasSubs && !is_prefetched && !acquired && !haveQuickConsumer ) {
+ // then flush out to make space...
+ flush
+ }
+
+ return next
+ } else {
+ return null
+ }
}
}
@@ -877,18 +928,19 @@ class QueueEntry(val queue:Queue, val se
def size = 0
def messageKey = -1
- override def asTombstone = this
+ override def as_tombstone = this
/**
* Nothing ot dispatch in a Tombstone, move the subscriptions to the next entry.
*/
def dispatch():QueueEntry = {
- val p = nextOrTail
- p.addBrowsing(browsing)
- p.addCompeting(competing)
- browsing = Nil
- competing = Nil
- p
+ assert(prefetched==0, "tombstones should never be prefetched.")
+
+ val next = nextOrTail
+ next.addSubscriptions(subscriptions)
+ subscriptions.foreach(_.advance(next))
+ subscriptions = Nil
+ next
}
override def tombstone = throw new AssertionError("Tombstone entry cannot be tombstoned")
@@ -908,27 +960,18 @@ class QueueEntry(val queue:Queue, val se
var loading = false
- override def asFlushed = this
+ override def as_flushed = this
- override def isFlushedOrFlushing = true
+ override def is_flushed_or_flushing = true
override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
// Flushed entries can't be dispatched until
// they get loaded.
def dispatch():QueueEntry = {
- if( !loading && hasSubs) {
-
- // I don't think this should ever happen as we should be prefetching the
- // entry before we dispatch it.
- warn("dispatch called on a flushed entry that is not loading.")
-
- // ask the subs to fill the prefetches.. that should
- // kick off a load
- (browsing ::: competing).foreach { sub =>
- sub.fillPrefetch
- }
- }
+ // This dispatch can happen when a subscription is holding onto lots of acquired entries:
+ // it can't prefetch any more, as it's waiting for acks on those messages to avoid
+ // blowing its memory limits.
null
}
@@ -937,7 +980,7 @@ class QueueEntry(val queue:Queue, val se
// trace("Start entry load of message seq: %s", seq)
// start loading it back...
loading = true
- queue.loadingSize += size
+ queue.loading_size += size
queue.host.store.loadMessage(messageKey) { delivery =>
// pass off to a source so it can aggregate multiple
// loads to reduce cross thread synchronization
@@ -946,7 +989,7 @@ class QueueEntry(val queue:Queue, val se
queue.store_load_source.merge((this, delivery.get))
} else {
- debug("Detected store drop of message seq: %d", seq)
+ info("Detected store dropped message at seq: %d", seq)
// Looks like someone else removed the message from the store.. lets just
// tombstone this entry now.
@@ -963,7 +1006,7 @@ class QueueEntry(val queue:Queue, val se
if( loading ) {
// debug("Loaded message seq: ", seq )
loading = false
- queue.loadingSize -= size
+ queue.loading_size -= size
val delivery = new Delivery()
delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.value)
@@ -982,7 +1025,7 @@ class QueueEntry(val queue:Queue, val se
if( loading ) {
// debug("Tombstoned, will ignore store load of seq: ", seq)
loading = false
- queue.loadingSize -= size
+ queue.loading_size -= size
}
super.tombstone
}
@@ -990,105 +1033,238 @@ class QueueEntry(val queue:Queue, val se
}
-class LinkedQueueEntry(val value:QueueEntry) extends LinkedNode[LinkedQueueEntry]
-
class Subscription(queue:Queue) extends DeliveryProducer with DispatchLogging {
override protected def log = Queue
def dispatchQueue = queue.dispatchQueue
- var dispatched = new LinkedNodeList[LinkedQueueEntry]
+ var acquired = new LinkedNodeList[AcquiredQueueEntry]
var session: DeliverySession = null
var pos:QueueEntry = null
- var cursoredCounter = 0L
+ var advanced_size = 0L
// Vars used to detect slow consumers.
- var prevCursoredCounter = 0L
- var tailParkings = 0
- var slowIntervals = 0
-
- def slow = slowIntervals > queue.tune_max_slow_intervals
-
- var nextPrefetchPos:QueueEntry = null
- var prefetchSize = 0
-
-
- override def toString = "{ prefetchSize: "+prefetchSize+", pos: "+(if(pos==null) null else pos.seq)+" nextPrefetchPos: "+(if(nextPrefetchPos==null) null else nextPrefetchPos.seq)+" }"
-
- def position(value:QueueEntry):Unit = {
- if( value!=null ) {
- // setting a new position..
- if( pos!=null ) {
- // Remove the previous pos from the prefetch counters.
- pos.prefetched -= 1
- removePrefetch(pos)
- cursoredCounter += pos.size
- }
- } else {
- // setting null pos, happens when the sub is closed.
- var cur = pos
+ var last_cursored_size = 0L
+ var tail_parkings = 0
+ var slow_intervals = 0
+
+ def slow = slow_intervals > queue.tune_max_slow_intervals
+
+ var prefetch_tail:QueueEntry = null
+ var prefetched_size = 0
+ var acquired_size = 0L
- // clean up it's prefetch counters on the entries..
- while( cur!=nextPrefetchPos ) {
- cur.prefetched -= 1
- cur = cur.nextOrTail
- }
- }
- pos = value
+ override def toString = {
+ def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
+ "{ acquired_size: "+acquired_size+", prefetch_size: "+prefetched_size+", pos: "+seq(pos)+" prefetch_tail: "+seq(prefetch_tail)+" }"
+ }
+
+ def browser = session.consumer.browser
+
+ def open(consumer: DeliveryConsumer) = {
+ pos = queue.head_entry;
+ session = consumer.connect(this)
session.refiller = pos
- if( tailParked ) {
- tailParkings += 1
+
+ queue.head_entry.addSubscriptions(this :: Nil)
+ refill_prefetch
+
+ // kick off the initial dispatch.
+ queue.dispatchQueue << queue.head_entry
+ }
+
+ def close() = {
+ pos.removeSubscriptions(this)
+
+ invalidate_prefetch
+
+ pos = null
+ session.refiller = null
+ session.close
+ session = null
+
+ // nack all the acquired entries.
+ var next = acquired.getHead
+ while( next !=null ) {
+ val cur = next;
+ next = next.getNext
+ cur.nack // this unlinks the entry.
}
+
+ // show the queue entries... after we disconnect.
+ queue.dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{
+ queue.display_active_entries
+ })
}
+ /**
+ * Advances the subscription's position to the specified
+ * queue entry.
+ */
+ def advance(value:QueueEntry):Unit = {
+ assert(value!=null)
+
+ // Remove the previous pos from the prefetch counters.
+ if( prefetch_tail!=null && !pos.is_tombstone) {
+ remove_from_prefetch(pos)
+ }
+ advanced_size += pos.size
- def prefetched(value:QueueEntry) = {
- pos.seq <= value.seq && value.seq < nextPrefetchPos.seq
+ pos = value
+ session.refiller = pos
+
+ refill_prefetch()
+ if( tail_parked ) {
+ tail_parkings += 1
+ }
}
- def removePrefetch(value:QueueEntry):Unit = {
- prefetchSize -= value.size
- fillPrefetch()
+ /**
+ * Rewinds to a previously seen location. Happens when
+ * a nack occurs from another consumer.
+ */
+ def rewind(value:QueueEntry):Unit = {
+ assert(value!=null)
+ invalidate_prefetch
+ pos = value
+ session.refiller = pos
+ queue.dispatchQueue << value // queue up the entry to get dispatched..
}
- def fillPrefetch() = {
- // attempts to fill the prefetch...
- while(prefetchSize < queue.tune_subscription_buffer && nextPrefetchPos.asTail==null ) {
- addPrefetch(nextPrefetchPos)
+ def invalidate_prefetch: Unit = {
+ if (prefetch_tail != null) {
+ // release the prefetch counters...
+ var cur = pos
+ while (cur.seq <= prefetch_tail.seq) {
+ if (!cur.is_tombstone) {
+ prefetched_size -= cur.size
+ cur.prefetched -= 1
+ }
+ cur = cur.nextOrTail
+ }
+ assert(prefetched_size == 0, "inconsistent prefetch size.")
}
}
- def addPrefetch(value:QueueEntry):Unit = {
- prefetchSize += value.size
- value.prefetched += 1
- value.load
- nextPrefetchPos = value.nextOrTail
+
+ /**
+ * Is the specified queue entry prefetched by this subscription?
+ */
+ def is_prefetched(value:QueueEntry) = {
+ prefetch_tail!=null && pos.seq <= value.seq && value.seq <= prefetch_tail.seq
+ }
+
+
+ def add_to_prefetch(entry:QueueEntry):Unit = {
+ assert( !entry.is_tombstone, "tombstones should not be prefetched..")
+ prefetched_size += entry.size
+ entry.prefetched += 1
+ entry.load
+ prefetch_tail = entry
}
- def tailParked = pos eq queue.tailEntry
+ def remove_from_prefetch(entry:QueueEntry):Unit = {
+ prefetched_size -= entry.size
+ entry.prefetched -= 1
- def connect(consumer: DeliveryConsumer) = {
- session = consumer.connect(this)
- addPrefetch(queue.headEntry)
- queue.headEntry.addCompeting(this :: Nil)
- queue.dispatchQueue << queue.headEntry
+ if( entry == prefetch_tail ) {
+ prefetch_tail = prefetch_tail.getPrevious;
+ if( prefetch_tail==null || prefetch_tail.seq < pos.seq ) {
+ prefetch_tail = null
+ assert( prefetched_size == 0 , "inconsistent prefetch size.")
+ }
+ } else {
+ assert( prefetched_size >= 0 , "inconsistent prefetch size.")
+ }
}
- def close() = {
- pos.removeCompeting(this)
- session.close
- session = null
- queue.nack(dispatched)
+ def refill_prefetch() = {
+ if( queue.tune_flush_to_store ) {
+ def next_prefetch_pos = if(prefetch_tail==null) {
+ if( !pos.is_tail ) {
+ pos
+ } else {
+ null
+ }
+ } else {
+ prefetch_tail.getNext
+ }
+
+ // attempts to fill the prefetch...
+ var next = next_prefetch_pos
+ while( !prefetchFull && next!=null ) {
+ if( !next.is_tombstone ) {
+ add_to_prefetch(next)
+ }
+ next = next.getNext
+ }
+ }
}
+ def prefetchFull() = acquired_size + prefetched_size >= queue.tune_consumer_buffer
+
+ def tail_parked = pos eq queue.tail_entry
+
def matches(entry:Delivery) = session.consumer.matches(entry)
def full = session.full
def offer(delivery:Delivery) = session.offer(delivery)
- def add(entry:QueueEntry) = {
- val rc = new LinkedQueueEntry(entry)
- dispatched.addLast(rc)
- rc
+
+ class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {
+
+ acquired.addLast(this)
+ acquired_size += entry.size
+
+ def ack(sb:StoreBatch) = {
+
+ if (entry.messageKey != -1) {
+ val storeBatch = if( sb == null ) {
+ queue.host.store.createStoreBatch
+ } else {
+ sb
+ }
+ storeBatch.dequeue(entry.toQueueEntryRecord)
+ if( sb == null ) {
+ storeBatch.release
+ }
+ }
+ if( sb != null ) {
+ sb.release
+ }
+
+ queue.dequeue_item_counter += 1
+ queue.dequeue_size_counter += entry.size
+
+ // removes this entry from the acquired list.
+ unlink()
+
+ // we may now be able to prefetch some messages..
+ acquired_size -= entry.size
+ entry.tombstone // entry size changes to 0
+ refill_prefetch
+ }
+
+ def nack = {
+
+ entry.as_loaded.acquired = false
+ acquired_size -= entry.size
+
+ // track for stats
+ queue.nack_item_counter += 1
+ queue.nack_size_counter += entry.size
+
+ // rewind all the matching competing subs past the entry.. back to the entry
+ queue.all_subscriptions.valuesIterator.foreach{ sub=>
+ if( !sub.browser && entry.seq < sub.pos.seq && sub.matches(entry.as_loaded.delivery)) {
+ sub.rewind(entry)
+ }
+ }
+ unlink()
+ }
}
+
+
+ def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961156&r1=961155&r2=961156&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:11:49 2010
@@ -159,7 +159,11 @@ class HawtDBClient(hawtDBStore: HawtDBSt
pageFileFactory.setSync(true)
pageFileFactory.setUseWorkerThread(true)
pageFileFactory.setPageSize(512.toShort)
- pageFileFactory.setCacheSize((1024*1024*20)/512); // 20 meg page cache
+
+ // Empirically found (using profiler) that a cached BTree page retains
+ // about 4000 bytes of memory on a 64-bit platform.
+ pageFileFactory.setCacheSize((1024*1024*20)/4000 );
+
pageFileFactory.open()
val initialized = withTx { tx =>
@@ -193,7 +197,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
// Schedual periodic jobs.. they keep executing while schedual_version remains the same.
schedualCleanup(schedual_version.get())
- schedualFlush(schedual_version.get())
+ // schedualFlush(schedual_version.get())
}
}
@@ -312,11 +316,37 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val metric_load_from_index = new TimeCounter
val metric_load_from_journal = new TimeCounter
+ def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]) = {
+ val locations = withTx { tx =>
+ val helper = new TxHelper(tx)
+ import JavaConversions._
+ import helper._
+ requests.flatMap { case (messageKey, callback)=>
+ val location = metric_load_from_index.time {
+ messageKeyIndex.get(messageKey)
+ }
+ if( location==null ) {
+ debug("Message not indexed. Journal location could not be determined for message: %s", messageKey)
+ callback(None)
+ None
+ } else {
+ Some((location, callback))
+ }
+ }
+ }
+
+ locations.foreach { case (location, callback)=>
+ val addMessage = metric_load_from_journal.time {
+ load(location, classOf[AddMessage.Getter])
+ }
+ callback( addMessage.map( x => toMessageRecord(x) ) )
+ }
+
+ }
+
def loadMessage(messageKey: Long): Option[MessageRecord] = {
metric_load_from_index.start { end =>
withTx { tx =>
- val idxPage = rootBuffer.getMessageKeyIndexPage
-
val helper = new TxHelper(tx)
import JavaConversions._
import helper._
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961156&r1=961155&r2=961156&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:11:49 2010
@@ -91,7 +91,7 @@ class HawtDBStore extends Store with Bas
}
protected def _start(onCompleted: Runnable) = {
- executor_pool = Executors.newFixedThreadPool(20, new ThreadFactory(){
+ executor_pool = Executors.newFixedThreadPool(1, new ThreadFactory(){
def newThread(r: Runnable) = {
val rc = new Thread(r, "hawtdb store client")
rc.setDaemon(true)
@@ -160,10 +160,19 @@ class HawtDBStore extends Store with Bas
}
}
+ val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
+ load_source.setEventHandler(^{drain_loads});
+ load_source.resume
+
+
def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
+ load_source.merge((id, callback))
+ }
+
+ def drain_loads = {
+ var data = load_source.getData
executor_pool ^{
- val rc = client.loadMessage(id)
- callback( rc )
+ client.loadMessages(data)
}
}
@@ -437,28 +446,30 @@ class HawtDBStore extends Store with Bas
if( !txs.isEmpty ) {
storeLatency.start { end=>
- client.store(txs, ^{
- dispatchQueue {
-
- end()
- txs.foreach { tx=>
-
- tx.actions.foreach { case (msg, action) =>
- if( action.messageRecord !=null ) {
- metric_flushed_message_counter += 1
- pendingStores.remove(msg)
+ executor_pool {
+ client.store(txs, ^{
+ dispatchQueue {
+
+ end()
+ txs.foreach { tx=>
+
+ tx.actions.foreach { case (msg, action) =>
+ if( action.messageRecord !=null ) {
+ metric_flushed_message_counter += 1
+ pendingStores.remove(msg)
+ }
+ action.enqueues.foreach { queueEntry=>
+ metric_flushed_enqueue_counter += 1
+ val k = key(queueEntry)
+ pendingEnqueues.remove(k)
+ }
}
- action.enqueues.foreach { queueEntry=>
- metric_flushed_enqueue_counter += 1
- val k = key(queueEntry)
- pendingEnqueues.remove(k)
- }
- }
- tx.onPerformed
+ tx.onPerformed
+ }
}
- }
- })
+ })
+ }
}
}
}