You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:11:50 UTC

svn commit: r961156 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/

Author: chirino
Date: Wed Jul  7 04:11:49 2010
New Revision: 961156

URL: http://svn.apache.org/viewvc?rev=961156&view=rev
Log:
The Queue now has much more robust prefetch tracking.  This has made consumers/producers much more decoupled. i.e. persisetnce events on one should not aftect the other as much.
the hawtdb store loads messages in batches now.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961156&r1=961155&r2=961156&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 04:11:49 2010
@@ -48,6 +48,7 @@ trait DeliveryProducer {
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait DeliveryConsumer extends Retained {
+  def browser = false
   def dispatchQueue:DispatchQueue;
   def matches(message:Delivery):Boolean
   def connect(producer:DeliveryProducer):DeliverySession

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961156&r1=961155&r2=961156&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:11:49 2010
@@ -21,12 +21,12 @@ import org.apache.activemq.util.TreeMap
 import collection.{SortedMap}
 import org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue, BaseRetained}
 import org.apache.activemq.util.TreeMap.TreeEntry
-import java.util.{Collections, ArrayList, LinkedList}
 import org.apache.activemq.util.list.{LinkedNodeList, LinkedNode}
 import org.apache.activemq.broker.store.{StoreBatch}
 import protocol.ProtocolFactory
 import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord}
 import java.util.concurrent.TimeUnit
+import java.util.{HashSet, Collections, ArrayList, LinkedList}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -62,8 +62,8 @@ class Queue(val host: VirtualHost, val d
 
   import Queue._
 
-  var consumerSubs = Map[DeliveryConsumer, Subscription]()
-  var fastSubs = List[Subscription]()
+  var all_subscriptions = Map[DeliveryConsumer, Subscription]()
+  var fast_subscriptions = List[Subscription]()
 
   override val dispatchQueue: DispatchQueue = createQueue(destination.toString);
   dispatchQueue.setTargetQueue(getRandomThreadQueue)
@@ -77,7 +77,7 @@ class Queue(val host: VirtualHost, val d
   })
 
 
-  val ack_source = createSource(new ListEventAggregator[(LinkedQueueEntry, StoreBatch)](), dispatchQueue)
+  val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, StoreBatch)](), dispatchQueue)
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
@@ -86,18 +86,16 @@ class Queue(val host: VirtualHost, val d
   // sequence numbers.. used to track what's in the store.
   var message_seq_counter = 1L
 
-  var counter = 0
-
   val entries = new LinkedNodeList[QueueEntry]()
-  val headEntry = new QueueEntry(this, 0L);
-  var tailEntry = new QueueEntry(this, next_message_seq)
+  val head_entry = new QueueEntry(this, 0L);
+  var tail_entry = new QueueEntry(this, next_message_seq)
 
-  entries.addFirst(headEntry)
-  headEntry.tombstone
+  entries.addFirst(head_entry)
+  head_entry.tombstone
 
-  var loadingSize = 0
-  var flushingSize = 0
-  var storeId: Long = -1L
+  var loading_size = 0
+  var flushing_size = 0
+  var store_id: Long = -1L
 
   //
   // Tuning options.
@@ -106,12 +104,12 @@ class Queue(val host: VirtualHost, val d
   /**
    *  The amount of memory buffer space for receiving messages.
    */
-  var tune_inbound_buffer = 1024 * 32
+  var tune_producer_buffer = 1024*32
 
   /**
    *  The amount of memory buffer space to use per subscription.
    */
-  var tune_subscription_buffer = 1024*32
+  var tune_consumer_buffer = 1024*64
 
   /**
    * Subscribers that consume slower than this rate per seconds will be considered
@@ -122,7 +120,18 @@ class Queue(val host: VirtualHost, val d
   /**
    * The number of milliseconds between slow consumer checks.
    */
-  var tune_slow_check_interval = 100L
+  var tune_slow_check_interval = 200L
+
+  /**
+   * Should messages be flushed or swapped out of memory if
+   * no consumers need the message?
+   */
+  def tune_flush_to_store = tune_persistent
+
+  /**
+   * Should this queue persistently store it's entries?
+   */
+  def tune_persistent = host.store !=null
 
   /**
    * The number of intervals that a consumer must not meeting the subscription rate before it is
@@ -130,28 +139,35 @@ class Queue(val host: VirtualHost, val d
    */
   var tune_max_slow_intervals = 10
 
-  var enqueue_counter = 0L
-  var dequeue_counter = 0L
-  var enqueue_size = 0L
-  var dequeue_size = 0L
+  var enqueue_item_counter = 0L
+  var dequeue_item_counter = 0L
+  var enqueue_size_counter = 0L
+  var dequeue_size_counter = 0L
+  var nack_item_counter = 0L
+  var nack_size_counter = 0L
+
+  def queue_size = enqueue_size_counter - dequeue_size_counter
+  def queue_items = enqueue_item_counter - dequeue_item_counter
 
-  private var capacity = tune_inbound_buffer
+  private var capacity = tune_producer_buffer
   var size = 0
 
-  schedualSlowConsumerCheck
+  schedual_slow_consumer_check
 
   def restore(storeId: Long, records:Seq[QueueEntryRecord]) = ^{
-    this.storeId = storeId
+    this.store_id = storeId
     if( !records.isEmpty ) {
+
+      // adjust the head tombstone.
+      head_entry.as_tombstone.count = records.head.queueSeq
+
       records.foreach { qer =>
         val entry = new QueueEntry(Queue.this,qer.queueSeq).init(qer)
         entries.addLast(entry)
       }
 
       message_seq_counter = records.last.queueSeq+1
-
-      counter = records.size
-      enqueue_counter += records.size
+      enqueue_item_counter += records.size
       debug("restored: "+records.size )
     }
   } >>: dispatchQueue
@@ -174,16 +190,18 @@ class Queue(val host: VirtualHost, val d
         false
       } else {
 
-        val entry = tailEntry
-        tailEntry = new QueueEntry(Queue.this, next_message_seq)
+        val entry = tail_entry
+        tail_entry = new QueueEntry(Queue.this, next_message_seq)
         val queueDelivery = delivery.copy
         entry.init(queueDelivery)
-        queueDelivery.storeBatch = delivery.storeBatch
+        
+        if( tune_persistent ) {
+          queueDelivery.storeBatch = delivery.storeBatch
+        }
 
         entries.addLast(entry)
-        counter += 1;
-        enqueue_counter += 1
-        enqueue_size += entry.size
+        enqueue_item_counter += 1
+        enqueue_size_counter += entry.size
 
         // Do we need to do a persistent enqueue???
         if (queueDelivery.storeBatch != null) {
@@ -191,7 +209,7 @@ class Queue(val host: VirtualHost, val d
         }
 
 
-        def haveQuickConsumer = fastSubs.find( sub=> sub.pos.seq <= entry.seq ).isDefined
+        def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= entry.seq ).isDefined
 
         var dispatched = false
         if( entry.hasSubs || haveQuickConsumer ) {
@@ -215,74 +233,117 @@ class Queue(val host: VirtualHost, val d
   }
 
 
-  var checkCounter = 0
-  def schedualSlowConsumerCheck:Unit = {
+  var check_counter = 0
+  def display_stats: Unit = {
+    info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, size, capacity)
+    info("total messages enqueued %d, dequeues %d ", enqueue_item_counter, dequeue_item_counter)
+  }
+
+  def display_active_entries: Unit = {
+    var cur = entries.getHead
+    var total_items = 0L
+    var total_size = 0L
+    while (cur != null) {
+      if (cur.is_loaded || cur.hasSubs || cur.is_prefetched) {
+        info("  => " + cur)
+      }
+      if (cur.is_flushed || cur.is_loaded) {
+        total_items += 1
+        total_size += cur.size
+      }
+      cur = cur.getNext
+    }
+    info("tail: " + tail_entry)
+
+    // sanitiy checks..
+    assert(total_items == queue_items)
+    assert(total_size == queue_size)
+  }
+
+  def schedual_slow_consumer_check:Unit = {
 
     def slowConsumerCheck = {
       if( retained > 0 ) {
 
         // Handy for periodically looking at the dispatch state...
-//        checkCounter += 1
-//        if( !consumerSubs.isEmpty && (checkCounter%100)==0 ) {
-//          println("using "+size+" out of "+capacity+" buffer space.");
-//          var cur = entries.getHead
-//          while( cur!=null ) {
-//            if( cur.asLoaded!=null || cur.hasSubs || cur.prefetched>0 ) {
-//              println("  => "+cur)
-//            }
-//            cur = cur.getNext
+        check_counter += 1
+
+        if( (check_counter%10)==0  ) {
+          display_stats
+        }
+
+//        if( (check_counter%100)==0 ) {
+//          if (!all_subscriptions.isEmpty) {
+//            display_active_entries
 //          }
-//          println("tail: "+tailEntry)
 //        }
 
         // target tune_min_subscription_rate / sec
-        val slowCursorDelta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
+        val slow_cursor_delta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
         var idleConsumerCount = 0
-        fastSubs = Nil
 
-        consumerSubs.foreach{ case (consumer, sub)=>
+
+        var startedWithFastSubs = !fast_subscriptions.isEmpty
+        fast_subscriptions = Nil
+
+        all_subscriptions.foreach{ case (consumer, sub)=>
 
           // Skip over new consumers...
-          if( sub.cursoredCounter != 0 ) {
+          if( sub.advanced_size != 0 ) {
 
-            val cursorDelta = sub.cursoredCounter - sub.prevCursoredCounter 
-            sub.prevCursoredCounter = sub.cursoredCounter
+            val cursor_delta = sub.advanced_size - sub.last_cursored_size
+            sub.last_cursored_size = sub.advanced_size
 
             // If the subscription is NOT slow if it's been tail parked or
             // it's been parking and cursoring through the data at the tune_slow_subscription_rate 
-            if( (sub.tailParked && sub.tailParkings==0) || ( sub.tailParkings > 0 && cursorDelta >= slowCursorDelta ) ) {
+            if( (sub.tail_parked && sub.tail_parkings==0) || ( sub.tail_parkings > 0 && cursor_delta >= slow_cursor_delta ) ) {
               if( sub.slow ) {
-                debug("consumer is no longer slow: %s", consumer)
-                sub.slowIntervals = 0
+                info("subscription is now fast: %s", sub)
+                sub.slow_intervals = 0
               }
             } else {
               if( !sub.slow ) {
-//                debug("slow interval: %d, %d, %d", sub.slowIntervals, sub.tailParkings, cursorDelta)
-                sub.slowIntervals += 1
+                info("slow interval: %d, %d, %d", sub.slow_intervals, sub.tail_parkings, cursor_delta)
+                sub.slow_intervals += 1
                 if( sub.slow ) {
-                  debug("consumer is slow: %s", consumer)
+                  info("subscription is now slow: %s", sub)
                 }
               }
             }
 
             // has the consumer been stuck at the tail?
-            if( sub.tailParked && sub.tailParkings==0 ) {
+            if( sub.tail_parked && sub.tail_parkings==0 ) {
               idleConsumerCount += 1;
             }
 
-            sub.tailParkings = 0
+            sub.tail_parkings = 0
           }
 
           if( !sub.slow ) {
-            fastSubs ::= sub
+            fast_subscriptions ::= sub
+          }
+        }
+
+
+        // If we no longer have fast subs...
+        if( startedWithFastSubs && fast_subscriptions.isEmpty ) {
+          // Flush out the tail entries..
+          var cur = entries.getTail
+          while( cur!=null ) {
+            if( !cur.hasSubs && !cur.is_prefetched ) {
+              cur
+            }
+            cur = cur.getPrevious
           }
+
+
         }
 
         // flush tail entries that are still loaded but which have no fast subs that can process them.
         var cur = entries.getTail
         while( cur!=null ) {
-          def haveQuickConsumer = fastSubs.find( sub=> sub.pos.seq <= cur.seq ).isDefined
-          if( !cur.hasSubs && cur.prefetched==0 && cur.asFlushed==null && !haveQuickConsumer ) {
+          def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= cur.seq ).isDefined
+          if( cur.is_loaded && !cur.hasSubs && !cur.is_prefetched && !cur.as_loaded.acquired && !haveQuickConsumer ) {
             // then flush out to make space...
             cur.flush
             cur = cur.getPrevious
@@ -293,10 +354,10 @@ class Queue(val host: VirtualHost, val d
 
 
         // Trigger a swap if we have consumers waiting for messages and we are full..
-        if( idleConsumerCount > 0 && messages.full && flushingSize==0 ) {
+        if( idleConsumerCount > 0 && messages.full && flushing_size==0 ) {
           swap
         }
-        schedualSlowConsumerCheck
+        schedual_slow_consumer_check
       }
     }
 
@@ -305,39 +366,11 @@ class Queue(val host: VirtualHost, val d
     })
   }
 
-  def ack(entry: QueueEntry, sb:StoreBatch) = {
-    if (entry.messageKey != -1) {
-      val storeBatch = if( sb == null ) {
-        host.store.createStoreBatch
-      } else {
-        sb
-      }
-      storeBatch.dequeue(entry.toQueueEntryRecord)
-      if( sb == null ) {
-        storeBatch.release
-      }
-    }
-    if( sb != null ) {
-      sb.release
-    }
-
-    dequeue_counter += 1
-    counter -= 1
-    dequeue_size += entry.size
-    entry.tombstone
-  }
-
-
-  def nack(values: LinkedNodeList[LinkedQueueEntry]) = {
-    // TODO:
-  }
-
 
   def drain_acks = {
     ack_source.getData.foreach {
       case (entry, tx) =>
-        entry.unlink
-        ack(entry.value, tx)
+        entry.ack(tx)
     }
     messages.refiller.run
   }
@@ -354,6 +387,10 @@ class Queue(val host: VirtualHost, val d
   def connect(p: DeliveryProducer) = new DeliverySession {
     retain
 
+    dispatchQueue {
+      addCapacity( tune_producer_buffer )
+    }
+
     override def consumer = Queue.this
 
     override def producer = p
@@ -362,6 +399,9 @@ class Queue(val host: VirtualHost, val d
 
     def close = {
       session_manager.close(session)
+      dispatchQueue {
+        addCapacity( -tune_producer_buffer )
+      }
       release
     }
 
@@ -373,7 +413,7 @@ class Queue(val host: VirtualHost, val d
         false
       } else {
 
-        if( delivery.storeBatch!=null ) {
+        if( tune_persistent && delivery.storeBatch!=null ) {
           delivery.storeBatch.retain
         }
         val rc = session.offer(delivery)
@@ -394,26 +434,26 @@ class Queue(val host: VirtualHost, val d
   //
   /////////////////////////////////////////////////////////////////////
 
-  def connected(consumers: List[DeliveryConsumer]) = bind(consumers)
+  def connected(values: List[DeliveryConsumer]) = bind(values)
 
-  def bind(consumers: List[DeliveryConsumer]) = retaining(consumers) {
-    for (consumer <- consumers) {
+  def bind(values: List[DeliveryConsumer]) = retaining(values) {
+    for (consumer <- values) {
       val subscription = new Subscription(this)
-      subscription.connect(consumer)
-      consumerSubs += consumer -> subscription
-      fastSubs ::= subscription
-      addCapacity( tune_subscription_buffer )
+      subscription.open(consumer)
+      all_subscriptions += consumer -> subscription
+      fast_subscriptions ::= subscription
+      addCapacity( tune_consumer_buffer )
     }
   } >>: dispatchQueue
 
-  def unbind(consumers: List[DeliveryConsumer]) = releasing(consumers) {
-    for (consumer <- consumers) {
-      consumerSubs.get(consumer) match {
-        case Some(cs) =>
-          cs.close
-          consumerSubs -= consumer
-          fastSubs = fastSubs.filterNot(_ eq cs)
-          addCapacity( -tune_subscription_buffer )
+  def unbind(values: List[DeliveryConsumer]) = releasing(values) {
+    for (consumer <- values) {
+      all_subscriptions.get(consumer) match {
+        case Some(subscription) =>
+          all_subscriptions -= consumer
+          fast_subscriptions = fast_subscriptions.filterNot(_ eq subscription)
+          subscription.close
+          addCapacity( -tune_consumer_buffer )
         case None =>
       }
 
@@ -449,12 +489,12 @@ class Queue(val host: VirtualHost, val d
 
     var entry = entries.getHead
     while( entry!=null ) {
-      if( entry.asTombstone == null ) {
+      if( entry.as_tombstone == null ) {
 
-        val loaded = entry.asLoaded
+        val loaded = entry.as_loaded
 
         // Keep around prefetched and loaded entries.
-        if( entry.prefetched < 0 || (loaded!=null && loaded.aquired)) {
+        if( entry.is_prefetched || (loaded!=null && loaded.acquired)) {
           entry.load
         } else {
           // flush the the others out of memory.
@@ -508,18 +548,16 @@ class QueueEntry(val queue:Queue, val se
   override protected def log = Queue
   import QueueEntry._
 
-  // Competing subscriptions try to exclusivly aquire the entry.
-  var competing:List[Subscription] = Nil
-  // These are subscriptions which will not be exclusivly aquiring the entry.
-  var browsing:List[Subscription] = Nil
-  // The number of subscriptions which have requested this entry to be prefetech (held in memory) so that it's
+  // Subscriptions waiting to dispatch this entry.
+  var subscriptions:List[Subscription] = Nil
+  // The number of subscriptions which have requested this entry to be prefeteched (held in memory) so that it's
   // ready for them to get dispatched.
   var prefetched = 0
 
   // The current state of the entry: Tail | Loaded | Flushed | Tombstone
   var state:EntryState = new Tail
 
-
+  def is_prefetched = prefetched>0
 
   def init(delivery:Delivery):QueueEntry = {
     this.state = new Loaded(delivery)
@@ -532,7 +570,7 @@ class QueueEntry(val queue:Queue, val se
     this
   }
 
-  def hasSubs = !(competing == Nil && browsing == Nil)
+  def hasSubs = !(subscriptions == Nil )
 
   /**
    * Dispatches this entry to the consumers and continues dispatching subsequent
@@ -545,30 +583,19 @@ class QueueEntry(val queue:Queue, val se
     }
   }
 
-  def addBrowsing(l:List[Subscription]) = {
-    l.foreach(x=>x.position(this))
-    browsing :::= l
-  }
-
-  def addCompeting(l:List[Subscription]) = {
-    l.foreach(x=>x.position(this))
-    competing :::= l
+  def addSubscriptions(l:List[Subscription]) = {
+    subscriptions :::= l
   }
 
-  def removeBrowsing(s:Subscription) = {
-    s.position(null)
-    browsing = browsing.filterNot(_ == s)
-  }
 
-  def removeCompeting(s:Subscription) = {
-    s.position(null)
-    competing = competing.filterNot(_ == s)
+  def removeSubscriptions(s:Subscription) = {
+    subscriptions = subscriptions.filterNot(_ == s)
   }
 
   def nextOrTail():QueueEntry = {
     var entry = getNext
     if (entry == null) {
-      entry = queue.tailEntry
+      entry = queue.tail_entry
     }
     entry
   }
@@ -580,7 +607,7 @@ class QueueEntry(val queue:Queue, val se
 
   def toQueueEntryRecord = {
     val qer = new QueueEntryRecord
-    qer.queueKey = queue.storeId
+    qer.queueKey = queue.store_id
     qer.queueSeq = seq
     qer.messageKey = state.messageKey
     qer.size = state.size
@@ -588,7 +615,7 @@ class QueueEntry(val queue:Queue, val se
   }
 
   override def toString = {
-    "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", competing: "+competing+", browsing: "+browsing+"}"
+    "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", subscriptions: "+subscriptions+"}"
   }
 
   /////////////////////////////////////////////////////
@@ -598,15 +625,22 @@ class QueueEntry(val queue:Queue, val se
   /////////////////////////////////////////////////////
 
   // What state is it in?
-  def asTombstone = this.state.asTombstone
-  def asFlushed = this.state.asFlushed
-  def asLoaded = this.state.asLoaded
-  def asTail = this.state.asTail
+  def as_tombstone = this.state.as_tombstone
+  def as_flushed = this.state.as_flushed
+  def as_loaded = this.state.as_loaded
+  def as_tail = this.state.as_tail
+
+  def is_tail = this == queue.tail_entry
+  def is_head = this == queue.head_entry
+
+  def is_loaded = as_loaded!=null
+  def is_flushed = as_flushed!=null
+  def is_tombstone = as_tombstone!=null
 
   // These should not change the current state.
   def size = this.state.size
   def messageKey = this.state.messageKey
-  def isFlushedOrFlushing = state.isFlushedOrFlushing
+  def is_flushed_or_flushing = state.is_flushed_or_flushing
   def dispatch():QueueEntry = state.dispatch
 
   // These methods may cause a change in the current state.
@@ -618,15 +652,15 @@ class QueueEntry(val queue:Queue, val se
 
     final def entry:QueueEntry = QueueEntry.this
 
-    def asTail:Tail = null
-    def asLoaded:Loaded = null
-    def asFlushed:Flushed = null
-    def asTombstone:Tombstone = null
+    def as_tail:Tail = null
+    def as_loaded:Loaded = null
+    def as_flushed:Flushed = null
+    def as_tombstone:Tombstone = null
 
     def size:Int
     def dispatch():QueueEntry
     def messageKey:Long
-    def isFlushedOrFlushing = false
+    def is_flushed_or_flushing = false
 
     def load = entry
 
@@ -634,13 +668,22 @@ class QueueEntry(val queue:Queue, val se
 
     def tombstone = {
 
-      // Update the prefetch counter to reflect that this entry is no longer being prefetched.
-      var cur = entry
-      while( cur!=null && prefetched > 0 ) {
-        if( cur.hasSubs ) {
-          (cur.browsing ::: cur.competing).foreach { sub => if( sub.prefetched(entry) ) { sub.removePrefetch(entry) } }
+      var refill_preftch_list = List[Subscription]()
+      if( queue.tune_flush_to_store ) {
+        // Update the prefetch counter to reflect that this entry is no longer being prefetched.
+        var cur = entry
+        while( cur!=null && is_prefetched ) {
+          if( cur.hasSubs ) {
+            (cur.subscriptions).foreach { sub =>
+              if( sub.is_prefetched(entry) ) {
+                sub.remove_from_prefetch(entry)
+                refill_preftch_list ::= sub
+              }
+            }
+          }
+          cur = cur.getPrevious
         }
-        cur = cur.getPrevious
+        assert(!is_prefetched, "entry should not be prefetched.")
       }
 
       // if rv and lv are both adjacent tombstones, then this merges the rv
@@ -651,8 +694,8 @@ class QueueEntry(val queue:Queue, val se
           return rv
         }
 
-        val lts = lv.state.asTombstone
-        val rts = rv.state.asTombstone
+        val lts = lv.state.as_tombstone
+        val rts = rv.state.as_tombstone
 
         if( lts==null ||  rts==null ) {
           return rv
@@ -660,26 +703,22 @@ class QueueEntry(val queue:Queue, val se
 
         // Sanity check: the the entries are adjacent.. this should
         // always be the case.
-        if( lv.seq + lts.count  != rv.seq ) {
-          throw new AssertionError("entries are not adjacent.")
-        }
+        assert( lv.seq + lts.count  == rv.seq , "entries are not adjacent.")
 
         lts.count += rts.count
-        rts.count = 0
-
-        if( rv.browsing!=Nil || rv.competing!=Nil ){
-          lv.addBrowsing(rv.browsing)
-          lv.addCompeting(rv.competing)
-          rv.browsing = Nil
-          rv.competing = Nil
-        }
+        rv.dispatch // moves the subs to the next entry.
         rv.unlink
         return lv
       }
 
       state = new Tombstone()
       merge(entry, getNext)
-      merge(getPrevious, entry)
+      val rc = merge(getPrevious, entry)
+
+      refill_preftch_list.foreach( _.refill_prefetch )
+
+      rc.run // dispatch to move the subs to the next entry..
+      rc
     }
 
   }
@@ -691,7 +730,7 @@ class QueueEntry(val queue:Queue, val se
    */
   class Tail extends EntryState {
 
-    override def asTail:Tail = this
+    override def as_tail:Tail = this
     def size = 0
     def messageKey = -1
     def dispatch():QueueEntry = null
@@ -709,18 +748,18 @@ class QueueEntry(val queue:Queue, val se
    */
   class Loaded(val delivery: Delivery) extends EntryState {
 
-    var aquired = false
+    var acquired = false
     def messageKey = delivery.storeKey
     def size = delivery.size
     var flushing = false
 
-    override def toString = { "loaded:{ flushing: "+flushing+", aquired: "+aquired+", size:"+size+"}" }
+    override def toString = { "loaded:{ flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" }
 
-    override def isFlushedOrFlushing = {
+    override def is_flushed_or_flushing = {
       flushing
     }
 
-    override  def asLoaded = this
+    override  def as_loaded = this
 
     def store() = {
       if( delivery.storeKey == -1 ) {
@@ -732,9 +771,9 @@ class QueueEntry(val queue:Queue, val se
     }
 
     override def flush() = {
-      if( queue.host.store!=null && !flushing ) {
+      if( queue.tune_flush_to_store && !flushing ) {
         flushing=true
-        queue.flushingSize+=size
+        queue.flushing_size+=size
         if( delivery.storeBatch!=null ) {
           delivery.storeBatch.eagerFlush(^{
             queue.store_flush_source.merge(this)
@@ -751,7 +790,7 @@ class QueueEntry(val queue:Queue, val se
 
     def flushed() = {
       if( flushing ) {
-        queue.flushingSize-=size
+        queue.flushing_size-=size
         queue.size -= size
         state = new Flushed(delivery.storeKey, size)
       }
@@ -760,7 +799,7 @@ class QueueEntry(val queue:Queue, val se
     override def load() = {
       if( flushing ) {
         flushing = false
-        queue.flushingSize-=size
+        queue.flushing_size-=size
       }
       entry
     }
@@ -768,99 +807,111 @@ class QueueEntry(val queue:Queue, val se
     override def tombstone = {
       if( flushing ) {
         flushing = false
-        queue.flushingSize-=size
+        queue.flushing_size-=size
       }
       queue.size -= size
       super.tombstone
     }
 
     def dispatch():QueueEntry = {
+
+      // Nothing to dispatch if we don't have subs..
+      if( subscriptions==Nil ) {
+        // This usualy happens when a new consumer connects, it's not marked as slow but
+        // is not at the tail.  And this entry is an entry just sent by a producer.
+        return nextOrTail
+      }
+
+      // can't dispatch until the delivery is set.
       if( delivery==null ) {
-        // can't dispatch until the delivery is set.
-        null
-      } else {
+        // TODO: check to see if this ever happens
+        return null
+      }
 
-        var browsingSlowSubs:List[Subscription] = Nil
-        var browsingFastSubs:List[Subscription] = Nil
-        var competingSlowSubs:List[Subscription] = Nil
-        var competingFastSubs:List[Subscription] = Nil
-
-        if( browsing!=Nil ) {
-          val offering = delivery.copy
-          browsing.foreach { sub =>
-            if (sub.matches(offering)) {
-              if (sub.offer(offering)) {
-                browsingFastSubs ::= sub
-              } else {
-                browsingSlowSubs ::= sub
-              }
+      var heldBack:List[Subscription] = Nil
+      var advancing:List[Subscription] = Nil
+
+
+      var acquiringSub: Subscription = null
+      subscriptions.foreach{ sub=>
+
+        if( sub.browser ) {
+          if (!sub.matches(delivery)) {
+            // advance: not interested.
+            advancing ::= sub
+          } else {
+            if (sub.offer(delivery)) {
+              // advance: accepted...
+              advancing ::= sub
             } else {
-              browsingFastSubs ::= sub
+              // hold back: flow controlled
+              heldBack ::= sub
             }
           }
-        }
 
-        if( competing!=Nil ) {
-          if (!this.aquired) {
-            aquired = true
-
-            var picked: Subscription = null
-            var remaining = competing
-            while( remaining!=Nil && picked == null ) {
-              val sub = remaining.head
-              remaining = remaining.drop(1)
-              if (sub.matches(delivery)) {
-                competingSlowSubs = competingSlowSubs ::: sub :: Nil
-
-                if( !sub.full ) {
-                  val node = sub.add(entry)
-                  val offering = delivery.copy
-                  offering.ack = (tx)=> {
-                    queue.ack_source.merge((node, tx))
-                  }
-                  if (sub.offer(offering)) {
-                    picked = sub
-                  }
+        } else {
+          if( acquired ) {
+            // advance: another sub already acquired this entry..
+            advancing = advancing ::: sub :: Nil
+          } else {
+            if (!sub.matches(delivery)) {
+              // advance: not interested.
+              advancing = advancing ::: sub :: Nil
+            } else {
+              if( sub.full ) {
+                // hold back: flow controlled
+                heldBack = heldBack ::: sub :: Nil
+              } else {
+                // advance: accepted...
+                acquiringSub = sub
+                acquired = true
+
+                val acquiredQueueEntry = sub.acquire(entry)
+                val acquiredDelivery = delivery.copy
+                acquiredDelivery.ack = (tx)=> {
+                  queue.ack_source.merge((acquiredQueueEntry, tx))
                 }
 
-              } else {
-                competingFastSubs = competingFastSubs ::: sub :: Nil
+                assert(sub.offer(acquiredDelivery), "sub should have accepted, it had reported not full earlier.")
               }
             }
-
-            if (picked == null) {
-              aquired = false
-            } else {
-              competingFastSubs = remaining ::: competingFastSubs ::: competingSlowSubs
-              competingSlowSubs = Nil
-            }
-          } else {
-            competingFastSubs = competing
           }
         }
+      }
 
-        // The slow subs stay on this entry..
-        browsing = browsingSlowSubs
-        competing = competingSlowSubs
-
-        // the fast subs move on to the next entry...
-        if ( browsingFastSubs!=null &&  competingFastSubs!=null) {
-          val p = nextOrTail
-          p.addBrowsing(browsingFastSubs)
-          p.addCompeting(competingFastSubs)
-
-          // flush this entry out if it's not going to be needed soon.
-          def haveQuickConsumer = queue.fastSubs.find( sub=> sub.pos.seq <= seq ).isDefined
-          if( !hasSubs && prefetched==0 && !aquired && !haveQuickConsumer ) {
-            // then flush out to make space...
-            flush
-          }
-
-          p
+      // The acquiring sub is added last to the list so that
+      // the other competing subs get first dibs at the next entry.
+      if( acquiringSub != null ) {
+
+        // Advancing may need to be held back because the sub's prefetch is full.
+        if( acquiringSub.prefetchFull ) {
+          advancing = advancing ::: acquiringSub :: Nil
         } else {
-          null
+          heldBack = heldBack ::: acquiringSub :: Nil
         }
       }
+
+      // The held back subs stay on this entry..
+      subscriptions = heldBack
+
+      // the advancing subs move on to the next entry...
+      if ( advancing!=Nil ) {
+
+        val next = nextOrTail
+        next.addSubscriptions(advancing)
+        advancing.foreach(_.advance(next))
+
+        // flush this entry out if it's not going to be needed soon.
+        def haveQuickConsumer = queue.fast_subscriptions.find( sub=> sub.pos.seq <= seq ).isDefined
+        if( !hasSubs && !is_prefetched && !acquired && !haveQuickConsumer ) {
+          // then flush out to make space...
+          flush
+        }
+
+        return next
+      } else {
+        return null
+      }
     }
   }
 
@@ -877,18 +928,19 @@ class QueueEntry(val queue:Queue, val se
     def size = 0
     def messageKey = -1
 
-    override def asTombstone = this
+    override def as_tombstone = this
 
     /**
      * Nothing ot dispatch in a Tombstone, move the subscriptions to the next entry.
      */
     def dispatch():QueueEntry = {
-      val p = nextOrTail
-      p.addBrowsing(browsing)
-      p.addCompeting(competing)
-      browsing = Nil
-      competing = Nil
-      p
+      assert(prefetched==0, "tombstones should never be prefetched.")
+
+      val next = nextOrTail
+      next.addSubscriptions(subscriptions)
+      subscriptions.foreach(_.advance(next))
+      subscriptions = Nil
+      next
     }
 
     override def tombstone = throw new AssertionError("Tombstone entry cannot be tombstoned")
@@ -908,27 +960,18 @@ class QueueEntry(val queue:Queue, val se
 
     var loading = false
 
-    override def asFlushed = this
+    override def as_flushed = this
 
-    override def isFlushedOrFlushing = true
+    override def is_flushed_or_flushing = true
 
     override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
 
     // Flushed entries can't be dispatched until
     // they get loaded.
     def dispatch():QueueEntry = {
-      if( !loading && hasSubs) {
-
-        // I don't think this should ever happen as we should be prefetching the
-        // entry before we dispatch it.
-        warn("dispatch called on a flushed entry that is not loading.")
-
-        // ask the subs to fill the prefetches.. that should
-        // kick off a load
-        (browsing ::: competing).foreach { sub =>
-          sub.fillPrefetch
-        }
-      }
+      // This dispatch can happen when a subscription is holding onto lots of acquired entries
+      // it can't prefetch anymore as it's waiting for ack on those messages to avoid
+      // blowing it's memory limits.
       null
     }
 
@@ -937,7 +980,7 @@ class QueueEntry(val queue:Queue, val se
 //        trace("Start entry load of message seq: %s", seq)
         // start loading it back...
         loading = true
-        queue.loadingSize += size
+        queue.loading_size += size
         queue.host.store.loadMessage(messageKey) { delivery =>
           // pass off to a source so it can aggregate multiple
           // loads to reduce cross thread synchronization
@@ -946,7 +989,7 @@ class QueueEntry(val queue:Queue, val se
             queue.store_load_source.merge((this, delivery.get))
           } else {
 
-            debug("Detected store drop of message seq: %d", seq)
+            info("Detected store dropped message at seq: %d", seq)
 
             // Looks like someone else removed the message from the store.. lets just
             // tombstone this entry now.
@@ -963,7 +1006,7 @@ class QueueEntry(val queue:Queue, val se
       if( loading ) {
 //        debug("Loaded message seq: ", seq )
         loading = false
-        queue.loadingSize -= size
+        queue.loading_size -= size
 
         val delivery = new Delivery()
         delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.value)
@@ -982,7 +1025,7 @@ class QueueEntry(val queue:Queue, val se
       if( loading ) {
 //        debug("Tombstoned, will ignore store load of seq: ", seq)
         loading = false
-        queue.loadingSize -= size
+        queue.loading_size -= size
       }
       super.tombstone
     }
@@ -990,105 +1033,238 @@ class QueueEntry(val queue:Queue, val se
 }
 
 
-class LinkedQueueEntry(val value:QueueEntry) extends LinkedNode[LinkedQueueEntry]
-
 class Subscription(queue:Queue) extends DeliveryProducer with DispatchLogging {
   override protected def log = Queue
 
   def dispatchQueue = queue.dispatchQueue
 
-  var dispatched = new LinkedNodeList[LinkedQueueEntry]
+  var acquired = new LinkedNodeList[AcquiredQueueEntry]
   var session: DeliverySession = null
   var pos:QueueEntry = null
 
-  var cursoredCounter = 0L
+  var advanced_size = 0L
 
   // Vars used to detect slow consumers.
-  var prevCursoredCounter = 0L
-  var tailParkings = 0
-  var slowIntervals = 0
-
-  def slow = slowIntervals > queue.tune_max_slow_intervals
-
-  var nextPrefetchPos:QueueEntry = null
-  var prefetchSize = 0
-
-
-  override def toString = "{ prefetchSize: "+prefetchSize+", pos: "+(if(pos==null) null else pos.seq)+" nextPrefetchPos: "+(if(nextPrefetchPos==null) null else nextPrefetchPos.seq)+" }"
-
-  def position(value:QueueEntry):Unit = {
-    if( value!=null ) {
-      // setting a new position..
-      if( pos!=null ) {
-        // Remove the previous pos from the prefetch counters.
-        pos.prefetched -= 1
-        removePrefetch(pos)
-        cursoredCounter += pos.size
-      }
-    } else {
-      // setting null pos, happens when the sub is closed.
-      var cur = pos
+  var last_cursored_size = 0L
+  var tail_parkings = 0
+  var slow_intervals = 0
+
+  def slow = slow_intervals > queue.tune_max_slow_intervals
+
+  var prefetch_tail:QueueEntry = null
+  var prefetched_size = 0
+  var acquired_size = 0L
 
-      // clean up it's prefetch counters on the entries..
-      while( cur!=nextPrefetchPos ) {
-        cur.prefetched -= 1
-        cur = cur.nextOrTail
-      }
-    }
-    pos = value
+  override def toString = {
+    def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
+    "{ acquired_size: "+acquired_size+", prefetch_size: "+prefetched_size+", pos: "+seq(pos)+" prefetch_tail: "+seq(prefetch_tail)+" }"
+  }
+
+  def browser = session.consumer.browser
+
+  def open(consumer: DeliveryConsumer) = {
+    pos = queue.head_entry;
+    session = consumer.connect(this)
     session.refiller = pos
-    if( tailParked ) {
-      tailParkings += 1
+
+    queue.head_entry.addSubscriptions(this :: Nil)
+    refill_prefetch
+
+    // kick off the initial dispatch.
+    queue.dispatchQueue << queue.head_entry
+  }
+
+  def close() = {
+    pos.removeSubscriptions(this)
+
+    invalidate_prefetch
+
+    pos = null
+    session.refiller = null
+    session.close
+    session = null
+
+    // nack all the acquired entries.
+    var next = acquired.getHead
+    while( next !=null ) {
+      val cur = next;
+      next = next.getNext
+      cur.nack // this unlinks the entry.
     }
+
+    // show the queue entries... after we disconnect.
+    queue.dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{
+      queue.display_active_entries
+    })
   }
 
+  /**
+   * Advances the subscriptions position to the specified
+   * queue entry.
+   */
+  def advance(value:QueueEntry):Unit = {
+    assert(value!=null)
+
+    // Remove the previous pos from the prefetch counters.
+    if( prefetch_tail!=null && !pos.is_tombstone) {
+      remove_from_prefetch(pos)
+    }
+    advanced_size += pos.size
 
-  def prefetched(value:QueueEntry) = {
-    pos.seq <= value.seq && value.seq < nextPrefetchPos.seq
+    pos = value
+    session.refiller = pos
+
+    refill_prefetch()
+    if( tail_parked ) {
+      tail_parkings += 1
+    }
   }
 
-  def removePrefetch(value:QueueEntry):Unit = {
-    prefetchSize -= value.size
-    fillPrefetch()
+  /**
+   * Rewinds to a previously seen location.. Happens when
+   * a nack occurs from another consumer.
+   */
+  def rewind(value:QueueEntry):Unit = {
+    assert(value!=null)
+    invalidate_prefetch
+    pos = value
+    session.refiller = pos
+    queue.dispatchQueue << value // queue up the entry to get dispatched..
   }
 
-  def fillPrefetch() = {
-    // attempts to fill the prefetch...
-    while(prefetchSize < queue.tune_subscription_buffer && nextPrefetchPos.asTail==null ) {
-      addPrefetch(nextPrefetchPos)
+  def invalidate_prefetch: Unit = {
+    if (prefetch_tail != null) {
+      // release the prefetch counters...
+      var cur = pos
+      while (cur.seq <= prefetch_tail.seq) {
+        if (!cur.is_tombstone) {
+          prefetched_size -= cur.size
+          cur.prefetched -= 1
+        }
+        cur = cur.nextOrTail
+      }
+      assert(prefetched_size == 0, "inconsistent prefetch size.")
     }
   }
 
-  def addPrefetch(value:QueueEntry):Unit = {
-    prefetchSize += value.size
-    value.prefetched += 1
-    value.load
-    nextPrefetchPos = value.nextOrTail
+
+  /**
+   * Is the specified queue entry prefeteched by this subscription?
+   */
+  def is_prefetched(value:QueueEntry) = {
+    prefetch_tail!=null && pos.seq <= value.seq && value.seq <= prefetch_tail.seq
+  }
+
+
+  def add_to_prefetch(entry:QueueEntry):Unit = {
+    assert( !entry.is_tombstone, "tombstones should not be prefetched..")
+    prefetched_size += entry.size
+    entry.prefetched += 1
+    entry.load
+    prefetch_tail = entry
   }
 
-  def tailParked = pos eq queue.tailEntry
+  def remove_from_prefetch(entry:QueueEntry):Unit = {
+    prefetched_size -= entry.size
+    entry.prefetched -= 1
 
-  def connect(consumer: DeliveryConsumer) = {
-    session = consumer.connect(this)
-    addPrefetch(queue.headEntry)
-    queue.headEntry.addCompeting(this :: Nil)
-    queue.dispatchQueue << queue.headEntry
+    if( entry == prefetch_tail ) {
+      prefetch_tail = prefetch_tail.getPrevious;
+      if( prefetch_tail==null || prefetch_tail.seq < pos.seq ) {
+        prefetch_tail = null
+        assert( prefetched_size == 0 , "inconsistent prefetch size.")
+      }
+    } else {
+      assert( prefetched_size >= 0 , "inconsistent prefetch size.")
+    }
   }
 
-  def close() = {
-    pos.removeCompeting(this)
-    session.close
-    session = null
-    queue.nack(dispatched)
+  def refill_prefetch() = {
+    if( queue.tune_flush_to_store ) {
+      def next_prefetch_pos = if(prefetch_tail==null) {
+        if( !pos.is_tail ) {
+          pos
+        } else {
+          null
+        }
+      } else  {
+        prefetch_tail.getNext
+      }
+
+      // attempts to fill the prefetch...
+      var next = next_prefetch_pos
+      while( !prefetchFull && next!=null ) {
+        if( !next.is_tombstone ) {
+          add_to_prefetch(next)
+        }
+        next = next.getNext
+      }
+    }
   }
 
+  def prefetchFull() = acquired_size + prefetched_size >= queue.tune_consumer_buffer
+
+  def tail_parked = pos eq queue.tail_entry
+
   def matches(entry:Delivery) = session.consumer.matches(entry)
   def full = session.full
   def offer(delivery:Delivery) = session.offer(delivery)
 
-  def add(entry:QueueEntry) = {
-    val rc = new LinkedQueueEntry(entry)
-    dispatched.addLast(rc)
-    rc
+
+  class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {
+
+    acquired.addLast(this)
+    acquired_size += entry.size
+
+    def ack(sb:StoreBatch) = {
+
+      if (entry.messageKey != -1) {
+        val storeBatch = if( sb == null ) {
+          queue.host.store.createStoreBatch
+        } else {
+          sb
+        }
+        storeBatch.dequeue(entry.toQueueEntryRecord)
+        if( sb == null ) {
+          storeBatch.release
+        }
+      }
+      if( sb != null ) {
+        sb.release
+      }
+
+      queue.dequeue_item_counter += 1
+      queue.dequeue_size_counter += entry.size
+
+      // removes this entry from the acquired list.
+      unlink()
+
+      // we may now be able to prefetch some messages..
+      acquired_size -= entry.size
+      entry.tombstone // entry size changes to 0
+      refill_prefetch
+    }
+
+    def nack = {
+
+      entry.as_loaded.acquired = false
+      acquired_size -= entry.size
+
+      // track for stats
+      queue.nack_item_counter += 1
+      queue.nack_size_counter += entry.size
+
+      // rewind all the matching competing subs past the entry.. back to the entry
+      queue.all_subscriptions.valuesIterator.foreach{ sub=>
+        if( !sub.browser && entry.seq < sub.pos.seq && sub.matches(entry.as_loaded.delivery)) {
+          sub.rewind(entry)
+        }
+      }
+      unlink()
+    }
   }
+
+
+  def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961156&r1=961155&r2=961156&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:11:49 2010
@@ -159,7 +159,11 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       pageFileFactory.setSync(true)
       pageFileFactory.setUseWorkerThread(true)
       pageFileFactory.setPageSize(512.toShort)
-      pageFileFactory.setCacheSize((1024*1024*20)/512); // 20 meg page cache 
+
+      // Empirically found (using profiler) that a cached BTree page retains
+      // about 4000 bytes of mem ON 64 bit platform.
+      pageFileFactory.setCacheSize((1024*1024*20)/4000 );
+
       pageFileFactory.open()
 
       val initialized = withTx { tx =>
@@ -193,7 +197,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
       // Schedual periodic jobs.. they keep executing while schedual_version remains the same.
       schedualCleanup(schedual_version.get())
-      schedualFlush(schedual_version.get())
+      // schedualFlush(schedual_version.get())
     }
   }
 
@@ -312,11 +316,37 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   val metric_load_from_index = new TimeCounter
   val metric_load_from_journal = new TimeCounter
 
+  def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]) = {
+    val locations = withTx { tx =>
+      val helper = new TxHelper(tx)
+      import JavaConversions._
+      import helper._
+      requests.flatMap { case (messageKey, callback)=>
+        val location = metric_load_from_index.time {
+          messageKeyIndex.get(messageKey)
+        }
+        if( location==null ) {
+          debug("Message not indexed.  Journal location could not be determined for message: %s", messageKey)
+          callback(None)
+          None
+        } else {
+          Some((location, callback))
+        }
+      }
+    }
+
+    locations.foreach { case (location, callback)=>
+      val addMessage = metric_load_from_journal.time {
+        load(location, classOf[AddMessage.Getter])
+      }
+      callback( addMessage.map( x => toMessageRecord(x) ) )
+    }
+
+  }
+
   def loadMessage(messageKey: Long): Option[MessageRecord] = {
     metric_load_from_index.start { end =>
       withTx { tx =>
-        val idxPage = rootBuffer.getMessageKeyIndexPage
-
         val helper = new TxHelper(tx)
         import JavaConversions._
         import helper._

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961156&r1=961155&r2=961156&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:11:49 2010
@@ -91,7 +91,7 @@ class HawtDBStore extends Store with Bas
   }
 
   protected def _start(onCompleted: Runnable) = {
-    executor_pool = Executors.newFixedThreadPool(20, new ThreadFactory(){
+    executor_pool = Executors.newFixedThreadPool(1, new ThreadFactory(){
       def newThread(r: Runnable) = {
         val rc = new Thread(r, "hawtdb store client")
         rc.setDaemon(true)
@@ -160,10 +160,19 @@ class HawtDBStore extends Store with Bas
     }
   }
 
+  val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
+  load_source.setEventHandler(^{drain_loads});
+  load_source.resume
+
+
   def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
+    load_source.merge((id, callback))
+  }
+
+  def drain_loads = {
+    var data = load_source.getData
     executor_pool ^{
-      val rc = client.loadMessage(id)
-      callback( rc )
+      client.loadMessages(data)
     }
   }
 
@@ -437,28 +446,30 @@ class HawtDBStore extends Store with Bas
 
     if( !txs.isEmpty ) {
       storeLatency.start { end=>
-        client.store(txs, ^{
-          dispatchQueue {
-
-            end()
-            txs.foreach { tx=>
-
-              tx.actions.foreach { case (msg, action) =>
-                if( action.messageRecord !=null ) {
-                  metric_flushed_message_counter += 1
-                  pendingStores.remove(msg)
+        executor_pool {
+          client.store(txs, ^{
+            dispatchQueue {
+
+              end()
+              txs.foreach { tx=>
+
+                tx.actions.foreach { case (msg, action) =>
+                  if( action.messageRecord !=null ) {
+                    metric_flushed_message_counter += 1
+                    pendingStores.remove(msg)
+                  }
+                  action.enqueues.foreach { queueEntry=>
+                    metric_flushed_enqueue_counter += 1
+                    val k = key(queueEntry)
+                    pendingEnqueues.remove(k)
+                  }
                 }
-                action.enqueues.foreach { queueEntry=>
-                  metric_flushed_enqueue_counter += 1
-                  val k = key(queueEntry)
-                  pendingEnqueues.remove(k)
-                }
-              }
-              tx.onPerformed
+                tx.onPerformed
 
+              }
             }
-          }
-        })
+          })
+        }
       }
     }
   }