You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:07:17 UTC

svn commit: r961129 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/ activemq-cassandra/src/test/scala/org/apac...

Author: chirino
Date: Wed Jul  7 04:07:16 2010
New Revision: 961129

URL: http://svn.apache.org/viewvc?rev=961129&view=rev
Log:
better store tests

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.scala
      - copied, changed from r961128, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:07:16 2010
@@ -137,7 +137,7 @@ class VirtualHost(val broker: Broker) ex
                 store.getQueueStatus(queueKey) { x =>
                   x match {
                     case Some(info)=>
-                    store.getQueueEntries(queueKey) { entries=>
+                    store.listQueueEntries(queueKey) { entries=>
                       dispatchQueue ^{
                         val dest = DestinationParser.parse(info.record.name, destination_parser_options)
                         val queue = new Queue(this, dest)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul  7 04:07:16 2010
@@ -115,6 +115,15 @@ class CassandraClient() {
     }
   }
 
+  def removeQueue(queueKey: Long):Boolean = {
+    withSession {
+      session =>
+        session.remove(schema.entries \ queueKey)
+        session.remove(schema.queue_name \ queueKey)
+    }
+    true
+  }
+
   def listQueues: Seq[Long] = {
     withSession {
       session =>
@@ -137,7 +146,7 @@ class CassandraClient() {
             rc.record.key = id
             rc.record.name = new AsciiBuffer(x.value)
 
-//            rc.count = session.count( schema.entries \ id )
+            rc.count = session.count( schema.entries \ id )
             
             // TODO
             //          rc.count =

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul  7 04:07:16 2010
@@ -34,7 +34,6 @@ import org.fusesource.hawtdispatch.Scala
 import ReporterLevel._
 
 object CassandraStore extends Log {
-  val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
   /**
    * Creates a default a configuration object.
@@ -132,55 +131,61 @@ class CassandraStore extends Store with 
   /**
    * Deletes all stored data from the store.
    */
-  def purge(cb: =>Unit) = {
+  def purge(callback: =>Unit) = {
     executor_pool ^{
       client.purge
-      cb
+      callback
     }
   }
 
-  def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {
+  def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
     val key = next_queue_key.incrementAndGet
     record.key = key
     executor_pool ^{
       client.addQueue(record)
-      cb(Some(key))
+      callback(Some(key))
     }
   }
 
-  def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = {
+  def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
     executor_pool ^{
-      cb( client.getQueueStatus(id) )
+      callback(client.removeQueue(queueKey))
     }
   }
 
-  def listQueues(cb: (Seq[Long]) => Unit) = {
+  def getQueueStatus(id: Long)(callback: (Option[QueueStatus]) => Unit) = {
     executor_pool ^{
-      cb( client.listQueues )
+      callback( client.getQueueStatus(id) )
     }
   }
 
-  def loadMessage(id: Long)(cb: (Option[MessageRecord]) => Unit) = {
+  def listQueues(callback: (Seq[Long]) => Unit) = {
     executor_pool ^{
-      cb( client.loadMessage(id) )
+      callback( client.listQueues )
+    }
+  }
+
+  def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
+    executor_pool ^{
+      callback( client.loadMessage(id) )
     }
   }
 
 
-  def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {
+  def listQueueEntries(id: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
     executor_pool ^{
-      cb( client.getQueueEntries(id) )
+      callback( client.getQueueEntries(id) )
     }
   }
 
-  def flushMessage(id: Long)(cb: => Unit) = ^{
+  def flushMessage(id: Long)(callback: => Unit) = ^{
     val action: CassandraBatch#MessageAction = pendingStores.get(id)
     if( action == null ) {
-      cb
+      callback
     } else {
       val prevDisposer = action.tx.getDisposer
       action.tx.setDisposer(^{
-        cb
+        callback
         if(prevDisposer!=null) {
           prevDisposer.run
         }
@@ -291,8 +296,12 @@ class CassandraStore extends Store with 
       val tx_id = next_tx_id.incrementAndGet
       tx.txid = tx_id
       delayedTransactions.put(tx_id, tx)
-      dispatchQueue.dispatchAsync(^{flush(tx_id)})
-      dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+
+      if( config.flushDelay > 0 ) {
+        dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+      } else {
+        dispatchQueue.dispatchAsync(^{flush(tx_id)})
+      }
 
       tx.actions.foreach { case (msg, action) =>
         if( action.store!=null ) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala Wed Jul  7 04:07:16 2010
@@ -16,145 +16,17 @@
  */
 package org.apache.activemq.broker.store.cassandra
 
-import org.fusesource.hawtbuf.AsciiBuffer._
-import org.scalatest.BeforeAndAfterAll
-import org.fusesource.hawtdispatch.ScalaDispatch._
-import org.fusesource.hawtdispatch.TaskTracker
-import org.apache.activemq.apollo.broker.{LoggingTracker, FunSuiteSupport}
-import java.util.concurrent.{TimeUnit, CountDownLatch}
-import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, QueueRecord, MessageRecord}
+import org.apache.activemq.broker.store.{Store, StoreFunSuiteSupport}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class CassandraStoreTest extends FunSuiteSupport with CassandraServerMixin {
+class CassandraStoreTest extends StoreFunSuiteSupport with CassandraServerMixin {
 
-  def CB[T](func: (T=>Unit)=>Unit ) = {
-    class X {
-      var value:T = _
-    }
-    val rc = new X
-    val cd = new CountDownLatch(1)
-    def cb(x:T) = {
-      rc.value = x
-      cd.countDown
-    }
-    func(cb)
-    cd.await
-    rc.value
+  def createStore(flushDelay:Long):Store = {
+    val rc = new CassandraStore
+    rc.config.flushDelay = flushDelay
+    rc
   }
 
-  var store:CassandraStore=null
-
-  override protected def beforeAll() = {
-    store = new CassandraStore()
-    val tracker = new LoggingTracker("store startup")
-    tracker.start(store)
-    tracker.await
-  }
-
-  override protected def afterAll() = {
-    val tracker = new LoggingTracker("store stop")
-    tracker.stop(store)
-    tracker.await
-  }
-
-
-  def expectCB[T](expected:T)(func: (T=>Unit)=>Unit ) = {
-    expect(expected) {
-      CB(func)
-    }
-  }
-
-
-  test("add message") {
-   addMessage
-  }
-
-  def addMessage() {
-    var queueA = new QueueRecord
-    queueA.key =1
-    queueA.name = ascii("queue:1")
-
-    val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )
-    queueA.key = rc.get
-
-    val expected:Seq[Long] = List(queueA.key)
-    expectCB(expected) { cb=>
-      store.listQueues(cb)
-    }
-
-    var tx = store.createStoreBatch
-    var message = new MessageRecord
-    message.key = 35
-    message.protocol = ascii("test-protocol")
-    message.value = ascii("test content").buffer
-    message.size = message.value.length
-    tx.store(message)
-
-
-    val disposed = new CountDownLatch(1)
-
-    var queueEntry = new QueueEntryRecord
-    queueEntry.queueKey = queueA.key
-    queueEntry.messageKey = message.key
-    queueEntry.queueSeq = 1
-
-    tx.enqueue(queueEntry)
-    tx.setDisposer(^{ disposed.countDown })
-    tx.dispose
-
-    // It should not finish disposing right away...
-    expect(false) {
-      disposed.await(5, TimeUnit.SECONDS)
-    }
-
-    var flushed = new CountDownLatch(1)
-    store.flushMessage(message.key) {
-      flushed.countDown
-    }
-
-    // Should flush quickly now..
-    expect(true) {
-      flushed.await(1, TimeUnit.SECONDS)
-    }
-    // Flushing triggers the tx to finish disposing.
-    expect(true) {
-      disposed.await(1, TimeUnit.SECONDS)
-    }
-
-    // add another message to the queue..
-    tx = store.createStoreBatch
-    message = new MessageRecord
-    message.key = 36
-    message.protocol = ascii("test-protocol")
-    message.value = ascii("test content").buffer
-    message.size = message.value.length
-    tx.store(message)
-
-    queueEntry = new QueueEntryRecord
-    queueEntry.queueKey = queueA.key
-    queueEntry.messageKey = message.key
-    queueEntry.queueSeq = 2
-
-    tx.enqueue(queueEntry)
-
-    flushed = new CountDownLatch(1)
-    store.flushMessage(message.key) {
-      flushed.countDown
-    }
-    flushed.await
-
-    val qso:Option[QueueStatus] = CB( cb=> store.getQueueStatus(queueA.key)(cb) )
-    expect(ascii("queue:1")) {
-      qso.get.record.name
-    }
-    expect(2) {
-      qso.get.count
-    }
-
-    println("xx")
-
-  }
-    
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java Wed Jul  7 04:07:16 2010
@@ -18,8 +18,10 @@ package org.apache.activemq.apollo.dto;
 
 import org.codehaus.jackson.annotate.JsonTypeInfo;
 
+import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlSeeAlso;
 import javax.xml.bind.annotation.XmlType;
+import java.io.File;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -29,4 +31,14 @@ import javax.xml.bind.annotation.XmlType
 @JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
 public abstract class StoreDTO {
 
+    /**
+     * The flush delay is the amount of time in milliseconds that a store
+     * will delay persisting a messaging unit of work in hopes that it will
+     * be invalidated shortly thereafter by another unit of work which
+     * would negate the operation.
+     */
+    @XmlAttribute(name="flush-delay", required=false)
+    public long flushDelay = 100;
+
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:07:16 2010
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.broker.store.hawtdb
 
+import java.{lang=>jl}
+import java.{util=>ju}
+
 import model.{AddQueue, AddQueueEntry, AddMessage}
 import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord}
 import org.apache.activemq.apollo.dto.HawtDBStoreDTO
@@ -39,6 +42,7 @@ import org.apache.activemq.apollo.broker
 import collection.mutable.{LinkedHashMap, HashMap, ListBuffer}
 import collection.JavaConversions
 import java.util.{TreeSet, HashSet}
+
 import org.fusesource.hawtdb.api._
 
 object HawtDBClient extends Log {
@@ -70,7 +74,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
   private var lockFile: LockFile = null
   private val trackingGen = new AtomicLong(0)
-  private val lockedDatatFiles = new HashSet[java.lang.Integer]()
+  private val lockedDatatFiles = new HashSet[jl.Integer]()
 
   private var recovering = false
   private var nextRecoveryPosition: Location = null
@@ -221,6 +225,13 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     store(update)
   }
 
+  def removeQueue(queueKey: Long):Boolean = {
+    val update = new RemoveQueue.Bean()
+    update.setKey(queueKey)
+    store(update)
+    true
+  }
+
   def store(txs: Seq[HawtDBStore#HawtDBBatch]) {
     var batch = List[TypeCreatable]()
     txs.foreach {
@@ -670,7 +681,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
             error("Message replay with different location for: %d", key)
           }
         } else {
-          val fileId:java.lang.Integer = location.getDataFileId()
+          val fileId:jl.Integer = location.getDataFileId()
           addAndGet(dataFileRefIndex, fileId, 1)
         }
       }
@@ -678,7 +689,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       def removeMessage(key:Long) = {
         val location = messageKeyIndex.remove(key)
         if (location != null) {
-          val fileId:java.lang.Integer = location.getDataFileId()
+          val fileId:jl.Integer = location.getDataFileId()
           addAndGet(dataFileRefIndex, fileId, -1)
         } else {
           error("Cannot remove message, it did not exist: %d", key)
@@ -698,8 +709,19 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       def apply(x: RemoveQueue.Getter): Unit = {
         val queueRecord = queueIndex.remove(x.getKey)
         if (queueRecord != null) {
-          queueEntryIndex(queueRecord).destroy
-          queueTrackingIndex(queueRecord).destroy
+          val trackingIndex = queueTrackingIndex(queueRecord)
+          val entryIndex = queueEntryIndex(queueRecord)
+
+          trackingIndex.iterator.map { entry=>
+            val messageKey = entry.getKey
+            if( addAndGet(messageRefsIndex, messageKey, -1) == 0 ) {
+              // message is no longer referenced.. we can remove it..
+              removeMessage(messageKey.longValue)
+            }
+          }
+
+          entryIndex.destroy
+          trackingIndex.destroy
         }
       }
 
@@ -725,7 +747,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
               queueRecordUpdate.setSize(queueRecord.getSize + x.getSize)
               queueIndex.put(queueKey, queueRecordUpdate.freeze)
 
-              addAndGet(messageRefsIndex, new java.lang.Long(messageKey), 1)
+              addAndGet(messageRefsIndex, new jl.Long(messageKey), 1)
             } else {
               error("Duplicate queue entry seq %d", x.getQueueSeq)
             }
@@ -752,8 +774,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
             if (existing == null) {
               error("Tracking entry not found for message %d", queueEntry.getMessageKey)
             }
-            if( addAndGet(messageRefsIndex, new java.lang.Long(messageKey), -1) == 0 ) {
-              // message is no long referenced.. we can remove it..
+            if( addAndGet(messageRefsIndex, new jl.Long(messageKey), -1) == 0 ) {
+              // message is no longer referenced.. we can remove it..
               removeMessage(messageKey)
             }
           } else {
@@ -869,7 +891,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     import helper._
 
     debug("Cleanup started.")
-    val gcCandidateSet = new TreeSet[Integer](journal.getFileMap().keySet())
+    val gcCandidateSet = new TreeSet[jl.Integer](journal.getFileMap().keySet())
 
     // Don't cleanup locked data files
     if (lockedDatatFiles != null) {
@@ -929,7 +951,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageRefsIndexPage)
     lazy val subscriptionIndex = SUBSCRIPTIONS_INDEX_FACTORY.open(_tx, databaseRootRecord.getSubscriptionIndexPage)
 
-    def addAndGet[K](index:SortedIndex[K, Integer], key:K, amount:Int):Int = {
+    def addAndGet[K](index:SortedIndex[K, jl.Integer], key:K, amount:Int):Int = {
       var counter = index.get(key)
       if( counter == null ) {
         if( amount!=0 ) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:07:16 2010
@@ -121,55 +121,61 @@ class HawtDBStore extends Store with Bas
   /**
    * Deletes all stored data from the store.
    */
-  def purge(cb: =>Unit) = {
+  def purge(callback: =>Unit) = {
     executor_pool ^{
       client.purge
-      cb
+      callback
     }
   }
 
-  def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {
+  def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
     val key = next_queue_key.incrementAndGet
     record.key = key
     executor_pool ^{
       client.addQueue(record)
-      cb(Some(key))
+      callback(Some(key))
     }
   }
 
-  def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = {
+  def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
     executor_pool ^{
-      cb( client.getQueueStatus(id) )
+      callback(client.removeQueue(queueKey))
     }
   }
 
-  def listQueues(cb: (Seq[Long]) => Unit) = {
+  def getQueueStatus(id: Long)(callback: (Option[QueueStatus]) => Unit) = {
     executor_pool ^{
-      cb( client.listQueues )
+      callback( client.getQueueStatus(id) )
     }
   }
 
-  def loadMessage(id: Long)(cb: (Option[MessageRecord]) => Unit) = {
+  def listQueues(callback: (Seq[Long]) => Unit) = {
     executor_pool ^{
-      cb( client.loadMessage(id) )
+      callback( client.listQueues )
+    }
+  }
+
+  def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
+    executor_pool ^{
+      callback( client.loadMessage(id) )
     }
   }
 
 
-  def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {
+  def listQueueEntries(id: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
     executor_pool ^{
-      cb( client.getQueueEntries(id) )
+      callback( client.getQueueEntries(id) )
     }
   }
 
-  def flushMessage(id: Long)(cb: => Unit) = ^{
+  def flushMessage(id: Long)(callback: => Unit) = ^{
     val action: HawtDBBatch#MessageAction = pendingStores.get(id)
     if( action == null ) {
-      cb
+      callback
     } else {
       val prevDisposer = action.tx.getDisposer
       action.tx.setDisposer(^{
-        cb
+        callback
         if(prevDisposer!=null) {
           prevDisposer.run
         }
@@ -280,8 +286,12 @@ class HawtDBStore extends Store with Bas
       val tx_id = next_tx_id.incrementAndGet
       tx.txid = tx_id
       delayedTransactions.put(tx_id, tx)
-      dispatchQueue.dispatchAsync(^{flush(tx_id)})
-      dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+
+      if( config.flushDelay > 0 ) {
+        dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+      } else {
+        dispatchQueue.dispatchAsync(^{flush(tx_id)})
+      }
 
       tx.actions.foreach { case (msg, action) =>
         if( action.store!=null ) {

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.scala (from r961128, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java&r1=961128&r2=961129&rev=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.scala Wed Jul  7 04:07:16 2010
@@ -14,19 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.dto;
+package org.apache.activemq.broker.store.cassandra
 
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
-import javax.xml.bind.annotation.XmlSeeAlso;
-import javax.xml.bind.annotation.XmlType;
+import org.apache.activemq.broker.store.{Store, StoreFunSuiteSupport}
+import org.apache.activemq.broker.store.hawtdb.HawtDBStore
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlType(name = "store-type")
-@XmlSeeAlso({CassandraStoreDTO.class, HawtDBStoreDTO.class})
-@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
-public abstract class StoreDTO {
+class HawtDBStoreTest extends StoreFunSuiteSupport {
+
+  def createStore(flushDelay:Long):Store = {
+    val rc = new HawtDBStore
+    rc.config.flushDelay = flushDelay
+    rc
+  }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul  7 04:07:16 2010
@@ -25,30 +25,38 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.apollo.dto.StoreDTO
 
 /**
- * A StoreTransaction is used to perform persistent
- * operations as unit of work.
+ * A store batch is used to perform persistent
+ * operations as a unit of work.
+ * 
+ * The batch implements the Retained interface and is
+ * thread safe.  Once the batch is no longer retained,
+ * the unit of work is executed.  
  *
- * The disposer assigned to the store transaction will
- * be executed once all associated persistent operations
- * have been persisted.
+ * The disposer assigned to the batch will
+ * be executed once the unit of work is persisted
+ * or it has been negated by subsequent storage
+ * operations.
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait StoreBatch extends Retained {
 
   /**
-   * Assigns the delivery a store id if it did not already
-   * have one assigned.
+   * Stores a message.  Messages a reference counted, so make sure you also 
+   * enqueue it to queue if you don't want it to be discarded right away.
+   * 
+   * This method auto generates and assigns the key field of the message record and
+   * returns it.
    */
-  def store(delivery:MessageRecord):Long
+  def store(message:MessageRecord):Long
 
   /**
-   * Adds a delivery to a specified queue at a the specified position in the queue.
+   * Adds a queue entry
    */
   def enqueue(entry:QueueEntryRecord)
 
   /**
-   * Removes a delivery from a specified queue at a the specified position in the queue.
+   * Removes a queue entry
    */
   def dequeue(entry:QueueEntryRecord)
 
@@ -59,50 +67,65 @@ trait StoreBatch extends Retained {
  */
 trait Store extends Service {
 
+  /**
+   * Creates a store batch which is used to perform persistent
+   * operations as unit of work.
+   */
+  def createStoreBatch():StoreBatch
+
+  /**
+   * Supplies configuration data to the Store.  This will be called
+   * before the store is started, but may also occur after the the Store 
+   * is started.
+   */
   def configure(config: StoreDTO, reporter:Reporter):Unit
 
   /**
-   * Deletes all stored data from the store.
+   * Removes all previously stored data.
    */
-  def purge(cb: =>Unit):Unit
+  def purge(callback: =>Unit):Unit
 
   /**
-   *  Stores a queue, calls back with a unquie id for the stored queue.
+   * Adds a queue.
+   * 
+   * This method auto generates and assigns the key field of the queue record and
+   * returns it via the callback.
    */
-  def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit
+  def addQueue(record:QueueRecord)(callback:(Option[Long])=>Unit):Unit
 
   /**
-   * Loads the queue information for a given queue id.
+   * Removes a queue. Success is reported via the callback.
    */
-  def getQueueStatus(id:Long)(cb:(Option[QueueStatus])=>Unit )
+  def removeQueue(queueKey:Long)(callback:(Boolean)=>Unit):Unit
 
   /**
-   * gets a listing of all queues previously added.
+   * Loads the queue information for a given queue key.
    */
-  def listQueues(cb: (Seq[Long])=>Unit )
+  def getQueueStatus(queueKey:Long)(callback:(Option[QueueStatus])=>Unit )
+
+  /**
+   * Gets a listing of all queue entry sequences previously added
+   * and reports them to the callback.
+   */
+  def listQueues(callback: (Seq[Long])=>Unit )
 
   /**
    * Loads the queue information for a given queue id.
    */
-  def getQueueEntries(id:Long)(cb:(Seq[QueueEntryRecord])=>Unit )
+  def listQueueEntries(queueKey:Long)(callback:(Seq[QueueEntryRecord])=>Unit )
 
   /**
    * Removes a the delivery associated with the provided from any
    * internal buffers/caches.  The callback is executed once, the message is
    * no longer buffered.
    */
-  def flushMessage(id:Long)(cb: =>Unit)
+  def flushMessage(messageKey:Long)(callback: =>Unit)
 
   /**
    * Loads a delivery with the associated id from persistent storage.
    */
-  def loadMessage(id:Long)(cb:(Option[MessageRecord])=>Unit )
+  def loadMessage(messageKey:Long)(callback:(Option[MessageRecord])=>Unit )
 
-  /**
-   * Creates a StoreBatch which is used to perform persistent
-   * operations as unit of work.
-   */
-  def createStoreBatch():StoreBatch
 
 }
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala?rev=961129&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala Wed Jul  7 04:07:16 2010
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store
+
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import org.fusesource.hawtdispatch.TaskTracker
+import org.apache.activemq.apollo.broker.{LoggingTracker, FunSuiteSupport}
+import java.util.concurrent.{TimeUnit, CountDownLatch}
+import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, QueueRecord, MessageRecord}
+import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll}
+import collection.mutable.ListBuffer
+
+/**
+ * <p>Implements generic testing of Store implementations.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class StoreFunSuiteSupport extends FunSuiteSupport with BeforeAndAfterEach {
+
+  var store:Store = null
+
+  def createStore(flushDelay:Long):Store
+
+  /**
+   * Handy helper to call an async method on the store and wait for
+   * the result of the callback.
+   */
+  def CB[T](func: (T=>Unit)=>Unit ) = {
+    class X {
+      var value:T = _
+    }
+    val rc = new X
+    val cd = new CountDownLatch(1)
+    def cb(x:T) = {
+      rc.value = x
+      cd.countDown
+    }
+    func(cb)
+    cd.await
+    rc.value
+  }
+
+
+  override protected def beforeAll() = {
+    store = createStore(5*1000)
+    val tracker = new LoggingTracker("store startup")
+    tracker.start(store)
+    tracker.await
+  }
+
+  override protected def afterAll() = {
+    val tracker = new LoggingTracker("store stop")
+    tracker.stop(store)
+    tracker.await
+  }
+
+  override protected def beforeEach() = {
+    val tracker = new LoggingTracker("store startup")
+    val task = tracker.task("purge")
+    store.purge(task.run)
+    tracker.await
+  }
+
+  def expectCB[T](expected:T)(func: (T=>Unit)=>Unit ) = {
+    expect(expected) {
+      CB(func)
+    }
+  }
+
+  def addQueue(name:String):Long = {
+    var queueA = new QueueRecord
+    queueA.name = ascii(name)
+    val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )
+    expect(true)(rc.isDefined)
+    rc.get
+  }
+
+  def addMessage(batch:StoreBatch, content:String):Long = {
+    var message = new MessageRecord
+    message.protocol = ascii("test-protocol")
+    message.value = ascii(content).buffer
+    message.size = message.value.length
+    batch.store(message)
+  }
+
+
+  def entry(queueKey:Long, queueSeq:Long, messageKey:Long=0) = {
+    var queueEntry = new QueueEntryRecord
+    queueEntry.queueKey = queueKey
+    queueEntry.queueSeq = queueSeq
+    queueEntry.messageKey = messageKey
+    queueEntry
+  }
+
+  def populate(queueKey:Long, messages:List[String], firstSeq:Long=1) = {
+    var batch = store.createStoreBatch
+    var msgKeys = ListBuffer[Long]()
+    var nextSeq = firstSeq
+
+    messages.foreach { message=>
+      val msgKey = addMessage(batch, message)
+      msgKeys += msgKey
+      batch.enqueue(entry(queueKey, nextSeq, msgKey))
+      nextSeq += 1
+    }
+
+    val tracker = new TaskTracker()
+    tracker.release(batch)
+    msgKeys.foreach { msgKey =>
+      store.flushMessage(msgKey) {}
+    }
+    tracker.await
+    msgKeys
+  }
+
+  test("add and list queues") {
+    val A = addQueue("A")
+    val B = addQueue("B")
+    val C = addQueue("C")
+
+    expectCB(List(A,B,C).toSeq) { cb=>
+      store.listQueues(cb)
+    }
+  }
+
+  test("get queue status") {
+    val A = addQueue("my queue name")
+    populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+    val rc:Option[QueueStatus] = CB( cb=> store.getQueueStatus(A)(cb) )
+    expect(ascii("my queue name")) {
+      rc.get.record.name
+    }
+    expect(3) {
+      rc.get.count
+    }
+  }
+
+  test("list queue entries") {
+    val A = addQueue("A")
+    val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+    val rc:Seq[QueueEntryRecord] = CB( cb=> store.listQueueEntries(A)(cb) )
+    expect(msgKeys.toSeq) {
+      rc.map( _.messageKey )
+    }
+  }
+
+  test("load stored message") {
+    val A = addQueue("A")
+    val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+    val rc:Option[MessageRecord] = CB( cb=> store.loadMessage(msgKeys.head)(cb) )
+    expect(ascii("message 1").buffer) {
+      rc.get.value
+    }
+  }
+
+  test("batch completes after a delay") {x}
+  def x = {
+    val A = addQueue("A")
+    var batch = store.createStoreBatch
+
+    val m1 = addMessage(batch, "message 1")
+    batch.enqueue(entry(A, 1, m1))
+
+    val tracker = new TaskTracker()
+    tracker.release(batch)
+    expect(false) {
+      tracker.await(3, TimeUnit.SECONDS)
+    }
+    expect(true) {
+      tracker.await(3, TimeUnit.SECONDS)
+    }
+  }
+
+  test("flush cancels the completion delay") {
+    val A = addQueue("A")
+    var batch = store.createStoreBatch
+
+    val m1 = addMessage(batch, "message 1")
+    batch.enqueue(entry(A, 1, m1))
+
+    val tracker = new TaskTracker()
+    tracker.release(batch)
+
+    store.flushMessage(m1) {}
+
+    expect(true) {
+      tracker.await(1, TimeUnit.SECONDS)
+    }
+  }
+
+
+}