You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:12:40 UTC

svn commit: r961163 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/ activemq-hawtdb/src/main/scala/org/apache/...

Author: chirino
Date: Wed Jul  7 04:12:40 2010
New Revision: 961163

URL: http://svn.apache.org/viewvc?rev=961163&view=rev
Log:
A group of Flushed messages can now be tracked as a single Queue entry.  Right now this is only used on restart, when the initial queue contents are loaded from the store.  This drastically reduces the memory consumption needed for managing queues with a large number of entries.  Updated the store interfaces to allow loading in groups.  Renamed StoreBatch to StoreUOW.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreUOW.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 04:12:40 2010
@@ -20,7 +20,7 @@ import _root_.org.apache.activemq.filter
 import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch._
 import org.fusesource.hawtbuf._
-import org.apache.activemq.broker.store.StoreBatch
+import org.apache.activemq.broker.store.StoreUOW
 import org.apache.activemq.apollo.store.MessageRecord
 import protocol.ProtocolFactory
 
@@ -149,13 +149,13 @@ class Delivery extends BaseRetained {
   /**
    * The transaction the delivery is participating in.
    */
-  var storeBatch:StoreBatch = null
+  var uow:StoreUOW = null
 
   /**
    * Set if the producer requires an ack to be sent back.  Consumer
    * should execute once the message is processed.
    */
-  var ack:(StoreBatch)=>Unit = null
+  var ack:(StoreUOW)=>Unit = null
 
   def copy() = (new Delivery).set(this)
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:12:40 2010
@@ -22,11 +22,11 @@ import collection.{SortedMap}
 import org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue, BaseRetained}
 import org.apache.activemq.util.TreeMap.TreeEntry
 import org.apache.activemq.util.list.{LinkedNodeList, LinkedNode}
-import org.apache.activemq.broker.store.{StoreBatch}
+import org.apache.activemq.broker.store.{StoreUOW}
 import protocol.ProtocolFactory
-import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord}
 import java.util.concurrent.TimeUnit
 import java.util.{HashSet, Collections, ArrayList, LinkedList}
+import org.apache.activemq.apollo.store.{QueueEntryGroup, QueueEntryRecord, MessageRecord}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -57,7 +57,7 @@ object Queue extends Log {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val host: VirtualHost, val destination: Destination) extends BaseRetained with Route with DeliveryConsumer with DispatchLogging {
+class Queue(val host: VirtualHost, val destination: Destination, val queueKey: Long = -1L) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
   override protected def log = Queue
 
   import Queue._
@@ -77,7 +77,7 @@ class Queue(val host: VirtualHost, val d
   })
 
 
-  val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, StoreBatch)](), dispatchQueue)
+  val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, StoreUOW)](), dispatchQueue)
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
@@ -95,7 +95,7 @@ class Queue(val host: VirtualHost, val d
 
   var loading_size = 0
   var flushing_size = 0
-  var store_id: Long = -1L
+
 
   //
   // Tuning options.
@@ -123,15 +123,23 @@ class Queue(val host: VirtualHost, val d
   var tune_slow_check_interval = 200L
 
   /**
+   * Should this queue persistently store its entries?
+   */
+  def tune_persistent = host.store !=null
+
+  /**
    * Should messages be flushed or swapped out of memory if
    * no consumers need the message?
    */
   def tune_flush_to_store = tune_persistent
 
   /**
-   * Should this queue persistently store it's entries?
+   * The maximum number of flushed queue entries to load
+   * from the store at a time.  Note that Flushed entries are just
+   * reference pointers to the actual messages.  When not loaded,
+   * the batch is referenced as a sequence range to conserve memory.
    */
-  def tune_persistent = host.store !=null
+  def tune_entry_group_size = 10000
 
   /**
    * The number of intervals that a consumer must not meeting the subscription rate before it is
@@ -146,31 +154,72 @@ class Queue(val host: VirtualHost, val d
   var nack_item_counter = 0L
   var nack_size_counter = 0L
 
+  var flushed_items = 0L
+
   def queue_size = enqueue_size_counter - dequeue_size_counter
   def queue_items = enqueue_item_counter - dequeue_item_counter
 
   private var capacity = tune_producer_buffer
   var size = 0
 
-  schedual_slow_consumer_check
+  protected def _start(onCompleted: Runnable) = {
 
-  def restore(storeId: Long, records:Seq[QueueEntryRecord]) = ^{
-    this.store_id = storeId
-    if( !records.isEmpty ) {
+    def completed: Unit = {
+      // by the time this is run, consumers and producers may have already joined.
+      onCompleted.run
+      display_stats
+      schedual_slow_consumer_check
+      // wake up the producers to fill us up...
+      if (messages.refiller != null) {
+        messages.refiller.run
+      }
+
+      // kick off dispatching to the consumers.
+      all_subscriptions.valuesIterator.foreach( _.refill_prefetch )
+      dispatchQueue << head_entry
+    }
+    // Persistent queues rebuild their entry list from the store's entry groups before completing.
+    if( tune_persistent ) {
+      host.store.listQueueEntryGroups(queueKey, tune_entry_group_size) { groups=>
+        dispatchQueue {
+          if( !groups.isEmpty ) {
+
+            // adjust the head tombstone.
+            head_entry.as_tombstone.count = groups.head.firstSeq
+
+            var last:QueueEntryGroup = null
+            groups.foreach { group =>
+
+              // if groups were not adjacent.. create tombstone entry.
+              if( last!=null && (last.lastSeq+1 != group.firstSeq) ) {
+                val entry = new QueueEntry(Queue.this, last.lastSeq+1)
+                entry.tombstone.as_tombstone.count = group.firstSeq - (last.lastSeq+1)
+                entries.addLast(entry)
+              }
 
-      // adjust the head tombstone.
-      head_entry.as_tombstone.count = records.head.queueSeq
+              val entry = new QueueEntry(Queue.this, group.firstSeq).init(group)
+              entries.addLast(entry)
 
-      records.foreach { qer =>
-        val entry = new QueueEntry(Queue.this,qer.queueSeq).init(qer)
-        entries.addLast(entry)
-      }
+              message_seq_counter = group.lastSeq + 1 // one past the last restored seq, as the old restore() did (records.last.queueSeq+1)
+              enqueue_item_counter += group.count
+              enqueue_size_counter += group.size
 
-      message_seq_counter = records.last.queueSeq+1
-      enqueue_item_counter += records.size
-      debug("restored: "+records.size )
+              last = group
+            }
+
+            debug("restored: "+enqueue_item_counter)
+          }
+          completed
+        }
+      }
+    } else {
+      completed
     }
-  } >>: dispatchQueue
+  }
+
+  protected def _stop(onCompleted: Runnable) = {
+    throw new AssertionError("Not implemented.");
+  }
 
   def addCapacity(amount:Int) = {
     capacity += amount
@@ -180,10 +229,7 @@ class Queue(val host: VirtualHost, val d
 
     var refiller: Runnable = null
 
-    def full = if(size >= capacity)
-      true
-    else
-      false
+    def full = (size >= capacity) || !serviceState.isStarted // also reports full whenever the queue service is not started
 
     def offer(delivery: Delivery): Boolean = {
       if (full) {
@@ -196,7 +242,7 @@ class Queue(val host: VirtualHost, val d
         entry.init(queueDelivery)
         
         if( tune_persistent ) {
-          queueDelivery.storeBatch = delivery.storeBatch
+          queueDelivery.uow = delivery.uow
         }
 
         entries.addLast(entry)
@@ -204,11 +250,10 @@ class Queue(val host: VirtualHost, val d
         enqueue_size_counter += entry.size
 
         // Do we need to do a persistent enqueue???
-        if (queueDelivery.storeBatch != null) {
-          queueDelivery.storeBatch.enqueue(entry.toQueueEntryRecord)
+        if (queueDelivery.uow != null) {
+          entry.as_loaded.store
         }
 
-
         def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= entry.seq ).isDefined
 
         var dispatched = false
@@ -222,9 +267,9 @@ class Queue(val host: VirtualHost, val d
         }
 
         // release the store batch...
-        if (queueDelivery.storeBatch != null) {
-          queueDelivery.storeBatch.release
-          queueDelivery.storeBatch = null
+        if (queueDelivery.uow != null) {
+          queueDelivery.uow.release
+          queueDelivery.uow = null
         }
 
         true
@@ -244,20 +289,29 @@ class Queue(val host: VirtualHost, val d
     var total_items = 0L
     var total_size = 0L
     while (cur != null) {
-      if (cur.is_loaded || cur.hasSubs || cur.is_prefetched) {
+      if (cur.is_loaded || cur.hasSubs || cur.is_prefetched || cur.is_flushed_group ) {
         info("  => " + cur)
       }
+
+      total_size += cur.size
       if (cur.is_flushed || cur.is_loaded) {
         total_items += 1
-        total_size += cur.size
+      } else if (cur.is_flushed_group ) {
+        total_items += cur.as_flushed_group.items
       }
+      
       cur = cur.getNext
     }
     info("tail: " + tail_entry)
 
     // sanitiy checks..
-    assert(total_items == queue_items)
-    assert(total_size == queue_size)
+    // log a warning (instead of asserting) when the walked totals disagree with the counters.
+    if(total_items != queue_items) {
+      warn("queue_items mismatch, found %d, expected %d", total_items, queue_items)
+    }
+    if(total_size != queue_size) {
+      warn("queue_size mismatch, found %d, expected %d", total_size, queue_size)
+    }
   }
 
   def schedual_slow_consumer_check:Unit = {
@@ -269,14 +323,14 @@ class Queue(val host: VirtualHost, val d
         check_counter += 1
 
         if( (check_counter%10)==0  ) {
-//          display_stats
+          display_stats
         }
 
-//        if( (check_counter%100)==0 ) {
+        if( (check_counter%25)==0 ) {
 //          if (!all_subscriptions.isEmpty) {
 //            display_active_entries
 //          }
-//        }
+        }
 
         // target tune_min_subscription_rate / sec
         val slow_cursor_delta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
@@ -413,8 +467,8 @@ class Queue(val host: VirtualHost, val d
         false
       } else {
 
-        if( tune_persistent && delivery.storeBatch!=null ) {
-          delivery.storeBatch.retain
+        if( tune_persistent && delivery.uow!=null ) {
+          delivery.uow.retain
         }
         val rc = session.offer(delivery)
         assert(rc, "session should accept since it was not full")
@@ -567,7 +621,7 @@ class QueueEntry(val queue:Queue, val se
   def is_prefetched = prefetched>0
 
   def init(delivery:Delivery):QueueEntry = {
-    this.state = new Loaded(delivery)
+    this.state = new Loaded(delivery, false) // 2nd arg is Loaded.store_completed
     queue.size += size
     this
   }
@@ -577,6 +631,13 @@ class QueueEntry(val queue:Queue, val se
     this
   }
 
+  def init(group:QueueEntryGroup):QueueEntry = {
+    // the group spans firstSeq..lastSeq inclusive; seqs in that span beyond group.count are tombstones.
+    val span = (group.lastSeq - group.firstSeq + 1).toInt
+    state = new FlushedGroup(span, group.size, span - group.count)
+    this
+  }
+
   def hasSubs = !(subscriptions == Nil )
 
   /**
@@ -614,7 +675,7 @@ class QueueEntry(val queue:Queue, val se
 
   def toQueueEntryRecord = {
     val qer = new QueueEntryRecord
-    qer.queueKey = queue.store_id
+    qer.queueKey = queue.queueKey
     qer.queueSeq = seq
     qer.messageKey = state.messageKey
     qer.size = state.size
@@ -634,6 +695,7 @@ class QueueEntry(val queue:Queue, val se
   // What state is it in?
   def as_tombstone = this.state.as_tombstone
   def as_flushed = this.state.as_flushed
+  def as_flushed_group = this.state.as_flushed_group
   def as_loaded = this.state.as_loaded
   def as_tail = this.state.as_tail
 
@@ -642,6 +704,7 @@ class QueueEntry(val queue:Queue, val se
 
   def is_loaded = as_loaded!=null
   def is_flushed = as_flushed!=null
+  def is_flushed_group = as_flushed_group!=null
   def is_tombstone = as_tombstone!=null
 
   // These should not change the current state.
@@ -662,6 +725,7 @@ class QueueEntry(val queue:Queue, val se
     def as_tail:Tail = null
     def as_loaded:Loaded = null
     def as_flushed:Flushed = null
+    def as_flushed_group:FlushedGroup = null
     def as_tombstone:Tombstone = null
 
     def size:Int
@@ -673,9 +737,8 @@ class QueueEntry(val queue:Queue, val se
 
     def flush = entry
 
-    def tombstone = {
-
-      var refill_preftch_list = List[Subscription]()
+    def prefetch_remove = {
+      var rc = List[Subscription]()
       if( queue.tune_flush_to_store ) {
         // Update the prefetch counter to reflect that this entry is no longer being prefetched.
         var cur = entry
@@ -684,7 +747,7 @@ class QueueEntry(val queue:Queue, val se
             (cur.subscriptions).foreach { sub =>
               if( sub.is_prefetched(entry) ) {
                 sub.remove_from_prefetch(entry)
-                refill_preftch_list ::= sub
+                rc ::= sub
               }
             }
           }
@@ -692,6 +755,12 @@ class QueueEntry(val queue:Queue, val se
         }
         assert(!is_prefetched, "entry should not be prefetched.")
       }
+      rc
+    }
+
+    def tombstone = {
+
+      var refill_preftch_list = prefetch_remove
 
       // if rv and lv are both adjacent tombstones, then this merges the rv
       // tombstone into lv, unlinks rv, and returns lv, otherwise it returns
@@ -753,7 +822,7 @@ class QueueEntry(val queue:Queue, val se
    * This state is used while a message is loaded in memory.  A message must be in this state
    * before it can be dispatched to a consumer.  It can transition to Flushed or Tombstone.
    */
-  class Loaded(val delivery: Delivery) extends EntryState {
+  class Loaded(val delivery: Delivery, var store_completed:Boolean) extends EntryState {
 
     var acquired = false
     def messageKey = delivery.storeKey
@@ -768,34 +837,58 @@ class QueueEntry(val queue:Queue, val se
 
     override  def as_loaded = this
 
-    def store() = {
-      if( delivery.storeKey == -1 ) {
-        val tx = queue.host.store.createStoreBatch
-        delivery.storeKey = tx.store(delivery.createMessageRecord)
-        tx.enqueue(toQueueEntryRecord)
-        tx.release
-      }
+    def store = {
+      // enqueue this entry's record on the delivery's UOW; merge into store_flush_source on completion.
+      val uow = delivery.uow
+      uow.enqueue(toQueueEntryRecord)
+      uow.onComplete(^{ queue.store_flush_source.merge(this) })
     }
 
     override def flush() = {
-      if( queue.tune_flush_to_store && !flushing ) {
-        flushing=true
-        queue.flushing_size+=size
-        if( delivery.storeBatch!=null ) {
-          delivery.storeBatch.eagerFlush(^{
-            queue.store_flush_source.merge(this)
-          })
+      if( queue.tune_flush_to_store ) {
+        if( store_completed ) { // store already completed: go straight to flushed without a store round trip
+          flushing=true // NOTE(review): flushing_size is not incremented on this path, yet flushed() decrements it when flushing is set - confirm
+          flushed
         } else {
-          store
-          queue.host.store.flushMessage(messageKey) {
-            queue.store_flush_source.merge(this)
+          if( !flushing ) {
+            flushing=true
+            queue.flushing_size+=size
+
+            // The uow is only set when called from the messages.offer method
+            if( delivery.uow!=null ) {
+              delivery.uow.completeASAP
+            } else {
+
+              // Are we swapping out a non-persistent message?
+              if( delivery.storeKey == -1 ) {
+                
+                delivery.uow = queue.host.store.createStoreUOW
+                val uow = delivery.uow
+                delivery.storeKey = uow.store(delivery.createMessageRecord)
+                store
+                uow.completeASAP
+                uow.release
+                delivery.uow = null
+
+              } else {
+
+                queue.host.store.flushMessage(messageKey) {
+                  queue.store_flush_source.merge(this)
+                }
+
+              }
+
+            }
           }
         }
       }
+
       entry
     }
 
     def flushed() = {
+      store_completed = true
+      delivery.uow = null
       if( flushing ) {
         queue.flushing_size-=size
         queue.size -= size
@@ -955,13 +1048,101 @@ class QueueEntry(val queue:Queue, val se
 
   }
 
+
+  class FlushedGroup(
+   /** The number of adjacent entries this FlushedGroup represents. */
+   var count:Long,
+   /** size in bytes of the group */
+   var size:Int,
+   /** The number of tombstone entries in the groups */
+   var tombstones:Int ) extends EntryState {
+    // Entry state standing in for `count` adjacent sequence numbers, starting at this entry's seq;
+    // the individual entries are only listed from the store and linked in by load().
+    def items = count - tombstones
+
+    def messageKey = -1
+
+    var loading = false
+
+    override def as_flushed_group = this
+
+    override def is_flushed_or_flushing = true
+
+    override def toString = { "flushed_group:{ loading: "+loading+", items: "+items+", size: "+size+"}" }
+
+    // Flushed entries can't be dispatched until
+    // they get loaded.
+    def dispatch():QueueEntry = {
+      null
+    }
+    // Lists the group's seq range from the store, then replaces this entry with the listed entries.
+    override def load() = {
+      if( !loading ) {
+        loading = true
+        queue.host.store.listQueueEntries(queue.queueKey, seq, seq+count-1) { records =>
+          queue.dispatchQueue {
+
+            var item_count=0
+            var size_count=0
+            var last:QueueEntryRecord = null
+
+            val tmpList = new LinkedNodeList[QueueEntry]()
+            records.foreach { record =>
+
+              // if entries were not adjacent.. create tombstone entry.
+              if( last!=null && (last.queueSeq+1 != record.queueSeq) ) {
+                val entry = new QueueEntry(queue, last.queueSeq+1)
+                entry.tombstone.as_tombstone.count = record.queueSeq - (last.queueSeq+1)
+                tmpList.addLast(entry)
+              }
+
+              val entry = new QueueEntry(queue, record.queueSeq).init(record)
+              tmpList.addLast(entry)
+
+              item_count += 1
+              size_count += record.size
+              last = record
+            }
+
+            // we may need to adjust the enqueue count if entries
+            // were dropped at the store level
+            // deltas are (loaded - expected), so they are <= 0 when the store dropped entries.
+            val item_delta = item_count - items
+            val size_delta: Int = size_count - size
+            if ( item_delta!=0 || size_delta!=0 ) {
+              assert(item_delta <= 0)
+              assert(size_delta <= 0)
+              info("Detected store dropped %d message(s) in seq range %d to %d using %d bytes", -item_delta, seq, seq+count-1, -size_delta)
+              queue.enqueue_item_counter += item_delta
+              queue.enqueue_size_counter += size_delta
+            }
+
+            val refill_preftch_list = prefetch_remove
+
+            linkAfter(tmpList)
+            val next = getNext
+
+            // move the subs to the first entry that we just loaded.
+            subscriptions.foreach(_.advance(next))
+            next.addSubscriptions(subscriptions)
+
+            unlink
+            refill_preftch_list.foreach( _.refill_prefetch )
+          }
+        }
+      }
+      entry
+    }
+
+    override def tombstone = {
+      throw new AssertionError("Flush group cannbot be tombstone.");
+    }
+  }
+
   /**
    * Entries in the Flushed state are not holding the referenced messages in memory anymore.
    * This state can transition to Loaded or Tombstone.
    *
-   * TODO: Add a new FlushedList state which can be used to merge multiple
-   * adjacent Flushed entries into a single FlushedList state.  This would allow us
-   * to support queues with millions of flushed entries without much memory impact.
    */
   class Flushed(val messageKey:Long, val size:Int) extends EntryState {
 
@@ -1021,7 +1202,7 @@ class QueueEntry(val queue:Queue, val se
         delivery.storeKey = messageRecord.key
 
         queue.size += size
-        state = new Loaded(delivery)
+        state = new Loaded(delivery, true)
       } else {
 //        debug("Ignoring store load of: ", messageKey)
       }
@@ -1037,6 +1218,9 @@ class QueueEntry(val queue:Queue, val se
       super.tombstone
     }
   }
+
+
+
 }
 
 
@@ -1073,12 +1257,13 @@ class Subscription(queue:Queue) extends 
     pos = queue.head_entry;
     session = consumer.connect(this)
     session.refiller = pos
-
     queue.head_entry.addSubscriptions(this :: Nil)
-    refill_prefetch
 
-    // kick off the initial dispatch.
-    queue.dispatchQueue << queue.head_entry
+    if( queue.serviceState.isStarted ) {
+      // kick off the initial dispatch.
+      refill_prefetch
+      queue.dispatchQueue << queue.head_entry
+    }
   }
 
   def close() = {
@@ -1223,11 +1408,11 @@ class Subscription(queue:Queue) extends 
     acquired.addLast(this)
     acquired_size += entry.size
 
-    def ack(sb:StoreBatch) = {
+    def ack(sb:StoreUOW) = {
 
       if (entry.messageKey != -1) {
         val storeBatch = if( sb == null ) {
-          queue.host.store.createStoreBatch
+          queue.host.store.createStoreUOW
         } else {
           sb
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 04:12:40 2010
@@ -308,8 +308,8 @@ class DeliveryProducerRoute(val router:R
         if( target.consumer.matches(delivery) ) {
           
           if( storeOnMatch ) {
-            delivery.storeBatch = router.host.store.createStoreBatch
-            delivery.storeKey = delivery.storeBatch.store(delivery.createMessageRecord)
+            delivery.uow = router.host.store.createStoreUOW
+            delivery.storeKey = delivery.uow.store(delivery.createMessageRecord)
             storeOnMatch = false
           }
 
@@ -330,14 +330,14 @@ class DeliveryProducerRoute(val router:R
 
   private def delivered(delivery: Delivery): Unit = {
     if (delivery.ack != null) {
-      if (delivery.storeBatch != null) {
-        delivery.storeBatch.setDisposer(^ {delivery.ack(null)})
+      if (delivery.uow != null) {
+        delivery.uow.setDisposer(^ {delivery.ack(null)}) // the ack is handed to the UOW as its disposer instead of being called here
       } else {
         delivery.ack(null)
       }
     }
-    if (delivery.storeBatch != null) {
-      delivery.storeBatch.release
+    if (delivery.uow != null) {
+      delivery.uow.release
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:12:40 2010
@@ -146,14 +146,13 @@ class VirtualHost(val broker: Broker) ex
                 store.getQueueStatus(queueKey) { x =>
                   x match {
                     case Some(info)=>
-                    store.listQueueEntries(queueKey) { entries=>
-                      dispatchQueue ^{
-                        val dest = DestinationParser.parse(info.record.name, destination_parser_options)
-                        val queue = new Queue(this, dest)
-                        queue.restore(queueKey, entries)
-                        queues.put(dest.getName, queue)
-                        task.run
-                      }
+
+                    dispatchQueue ^{
+                      val dest = DestinationParser.parse(info.record.name, destination_parser_options)
+                      val queue = new Queue(this, dest, queueKey)
+                      queue.start
+                      queues.put(dest.getName, queue)
+                      task.run
                     }
                     case _ =>
                       task.run
@@ -267,11 +266,11 @@ class VirtualHost(val broker: Broker) ex
       store.addQueue(record) { rc =>
         rc match {
           case Some(queueKey) =>
-            dispatchQueue ^ {
-              val queue = new Queue(this, dest)
-              queue.restore(queueKey, Nil)
+            dispatchQueue {
+              val queue = new Queue(this, dest, queueKey)
               queues.put(dest.getName, queue)
               cb(queue)
+              queue.start()
             }
           case None => // store could not create
             cb(null)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul  7 04:12:40 2010
@@ -22,7 +22,8 @@ import com.shorrockin.cascal.utils.Conve
 import java.util.{HashMap}
 import org.fusesource.hawtbuf.AsciiBuffer._
 import org.fusesource.hawtbuf.{AsciiBuffer, DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
-import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord, QueueRecord}
+import org.apache.activemq.apollo.store._
+import collection.mutable.ListBuffer
 
 /**
  *
@@ -161,7 +162,7 @@ class CassandraClient() {
   }
 
 
-  def store(txs:Seq[CassandraStore#CassandraBatch]) {
+  def store(txs:Seq[CassandraStore#CassandraUOW]) {
     withSession {
       session =>
         var operations = List[Operation]()
@@ -205,12 +206,43 @@ class CassandraClient() {
     }
   }
 
-  def getQueueEntries(qid: Long): Seq[QueueEntryRecord] = {
+  def listQueueEntryGroups(queueKey: Long, limit: Int): Seq[QueueEntryGroup] = {
     withSession {
       session =>
-        session.list(schema.entries \ qid).map { x=>
+        val rc = ListBuffer[QueueEntryGroup]()
+        var group:QueueEntryGroup = null
+        // walk the listed entries in order, closing the current group once it holds `limit` entries.
+        // TODO: this is going to bring back lots of entries.. not good.
+        session.list(schema.entries \ queueKey).foreach { x=>
+          val record:QueueEntryRecord = x.value
+          // take the seq from the column name, exactly as getQueueEntries does for its records.
+          record.queueSeq = x.name
+          if( group == null ) {
+            group = new QueueEntryGroup
+            group.firstSeq = record.queueSeq
+          }
+          group.lastSeq = record.queueSeq
+          group.count += 1
+          group.size += record.size
+          if( group.count == limit) {
+            rc += group
+            group = null
+          }
+        }
+
+        if( group!=null ) {
+          rc += group
+        }
+        rc
+    }
+  }
+
+  def getQueueEntries(queueKey: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
+    withSession {
+      session =>
+        session.list(schema.entries \ queueKey, RangePredicate(firstSeq, lastSeq)).map { x=>
           val rc:QueueEntryRecord = x.value
-          rc.queueKey = qid
+          rc.queueKey = queueKey
           rc.queueSeq = x.name
           rc
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul  7 04:12:40 2010
@@ -16,13 +16,12 @@
  */
 package org.apache.activemq.broker.store.cassandra
 
-import org.apache.activemq.broker.store.{StoreBatch, Store}
+import org.apache.activemq.broker.store.{StoreUOW, Store}
 import org.fusesource.hawtdispatch.BaseRetained
 import com.shorrockin.cascal.session._
 import java.util.concurrent.atomic.AtomicLong
 import collection.mutable.ListBuffer
 import java.util.HashMap
-import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
 import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
 import org.apache.activemq.apollo.dto.{CassandraStoreDTO, StoreDTO}
 import collection.{JavaConversions, Seq}
@@ -32,6 +31,7 @@ import org.fusesource.hawtdispatch.Scala
 import ReporterLevel._
 import java.util.concurrent._
 import org.apache.activemq.apollo.util.{TimeCounter, IntCounter}
+import org.apache.activemq.apollo.store._
 
 object CassandraStore extends Log {
 
@@ -195,24 +195,31 @@ class CassandraStore extends Store with 
   }
 
 
-  def listQueueEntries(id: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+  def listQueueEntryGroups(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryGroup]) => Unit) = {
     blocking {
-      callback( client.getQueueEntries(id) )
+      callback( client.listQueueEntryGroups(queueKey, limit) )
+    }
+  }
+
+
+  def listQueueEntries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+    blocking {
+      callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
     }
   }
 
   def flushMessage(id: Long)(callback: => Unit) = ^{
-    val action: CassandraBatch#MessageAction = pendingStores.get(id)
+    val action: CassandraUOW#MessageAction = pendingStores.get(id)
     if( action == null ) {
       callback
     } else {
-      action.tx.eagerFlush(callback _)
-      flush(action.tx.txid)
+      action.uow.onComplete(callback _)
+      flush(action.uow.uow_id)
     }
 
   } >>: dispatchQueue
 
-  def createStoreBatch() = new CassandraBatch
+  def createStoreUOW() = new CassandraUOW
 
 
   /////////////////////////////////////////////////////////////////////
@@ -220,7 +227,7 @@ class CassandraStore extends Store with 
   // Implementation of the StoreBatch interface
   //
   /////////////////////////////////////////////////////////////////////
-  class CassandraBatch extends BaseRetained with StoreBatch {
+  class CassandraUOW extends BaseRetained with StoreUOW {
 
     class MessageAction {
 
@@ -229,22 +236,29 @@ class CassandraStore extends Store with 
       var enqueues = ListBuffer[QueueEntryRecord]()
       var dequeues = ListBuffer[QueueEntryRecord]()
 
-      def tx = CassandraBatch.this
+      def uow = CassandraUOW.this
       def isEmpty() = store==null && enqueues==Nil && dequeues==Nil
       def cancel() = {
-        tx.rm(msg)
-        if( tx.isEmpty ) {
-          tx.cancel
+        uow.rm(msg)
+        if( uow.isEmpty ) {
+          uow.cancel
         }
       }
     }
 
-    val txid:Int = next_tx_id.getAndIncrement
+    val uow_id:Int = next_uow_id.getAndIncrement
     var actions = Map[Long, MessageAction]()
     var flushing= false
 
-    var flushListeners = ListBuffer[Runnable]()
-    def eagerFlush(callback: Runnable) = if( callback!=null ) { this.synchronized { flushListeners += callback } }
+    var completeListeners = ListBuffer[Runnable]()
+
+    def onComplete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
+
+    var disableDelay = false
+    def completeASAP() = this.synchronized { disableDelay=true }
+
+    def delayable = !disableDelay
+
 
     def rm(msg:Long) = {
       actions -= msg
@@ -252,7 +266,7 @@ class CassandraStore extends Store with 
 
     def isEmpty = actions.isEmpty
     def cancel = {
-      delayedTransactions.remove(txid)
+      delayedUOWs.remove(uow_id)
       onPerformed
     }
 
@@ -298,12 +312,12 @@ class CassandraStore extends Store with 
     }
 
     override def dispose = {
-      transaction_source.merge(this)
+      uow_source.merge(this)
     }
 
 
     def onPerformed() {
-      flushListeners.foreach { x=>
+      completeListeners.foreach { x=>
         x.run()
       }
       super.dispose
@@ -312,29 +326,29 @@ class CassandraStore extends Store with 
 
   def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
 
-  val transaction_source = createSource(new ListEventAggregator[CassandraBatch](), dispatchQueue)
-  transaction_source.setEventHandler(^{drain_transactions});
-  transaction_source.resume
-
-  var pendingStores = new HashMap[Long, CassandraBatch#MessageAction]()
-  var pendingEnqueues = new HashMap[(Long,Long), CassandraBatch#MessageAction]()
-  var delayedTransactions = new HashMap[Int, CassandraBatch]()
+  val uow_source = createSource(new ListEventAggregator[CassandraUOW](), dispatchQueue)
+  uow_source.setEventHandler(^{drain_uows});
+  uow_source.resume
+
+  var pendingStores = new HashMap[Long, CassandraUOW#MessageAction]()
+  var pendingEnqueues = new HashMap[(Long,Long), CassandraUOW#MessageAction]()
+  var delayedUOWs = new HashMap[Int, CassandraUOW]()
 
-  var next_tx_id = new IntCounter(1)
+  var next_uow_id = new IntCounter(1)
   
-  def drain_transactions = {
-    transaction_source.getData.foreach { tx =>
+  def drain_uows = {
+    uow_source.getData.foreach { uow =>
 
-      val tx_id = tx.txid
-      delayedTransactions.put(tx_id, tx)
+      val uow_id = uow.uow_id
+      delayedUOWs.put(uow_id, uow)
 
-      tx.actions.foreach { case (msg, action) =>
+      uow.actions.foreach { case (msg, action) =>
 
         // dequeues can cancel out previous enqueues
         action.dequeues.foreach { currentDequeue=>
           val currentKey = key(currentDequeue)
-          val prevAction:CassandraBatch#MessageAction = pendingEnqueues.remove(currentKey)
-          if( prevAction!=null && !prevAction.tx.flushing ) {
+          val prevAction:CassandraUOW#MessageAction = pendingEnqueues.remove(currentKey)
+          if( prevAction!=null && !prevAction.uow.flushing ) {
 
             // yay we can cancel out a previous enqueue
             prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
@@ -359,17 +373,17 @@ class CassandraStore extends Store with 
         }
       }
 
-      if( !tx.flushListeners.isEmpty || config.flushDelay <= 0 ) {
-        flush(tx_id)
+      if( !uow.completeListeners.isEmpty || config.flushDelay <= 0 ) {
+        flush(uow_id)
       } else {
-        dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+        dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(uow_id)})
       }
 
     }
   }
 
-  def flush(tx_id:Int) = {
-    flush_source.merge(tx_id)
+  def flush(uow_id:Int) = {
+    flush_source.merge(uow_id)
   }
 
   val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
@@ -382,27 +396,27 @@ class CassandraStore extends Store with 
       return
     }
     
-    val txs = flush_source.getData.flatMap{ tx_id =>
-      val tx = delayedTransactions.remove(tx_id)
+    val uows = flush_source.getData.flatMap{ uow_id =>
+      val uow = delayedUOWs.remove(uow_id)
       // Message may be flushed or canceled before the timeout flush event..
-      // tx may be null in those cases
-      if (tx!=null) {
-        tx.flushing = true
-        Some(tx)
+      // uow may be null in those cases
+      if (uow!=null) {
+        uow.flushing = true
+        Some(uow)
       } else {
         None
       }
     }
 
-    if( !txs.isEmpty ) {
+    if( !uows.isEmpty ) {
       storeLatency.start { end =>
         blocking {
-          client.store(txs)
+          client.store(uows)
           dispatchQueue {
             end()
-            txs.foreach { tx=>
+            uows.foreach { uow=>
 
-              tx.actions.foreach { case (msg, action) =>
+              uow.actions.foreach { case (msg, action) =>
                 if( action.store!=null ) {
                   pendingStores.remove(msg)
                 }
@@ -412,7 +426,7 @@ class CassandraStore extends Store with 
                 }
               }
 
-              tx.onPerformed
+              uow.onPerformed
             }
           }
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:12:40 2010
@@ -20,12 +20,10 @@ import java.{lang=>jl}
 import java.{util=>ju}
 
 import model.{AddQueue, AddQueueEntry, AddMessage}
-import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord}
 import org.apache.activemq.apollo.dto.HawtDBStoreDTO
 import java.io.File
 import java.io.IOException
 import java.util.concurrent.TimeUnit
-import org.apache.activemq.apollo.store.QueueRecord
 import org.fusesource.hawtbuf.proto.MessageBuffer
 import org.fusesource.hawtbuf.proto.PBMessage
 import org.apache.activemq.util.LockFile
@@ -40,11 +38,12 @@ import org.fusesource.hawtbuf._
 import org.fusesource.hawtdispatch.ScalaDispatch._
 import collection.mutable.{LinkedHashMap, HashMap, ListBuffer}
 import collection.JavaConversions
-import java.util.{TreeSet, HashSet}
+import ju.{TreeSet, HashSet}
 
 import org.fusesource.hawtdb.api._
 import org.apache.activemq.apollo.broker.{DispatchLogging, Log, Logging, BaseService}
 import org.apache.activemq.apollo.util.TimeCounter
+import org.apache.activemq.apollo.store._
 
 object HawtDBClient extends Log {
   val BEGIN = -1
@@ -134,35 +133,38 @@ class HawtDBClient(hawtDBStore: HawtDBSt
         func
       } else {
         info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked.")
-        dispatchQueue.dispatchAfter(DATABASE_LOCKED_WAIT_DELAY, TimeUnit.MILLISECONDS, ^ {lock(func _)})
+        dispatchQueue.dispatchAfter(DATABASE_LOCKED_WAIT_DELAY, TimeUnit.MILLISECONDS, ^ {
+          hawtDBStore.executor_pool {
+            lock(func _)
+          }
+        })
       }
     }
   }
 
-  def createJournal() = {
-    val journal = new Journal()
-    journal.setDirectory(directory)
-    journal.setMaxFileLength(config.journalLogSize)
-    journal
-  }
-
   val schedual_version = new AtomicInteger()
 
   def start(onComplete:Runnable) = {
     lock {
 
-      journal = createJournal()
+      journal = new Journal()
+      journal.setDirectory(directory)
+      journal.setMaxFileLength(config.journalLogSize)
+      journal.setMaxWriteBatchSize(config.journalBatchSize);
+      journal.setChecksum(true);
+
+      if( config.archiveDirectory!=null ) {
+        journal.setDirectoryArchive(config.archiveDirectory)
+        journal.setArchiveDataLogs(true)
+      }
       journal.start
 
       pageFileFactory.setFile(new File(directory, "db"))
       pageFileFactory.setDrainOnClose(false)
       pageFileFactory.setSync(true)
       pageFileFactory.setUseWorkerThread(true)
-      pageFileFactory.setPageSize(512.toShort)
-
-      // Empirically found (using profiler) that a cached BTree page retains
-      // about 4000 bytes of mem ON 64 bit platform.
-      pageFileFactory.setCacheSize((1024*1024*20)/4000 );
+      pageFileFactory.setPageSize(config.indexPageSize)
+      pageFileFactory.setCacheSize(config.indexCacheSize);
 
       pageFileFactory.open()
 
@@ -222,7 +224,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     _store(update, callback)
   }
 
-  def store(txs: Seq[HawtDBStore#HawtDBBatch], callback:Runnable) {
+  def store(txs: Seq[HawtDBStore#HawtDBUOW], callback:Runnable) {
     var batch = ListBuffer[TypeCreatable]()
     txs.foreach {
       tx =>
@@ -239,9 +241,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
             }
             action.dequeues.foreach {
               queueEntry =>
-                val qid = queueEntry.queueKey
-                val seq = queueEntry.queueSeq
-                batch += new RemoveQueueEntry.Bean().setQueueKey(qid).setQueueSeq(seq)
+                val queueKey = queueEntry.queueKey
+                val queueSeq = queueEntry.queueSeq
+                batch += new RemoveQueueEntry.Bean().setQueueKey(queueKey).setQueueSeq(queueSeq)
             }
         }
     }
@@ -254,15 +256,17 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   }
 
   def listQueues: Seq[Long] = {
+    val rc = ListBuffer[Long]()
     withTx { tx =>
-        val helper = new TxHelper(tx)
-        import JavaConversions._
-        import helper._
-        queueIndex.iterator.map {
-          entry =>
-            entry.getKey.longValue
-        }.toSeq
+      val helper = new TxHelper(tx)
+      import JavaConversions._
+      import helper._
+
+      queueIndex.iterator.foreach { entry =>
+        rc += entry.getKey.longValue
+      }
     }
+    rc
   }
 
   def getQueueStatus(queueKey: Long): Option[QueueStatus] = {
@@ -292,27 +296,69 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
   }
 
-
-  def getQueueEntries(queueKey: Long): Seq[QueueEntryRecord] = {
+  def listQueueEntryGroups(queueKey: Long, limit: Int) : Seq[QueueEntryGroup] = {
     withTx { tx =>
         val helper = new TxHelper(tx)
         import JavaConversions._
         import helper._
+        import Predicates._
 
         val queueRecord = queueIndex.get(queueKey)
         if (queueRecord != null) {
           val entryIndex = queueEntryIndex(queueRecord)
-          entryIndex.iterator.map {
-            entry =>
-              val rc: QueueEntryRecord = entry.getValue
-              rc
-          }.toSeq
+          // Fold the index entries, in iteration order, into groups of at most `limit` entries each.
+          val rc = ListBuffer[QueueEntryGroup]()
+          var group:QueueEntryGroup = null
+
+          entryIndex.iterator.foreach { entry =>
+            if( group == null ) {
+              group = new QueueEntryGroup
+              group.firstSeq = entry.getKey.longValue
+            }
+            group.lastSeq = entry.getKey.longValue
+            group.count += 1
+            group.size += entry.getValue.getSize
+            if( group.count == limit) { // group is full: emit it and start a fresh one on the next entry
+              rc += group
+              group = null
+            }
+          }
+
+          if( group!=null ) { // emit the trailing, partially filled group
+            rc += group
+          }
+          rc
         } else {
-          Nil.toSeq
+          ListBuffer[QueueEntryGroup]() // unknown queue: empty result (as before this change), never null
         }
     }
   }
 
+  // Loads the entries of queue `queueKey` whose sequence lies between firstSeq and lastSeq.
+  def getQueueEntries(queueKey: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
+    val rc = ListBuffer[QueueEntryRecord]() // stays empty (never null) when the queue is unknown
+    withTx { tx =>
+      val helper = new TxHelper(tx)
+      import JavaConversions._
+      import helper._
+      import Predicates._
+
+      val queueRecord = queueIndex.get(queueKey)
+      if (queueRecord != null) {
+        val entryIndex = queueEntryIndex(queueRecord)
+
+        // restrict the index scan to keys selected by the gte(firstSeq) / lte(lastSeq) predicates
+        val where = and(gte(new jl.Long(firstSeq)), lte(new jl.Long(lastSeq)))
+        entryIndex.iterator( where ).foreach {
+          entry =>
+            val record: QueueEntryRecord = entry.getValue
+            rc += record
+        }
+      }
+    }
+    rc
+  }
+
   val metric_load_from_index = new TimeCounter
   val metric_load_from_journal = new TimeCounter
 
@@ -405,6 +451,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
   private def _store(update: TypeCreatable, onComplete: Runnable): Unit = _store(-1, update, onComplete)
 
+  val metric_journal_append = new TimeCounter
+  val metric_index_update = new TimeCounter
+
   /**
    * All updated are are funneled through this method. The updates are logged to
    * the journal and then the indexes are update.  onFlush will be called back once
@@ -423,7 +472,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     val buffer = baos.toBuffer()
     append(buffer) {
       location =>
-        executeStore(batch, update, onComplete, location)
+        metric_index_update.time {
+          executeStore(batch, update, onComplete, location)
+        }
     }
   }
 
@@ -489,6 +540,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     val start = System.currentTimeMillis()
     incrementalRecover
 
+
     _store(new AddTrace.Bean().setMessage("RECOVERED"), ^ {
       // Rollback any batches that did not complete.
       batches.keysIterator.foreach {
@@ -573,11 +625,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   /////////////////////////////////////////////////////////////////////
 
   private def append(data: Buffer)(cb: (Location) => Unit): Unit = {
-    journal.write(data, new JournalCallback() {
-      def success(location: Location) = {
-        cb(location)
-      }
-    })
+    metric_journal_append.start { end =>
+      journal.write(data, new JournalCallback() {
+        def success(location: Location) = {
+          end()
+          cb(location)
+        }
+      })
+    }
   }
 
   def read(location: Location) = journal.read(location)
@@ -698,7 +753,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
         val trackingIndex = queueTrackingIndex(queueRecord)
         val entryIndex = queueEntryIndex(queueRecord)
 
-        trackingIndex.iterator.map { entry=>
+        trackingIndex.iterator.foreach { entry=>
           val messageKey = entry.getKey
           if( addAndGet(messageRefsIndex, messageKey, -1) == 0 ) {
             // message is no longer referenced.. we can remove it..
@@ -827,7 +882,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
       case x: Purge.Getter =>
         // Remove all the queues...
-        val queueKeys = queueIndex.iterator.map( _.getKey ).toList
+        val queueKeys = queueIndex.iterator.map( _.getKey )
         queueKeys.foreach { key =>
           removeQueue(key.longValue)
         }
@@ -1028,6 +1083,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       } else {
         tx.rollback
       }
+      tx.close
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:12:40 2010
@@ -16,20 +16,20 @@
  */
 package org.apache.activemq.broker.store.hawtdb
 
-import org.apache.activemq.broker.store.{StoreBatch, Store}
+import org.apache.activemq.broker.store.{StoreUOW, Store}
 import org.fusesource.hawtdispatch.BaseRetained
 import java.util.concurrent.atomic.AtomicLong
 import collection.mutable.ListBuffer
 import java.util.HashMap
-import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
 import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, StoreDTO}
 import collection.{JavaConversions, Seq}
 import org.fusesource.hawtdispatch.ScalaDispatch._
 import org.apache.activemq.apollo.broker._
 import java.io.File
 import ReporterLevel._
-import org.apache.activemq.apollo.util.{TimeCounter, IntCounter}
 import java.util.concurrent._
+import org.apache.activemq.apollo.store._
+import org.apache.activemq.apollo.util.{IntMetricCounter, TimeCounter, IntCounter}
 
 object HawtDBStore extends Log {
   val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -138,6 +138,7 @@ class HawtDBStore extends Store with Bas
     })
   }
 
+
   def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
     val key = next_queue_key.getAndIncrement
     record.key = key
@@ -148,9 +149,9 @@ class HawtDBStore extends Store with Bas
     client.removeQueue(queueKey,^{ callback(true) })
   }
 
-  def getQueueStatus(id: Long)(callback: (Option[QueueStatus]) => Unit) = {
+  def getQueueStatus(queueKey: Long)(callback: (Option[QueueStatus]) => Unit) = {
     executor_pool ^{
-      callback( client.getQueueStatus(id) )
+      callback( client.getQueueStatus(queueKey) )
     }
   }
 
@@ -165,35 +166,46 @@ class HawtDBStore extends Store with Bas
   load_source.resume
 
 
-  def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
-    load_source.merge((id, callback))
+  def loadMessage(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
+    message_load_latency.start { end=>
+      load_source.merge((messageKey, { (result)=>
+        end()
+        callback(result)
+      }))
+    }
   }
 
   def drain_loads = {
     var data = load_source.getData
+    message_load_batch_size += data.size
     executor_pool ^{
       client.loadMessages(data)
     }
   }
 
-  def listQueueEntries(id: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+  def listQueueEntryGroups(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryGroup]) => Unit) = {
     executor_pool ^{
-      callback( client.getQueueEntries(id) )
+      callback( client.listQueueEntryGroups(queueKey, limit) )
     }
   }
 
-  def flushMessage(id: Long)(cb: => Unit) = dispatchQueue {
-    val action: HawtDBBatch#MessageAction = pendingStores.get(id)
+  def listQueueEntries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+    executor_pool ^{
+      callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
+    }
+  }
+
+  def flushMessage(messageKey: Long)(cb: => Unit) = dispatchQueue {
+    val action: HawtDBUOW#MessageAction = pendingStores.get(messageKey)
     if( action == null ) {
-//      println("flush due to not found: "+id)
       cb
     } else {
-      action.tx.eagerFlush(^{ cb })
-      flush(action.tx.txid)
+      action.uow.onComplete(^{ cb })
+      flush(action.uow.uow_id)
     }
   }
 
-  def createStoreBatch() = new HawtDBBatch
+  def createStoreUOW() = new HawtDBUOW
 
 
   /////////////////////////////////////////////////////////////////////
@@ -201,7 +213,7 @@ class HawtDBStore extends Store with Bas
   // Implementation of the StoreBatch interface
   //
   /////////////////////////////////////////////////////////////////////
-  class HawtDBBatch extends BaseRetained with StoreBatch {
+  class HawtDBUOW extends BaseRetained with StoreUOW {
 
     var dispose_start:Long = 0
     var flushing = false;
@@ -213,29 +225,37 @@ class HawtDBStore extends Store with Bas
       var enqueues = ListBuffer[QueueEntryRecord]()
       var dequeues = ListBuffer[QueueEntryRecord]()
 
-      def tx = HawtDBBatch.this
+      def uow = HawtDBUOW.this
       def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
+
       def cancel() = {
-        tx.rm(msg)
-        if( tx.isEmpty ) {
-          tx.cancel
-        }
+        uow.rm(msg)
       }
     }
 
-    val txid:Int = next_tx_id.getAndIncrement
+    val uow_id:Int = next_batch_id.getAndIncrement
     var actions = Map[Long, MessageAction]()
 
-    var flushListeners = ListBuffer[Runnable]()
-    def eagerFlush(callback: Runnable) = if( callback!=null ) { this.synchronized { flushListeners += callback } }
+    var completeListeners = ListBuffer[Runnable]()
+    var disableDelay = false
+
+    def onComplete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
+
+    def completeASAP() = this.synchronized { disableDelay=true }
+
+    var delayable_actions = 0
+
+    def delayable = !disableDelay && delayable_actions>0 && config.flushDelay>=0
 
     def rm(msg:Long) = {
       actions -= msg
+      if( actions.isEmpty ) {
+        cancel
+      }
     }
 
-    def isEmpty = actions.isEmpty
     def cancel = {
-      delayedTransactions.remove(txid)
+      delayedUOWs.remove(uow_id)
       onPerformed
     }
 
@@ -250,7 +270,7 @@ class HawtDBStore extends Store with Bas
       dispatchQueue {
         pendingStores.put(record.key, action)
       }
-
+      delayable_actions += 1
       record.key
     }
 
@@ -266,13 +286,14 @@ class HawtDBStore extends Store with Bas
     }
 
     def enqueue(entry: QueueEntryRecord) = {
-
-      this.synchronized {
+      val a = this.synchronized {
         val a = action(entry.messageKey)
         a.enqueues += entry
-        dispatchQueue {
-          pendingEnqueues.put(key(entry), a)
-        }
+        delayable_actions += 1
+        a
+      }
+      dispatchQueue {
+        pending_enqueues.put(key(entry), a)
       }
 
     }
@@ -285,14 +306,12 @@ class HawtDBStore extends Store with Bas
 
     override def dispose = {
       dispose_start = System.nanoTime
-      transaction_source.merge(this)
+      uow_source.merge(this)
     }
 
     def onPerformed() = this.synchronized {
-      metric_commit_counter += 1
-      val t = TimeUnit.NANOSECONDS.toMillis(System.nanoTime-dispose_start)
-      metric_commit_latency_counter += t
-      flushListeners.foreach { x=>
+      commit_latency += System.nanoTime-dispose_start
+      completeListeners.foreach { x=>
         x.run
       }
       super.dispose
@@ -303,9 +322,10 @@ class HawtDBStore extends Store with Bas
   var metric_canceled_enqueue_counter:Long = 0
   var metric_flushed_message_counter:Long = 0
   var metric_flushed_enqueue_counter:Long = 0
-  var metric_commit_counter:Long = 0
-  var metric_commit_latency_counter:Long = 0
 
+  val commit_latency = new TimeCounter
+  val message_load_batch_size = new IntMetricCounter
+  val message_load_latency = new TimeCounter
 
   var canceled_add_message:Long = 0
   var canceled_enqueue:Long = 0
@@ -313,27 +333,23 @@ class HawtDBStore extends Store with Bas
 
   def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
 
-  val transaction_source = createSource(new ListEventAggregator[HawtDBBatch](), dispatchQueue)
-  transaction_source.setEventHandler(^{drain_transactions});
-  transaction_source.resume
-
-  var pendingStores = new HashMap[Long, HawtDBBatch#MessageAction]()
-  var pendingEnqueues = new HashMap[(Long,Long), HawtDBBatch#MessageAction]()
-  var delayedTransactions = new HashMap[Int, HawtDBBatch]()
+  val uow_source = createSource(new ListEventAggregator[HawtDBUOW](), dispatchQueue)
+  uow_source.setEventHandler(^{drain_uows});
+  uow_source.resume
+
+  var pendingStores = new HashMap[Long, HawtDBUOW#MessageAction]()
+  var pending_enqueues = new HashMap[(Long,Long), HawtDBUOW#MessageAction]()
+  var delayedUOWs = new HashMap[Int, HawtDBUOW]()
 
-  var next_tx_id = new IntCounter(1)
+  var next_batch_id = new IntCounter(1)
 
   def schedualDisplayStats:Unit = {
     val st = System.nanoTime
-    val ss = (metric_canceled_message_counter, metric_canceled_enqueue_counter, metric_flushed_message_counter, metric_flushed_enqueue_counter, metric_commit_counter, metric_commit_latency_counter)
+    val ss = (metric_canceled_message_counter, metric_canceled_enqueue_counter, metric_flushed_message_counter, metric_flushed_enqueue_counter)
     def displayStats = {
       if( serviceState.isStarted ) {
         val et = System.nanoTime
-        val es = (metric_canceled_message_counter, metric_canceled_enqueue_counter, metric_flushed_message_counter, metric_flushed_enqueue_counter, metric_commit_counter, metric_commit_latency_counter)
-
-        val commits = es._5-ss._5
-        var avgCommitLatency = if (commits!=0) (es._6 - ss._6).toFloat / commits else 0f
-
+        val es = (metric_canceled_message_counter, metric_canceled_enqueue_counter, metric_flushed_message_counter, metric_flushed_enqueue_counter)
         def rate(x:Long, y:Long):Float = ((y-x)*1000.0f)/TimeUnit.NANOSECONDS.toMillis(et-st)
 
         val m1 = rate(ss._1,es._1)
@@ -342,23 +358,29 @@ class HawtDBStore extends Store with Bas
         val m4 = rate(ss._4,es._4)
 
         if( m1>0f || m2>0f || m3>0f || m4>0f ) {
-          info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }, commit latency: %,.3f, store latency: %,.3f",
-            m1, m2, m3, m3, avgCommitLatency, storeLatency(true).avgTime(TimeUnit.MILLISECONDS) )
+          info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }",
+            m1, m2, m3, m4 )
         }
 
-
-        def display(name:String, counter:TimeCounter) {
+        def displayTime(name:String, counter:TimeCounter) = {
           var t = counter.apply(true)
           if( t.count > 0 ) {
-            info("%s latency in ms: avg: %,.3f, max: %,.3f, min: %,.3f", name, t.avgTime(TimeUnit.MILLISECONDS), t.maxTime(TimeUnit.MILLISECONDS), t.minTime(TimeUnit.MILLISECONDS))
+            info("%s latency in ms: avg: %,.3f, min: %,.3f, max: %,.3f", name, t.avgTime(TimeUnit.MILLISECONDS), t.minTime(TimeUnit.MILLISECONDS), t.maxTime(TimeUnit.MILLISECONDS))
+          }
+        }
+        def displayInt(name:String, counter:IntMetricCounter) = {
+          var t = counter.apply(true)
+          if( t.count > 0 ) {
+            info("%s: avg: %,.3f, min: %d, max: %d", name, t.avg, t.min, t.max )
           }
         }
 
-//        display("total msg load", loadMessageTimer)
-//        display("index read", client.indexLoad)
-//        display("toal journal load", client.journalLoad)
-//        display("journal read", client.journalRead)
-//        display("journal decode", client.journalDecode)
+        displayTime("commit", commit_latency)
+        displayTime("store", store_latency)
+        displayTime("message load", message_load_latency)
+        displayTime("journal append", client.metric_journal_append)
+        displayTime("index update", client.metric_index_update)
+        displayInt("load batch size", message_load_batch_size)
 
         schedualDisplayStats
       }
@@ -366,34 +388,43 @@ class HawtDBStore extends Store with Bas
     dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^{ displayStats })
   }
 
-  def drain_transactions = {
-    transaction_source.getData.foreach { tx =>
+  def drain_uows = {
+    uow_source.getData.foreach { uow =>
 
-      delayedTransactions.put(tx.txid, tx)
+      delayedUOWs.put(uow.uow_id, uow)
 
-      tx.actions.foreach { case (msg, action) =>
+      uow.actions.foreach { case (msg, action) =>
 
         // dequeues can cancel out previous enqueues
         action.dequeues.foreach { currentDequeue=>
           val currentKey = key(currentDequeue)
-          val prevAction:HawtDBBatch#MessageAction = pendingEnqueues.remove(currentKey)
-          if( prevAction!=null && !prevAction.tx.flushing ) {
+          val prev_action:HawtDBUOW#MessageAction = pending_enqueues.remove(currentKey)
+
+          def prev_batch = prev_action.uow
+          
+          if( prev_action!=null && !prev_batch.flushing ) {
+
 
+            prev_batch.delayable_actions -= 1
             metric_canceled_enqueue_counter += 1
 
             // yay we can cancel out a previous enqueue
-            prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
+            prev_action.enqueues = prev_action.enqueues.filterNot( x=> key(x) == currentKey )
 
             // if the message is not in any queues.. we can gc it..
-            if( prevAction.enqueues == Nil && prevAction.messageRecord !=null ) {
+            if( prev_action.enqueues == Nil && prev_action.messageRecord !=null ) {
               pendingStores.remove(msg)
-              prevAction.messageRecord = null
+              prev_action.messageRecord = null
+              prev_batch.delayable_actions -= 1
               metric_canceled_message_counter += 1
             }
 
             // Cancel the action if it's now empty
-            if( prevAction.isEmpty ) {
-              prevAction.cancel()
+            if( prev_action.isEmpty ) {
+              prev_action.cancel()
+            } else if( !prev_batch.delayable ) {
+              // flush it if there is no point in delaying anymore
+              flush(prev_batch.uow_id)
             }
 
             // since we canceled out the previous enqueue.. now cancel out the action
@@ -405,25 +436,25 @@ class HawtDBStore extends Store with Bas
         }
       }
 
-      val tx_id = tx.txid
-      if( !tx.flushListeners.isEmpty || config.flushDelay <= 0 ) {
-        flush(tx_id)
+      val batch_id = uow.uow_id
+      if( uow.delayable ) {
+        dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(batch_id)})
       } else {
-        dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+        flush(batch_id)
       }
 
     }
   }
 
-  def flush(tx_id:Int) = {
-    flush_source.merge(tx_id)
+  def flush(batch_id:Int) = {
+    flush_source.merge(batch_id)
   }
 
   val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
   flush_source.setEventHandler(^{drain_flushes});
   flush_source.resume
 
-  val storeLatency = new TimeCounter
+  val store_latency = new TimeCounter
 
   def drain_flushes:Unit = {
 
@@ -431,29 +462,29 @@ class HawtDBStore extends Store with Bas
       return
     }
     
-    val txs = flush_source.getData.flatMap{ tx_id =>
+    val uows = flush_source.getData.flatMap{ uow_id =>
 
-      val tx = delayedTransactions.remove(tx_id)
+      val uow = delayedUOWs.remove(uow_id)
       // Message may be flushed or canceled before the timeout flush event..
-      // tx may be null in those cases
-      if (tx!=null) {
-        tx.flushing = true
-        Some(tx)
+      // uow may be null in those cases
+      if (uow!=null) {
+        uow.flushing = true
+        Some(uow)
       } else {
         None
       }
     }
 
-    if( !txs.isEmpty ) {
-      storeLatency.start { end=>
+    if( !uows.isEmpty ) {
+      store_latency.start { end=>
         executor_pool {
-          client.store(txs, ^{
+          client.store(uows, ^{
             dispatchQueue {
 
               end()
-              txs.foreach { tx=>
+              uows.foreach { uow=>
 
-                tx.actions.foreach { case (msg, action) =>
+                uow.actions.foreach { case (msg, action) =>
                   if( action.messageRecord !=null ) {
                     metric_flushed_message_counter += 1
                     pendingStores.remove(msg)
@@ -461,10 +492,10 @@ class HawtDBStore extends Store with Bas
                   action.enqueues.foreach { queueEntry=>
                     metric_flushed_enqueue_counter += 1
                     val k = key(queueEntry)
-                    pendingEnqueues.remove(k)
+                    pending_enqueues.remove(k)
                   }
                 }
-                tx.onPerformed
+                uow.onPerformed
 
               }
             }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 04:12:40 2010
@@ -29,7 +29,7 @@ import Stomp._
 import BufferConversions._
 import StompFrameConstants._
 import java.io.IOException
-import org.apache.activemq.broker.store.StoreBatch
+import org.apache.activemq.broker.store.StoreUOW
 import org.apache.activemq.selector.SelectorParser
 import org.apache.activemq.filter.{BooleanExpression, FilterException}
 
@@ -160,7 +160,7 @@ class StompProtocolHandler extends Proto
   var host:VirtualHost = null
 
   private def queue = connection.dispatchQueue
-  var pendingAcks = HashMap[AsciiBuffer, (StoreBatch)=>Unit]()
+  var pendingAcks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]()
 
   override def onTransportConnected() = {
     session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){ x=>x }, dispatchQueue, StompFrame)
@@ -283,7 +283,7 @@ class StompProtocolHandler extends Proto
   }
 
   def send_via_route(route:DeliveryProducerRoute, frame:StompFrame) = {
-    var storeBatch:StoreBatch=null
+    var storeBatch:StoreUOW=null
     // User might be asking for ack that we have prcoessed the message..
     val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java?rev=961163&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java Wed Jul  7 04:12:40 2010
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.store;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class QueueEntryGroup {
+    public long firstSeq;
+    public long lastSeq;
+    public int count;
+    public int size;
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul  7 04:12:40 2010
@@ -16,68 +16,26 @@
  */
 package org.apache.activemq.broker.store
 
-import _root_.java.lang.{String}
-import org.fusesource.hawtbuf._
 import org.apache.activemq.Service
-import org.fusesource.hawtdispatch.{Retained}
 import org.apache.activemq.apollo.store._
 import org.apache.activemq.apollo.broker.Reporter
 import org.apache.activemq.apollo.dto.StoreDTO
 
 /**
- * A store batch is used to perform persistent
- * operations as a unit of work.
- * 
- * The batch implements the Retained interface and is
- * thread safe.  Once the batch is no longer retained,
- * the unit of work is executed.  
+ * <p>
+ * The Store is a service which offers asynchronous persistence services
+ * to a Broker.
+ * </p>
  *
- * The disposer assigned to the batch will
- * be executed once the unit of work is persisted
- * or it has been negated by subsequent storage
- * operations.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait StoreBatch extends Retained {
-
-  /**
-   * Stores a message.  Messages a reference counted, so make sure you also 
-   * enqueue it to queue if you don't want it to be discarded right away.
-   * 
-   * This method auto generates and assigns the key field of the message record and
-   * returns it.
-   */
-  def store(message:MessageRecord):Long
-
-  /**
-   * Adds a queue entry
-   */
-  def enqueue(entry:QueueEntryRecord)
-
-  /**
-   * Removes a queue entry
-   */
-  def dequeue(entry:QueueEntryRecord)
-
-
-  /**
-   * Causes the batch to flush eagerly, callback is called once flushed.
-   */
-  def eagerFlush(callback: Runnable)
-
-}
-
-/**
  *  @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait Store extends Service {
 
   /**
-   * Creates a store batch which is used to perform persistent
+   * Creates a store uow which is used to perform persistent
    * operations as unit of work.
    */
-  def createStoreBatch():StoreBatch
+  def createStoreUOW():StoreUOW
 
   /**
    * Supplies configuration data to the Store.  This will be called
@@ -116,9 +74,16 @@ trait Store extends Service {
   def listQueues(callback: (Seq[Long])=>Unit )
 
   /**
-   * Loads the queue information for a given queue id.
+   * Groups all the entries in the specified queue into groups containing limit entries and returns those
+   * groups.  Allows you to quickly get a rough idea of the items in queue without consuming too much memory.
+   */
+  def listQueueEntryGroups(queueKey:Long, limit:Int)(callback:(Seq[QueueEntryGroup])=>Unit )
+
+  /**
+   * Loads all the queue entry records for the given queue id between the first and last provided
+   * queue sequences (inclusive).
    */
-  def listQueueEntries(queueKey:Long)(callback:(Seq[QueueEntryRecord])=>Unit )
+  def listQueueEntries(queueKey:Long, firstSeq:Long, lastSeq:Long)(callback:(Seq[QueueEntryRecord])=>Unit )
 
   /**
    * Removes a the delivery associated with the provided from any

Added: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreUOW.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreUOW.scala?rev=961163&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreUOW.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreUOW.scala Wed Jul  7 04:12:40 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store
+
+import org.fusesource.hawtdispatch.{Retained}
+import org.apache.activemq.apollo.store._
+
+/**
+ * A store uow is used to perform persistent
+ * operations as a unit of work.
+ *
+ * The uow implements the Retained interface and is
+ * thread safe.  Once the uow is no longer retained,
+ * the unit of work is executed.
+ *
+ * The disposer assigned to the uow will
+ * be executed once the unit of work is persisted
+ * or it has been negated by subsequent storage
+ * operations.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait StoreUOW extends Retained {
+
+  /**
+   * Stores a message.  Messages are reference counted, so make sure you also
+   * enqueue it to a queue if you don't want it to be discarded right away.
+   *
+   * This method auto generates and assigns the key field of the message record and
+   * returns it.
+   */
+  def store(message:MessageRecord):Long
+
+  /**
+   * Adds a queue entry
+   */
+  def enqueue(entry:QueueEntryRecord)
+
+  /**
+   * Removes a queue entry
+   */
+  def dequeue(entry:QueueEntryRecord)
+
+  /**
+   * Marks this uow as needing to be completed
+   * as soon as possible.  If not called, the Store
+   * implementation may delay completing the uow in
+   * the hopes that a subsequent uow will cancel or negate
+   * all its operations and thus avoid the cost of the
+   * persistence operations.
+   */
+  def completeASAP()
+
+  /**
+   * The specified callback is executed once the UOW
+   * is completed.
+   */
+  def onComplete(callback: Runnable)
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala Wed Jul  7 04:12:40 2010
@@ -91,7 +91,7 @@ abstract class StoreBenchmarkSupport ext
     rc.get
   }
 
-  def addMessage(batch:StoreBatch, content:String):Long = {
+  def addMessage(batch:StoreUOW, content:String):Long = {
     var message = new MessageRecord
     message.protocol = ascii("test-protocol")
     message.value = ascii(content).buffer
@@ -124,7 +124,7 @@ abstract class StoreBenchmarkSupport ext
   }
 
   def populate(queueKey:Long, messages:List[String], firstSeq:Long=1) = {
-    var batch = store.createStoreBatch
+    var batch = store.createStoreUOW
     var msgKeys = ListBuffer[Long]()
     var nextSeq = firstSeq
 
@@ -159,7 +159,7 @@ abstract class StoreBenchmarkSupport ext
     var metric = benchmarkCount(100000) {
       seq += 1
 
-      var batch = store.createStoreBatch
+      var batch = store.createStoreUOW
       val message = addMessage(batch, content)
       messageKeys += message
       batch.enqueue(entry(queue, seq, message))

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala Wed Jul  7 04:12:40 2010
@@ -90,7 +90,7 @@ abstract class StoreFunSuiteSupport exte
     rc.get
   }
 
-  def addMessage(batch:StoreBatch, content:String):Long = {
+  def addMessage(batch:StoreUOW, content:String):Long = {
     var message = new MessageRecord
     message.protocol = ascii("test-protocol")
     message.value = ascii(content).buffer
@@ -108,7 +108,7 @@ abstract class StoreFunSuiteSupport exte
   }
 
   def populate(queueKey:Long, messages:List[String], firstSeq:Long=1) = {
-    var batch = store.createStoreBatch
+    var batch = store.createStoreUOW
     var msgKeys = ListBuffer[Long]()
     var nextSeq = firstSeq
 
@@ -165,7 +165,7 @@ abstract class StoreFunSuiteSupport exte
     val A = addQueue("A")
     val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
-    val rc:Seq[QueueEntryRecord] = CB( cb=> store.listQueueEntries(A)(cb) )
+    val rc:Seq[QueueEntryRecord] = CB( cb=> store.listQueueEntries(A,msgKeys.head, msgKeys.last)(cb) )
     expect(msgKeys.toSeq) {
       rc.map( _.messageKey )
     }
@@ -174,7 +174,7 @@ abstract class StoreFunSuiteSupport exte
   test("batch completes after a delay") {x}
   def x = {
     val A = addQueue("A")
-    var batch = store.createStoreBatch
+    var batch = store.createStoreUOW
 
     val m1 = addMessage(batch, "message 1")
     batch.enqueue(entry(A, 1, m1))
@@ -191,7 +191,7 @@ abstract class StoreFunSuiteSupport exte
 
   test("flush cancels the delay") {
     val A = addQueue("A")
-    var batch = store.createStoreBatch
+    var batch = store.createStoreUOW
 
     val m1 = addMessage(batch, "message 1")
     batch.enqueue(entry(A, 1, m1))