You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/10/23 17:30:11 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5266 tidy up leveldb impl with additional scenario tests

Repository: activemq
Updated Branches:
  refs/heads/trunk 642cc4321 -> 3042797b4


https://issues.apache.org/jira/browse/AMQ-5266 tidy up leveldb impl with additional scenario tests


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3042797b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3042797b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3042797b

Branch: refs/heads/trunk
Commit: 3042797b41d04a682c7687c0589214f5a2f2e4ba
Parents: 642cc43
Author: gtully <ga...@gmail.com>
Authored: Thu Oct 23 16:28:33 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Oct 23 16:29:57 2014 +0100

----------------------------------------------------------------------
 .../activemq/store/jdbc/JDBCMessageStore.java   | 18 +++--
 .../org/apache/activemq/leveldb/DBManager.scala | 16 +----
 .../apache/activemq/leveldb/LevelDBClient.scala | 10 +--
 .../apache/activemq/leveldb/LevelDBStore.scala  | 59 ++++++++-------
 .../activemq/bugs/AMQ5266SingleDestTest.java    |  4 +-
 .../bugs/AMQ5266StarvedConsumerTest.java        | 43 +++--------
 .../org/apache/activemq/bugs/AMQ5266Test.java   | 75 +++++++-------------
 7 files changed, 93 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 43daff2..75a68c7 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -21,7 +21,6 @@ import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -73,7 +72,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
     protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
     final Set<Long> recoveredAdditions = new LinkedHashSet<Long>();
     protected ActiveMQMessageAudit audit;
-    protected final List<Long> pendingAdditions = new LinkedList<Long>();
+    protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>();
     
     public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
         super(destination);
@@ -131,7 +130,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
             pendingAdditions.add(sequence);
             c.onCompletion(new Runnable() {
                 public void run() {
-                    // message added to db
+                    // jdbc close or jms commit - while futureOrSequenceLong==null ordered
+                    // work will remain pending on the Queue
                     message.getMessageId().setFutureOrSequenceLong(sequence);
                     message.getMessageId().setEntryLocator(sequence);
                 }
@@ -341,6 +341,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
                     }
                 }
             }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace(this + " recoverNext lastRecovered:" + lastRecoveredSequenceId.get() + ", minPending:" + minPendingSequeunceId());
+            }
             adapter.doRecoverNextMessages(c, destination, minPendingSequeunceId(), lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
                     maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
 
@@ -376,7 +379,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
      */
     public void resetBatching() {
         if (LOG.isTraceEnabled()) {
-            LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
+            LOG.trace(this + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
         }
         lastRecoveredSequenceId.set(-1);
         lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
@@ -394,7 +397,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
             lastRecoveredPriority.set(Byte.MAX_VALUE -1);
         }
         if (LOG.isTraceEnabled()) {
-            LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
+            LOG.trace(this + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
                     + ", priority: " + lastRecoveredPriority.get());
         }
     }
@@ -403,4 +406,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
     public void setPrioritizedMessages(boolean prioritizedMessages) {
         super.setPrioritizedMessages(prioritizedMessages);
     }
+
+    @Override
+    public String toString() {
+        return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index 09c1378..b0051cc 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -104,7 +104,6 @@ class CountDownFuture[T <: AnyRef]() extends ListenableFuture[T] {
   var value:T = _
   var error:Throwable = _
   var listener:Runnable = _
-  var id:MessageId = _
 
   def cancel(mayInterruptIfRunning: Boolean) = false
   def isCancelled = false
@@ -116,9 +115,6 @@ class CountDownFuture[T <: AnyRef]() extends ListenableFuture[T] {
 
   def set(v:T) = {
     value = v
-    if (id != null) {
-      id.setFutureOrSequenceLong(id.getEntryLocator.asInstanceOf[EntryLocator].seq)
-    }
     latch.countDown()
     fireListener
   }
@@ -330,14 +326,6 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
     val entry = QueueEntryRecord(id, queueKey, queueSeq)
     assert(id.getEntryLocator == null)
     id.setEntryLocator(EntryLocator(queueKey, queueSeq))
-    if (message.getTransactionId!=null) {
-        // why does future not get set in tx?
-       id.setFutureOrSequenceLong(queueSeq)
-    } else {
-      id.setFutureOrSequenceLong(countDownFuture)
-      message.setRecievedByDFBridge(true)
-      countDownFuture.id = id
-    }
 
     val a = this.synchronized {
       if( !delay )
@@ -741,10 +729,10 @@ class DBManager(val parent:LevelDBStore) {
     client.collectionIsEmpty(key)
   }
 
-  def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, max:Long=Long.MaxValue) = {
+  def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, endPos:Long=Long.MaxValue, max:Long=Long.MaxValue) = {
     var lastmsgid:MessageId = null
     var count = 0L
-    client.queueCursor(key, startPos) { msg =>
+    client.queueCursor(key, startPos, endPos) { msg =>
       if( !preparedAcks.contains(msg.getMessageId) && listener.recoverMessage(msg) ) {
         lastmsgid = msg.getMessageId
         count += 1

http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index 3d6178b..64bbcee 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -1255,8 +1255,8 @@ class LevelDBClient(store: LevelDBStore) {
     return rc
   }
 
-  def queueCursor(collectionKey: Long, seq:Long)(func: (Message)=>Boolean) = {
-    collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
+  def queueCursor(collectionKey: Long, seq:Long, endSeq:Long)(func: (Message)=>Boolean) = {
+    collectionCursor(collectionKey, encodeLong(seq), encodeLong(endSeq)) { (key, value) =>
       val seq = decodeLong(key)
       var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
       val msg = getMessage(locator)
@@ -1273,7 +1273,7 @@ class LevelDBClient(store: LevelDBStore) {
   }
 
   def transactionCursor(collectionKey: Long)(func: (AnyRef)=>Boolean) = {
-    collectionCursor(collectionKey, encodeLong(0)) { (key, value) =>
+    collectionCursor(collectionKey, encodeLong(0), encodeLong(Long.MaxValue)) { (key, value) =>
       val seq = decodeLong(key)
       if( value.getMeta != null ) {
 
@@ -1336,12 +1336,12 @@ class LevelDBClient(store: LevelDBStore) {
     store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[Message]
   }
 
-  def collectionCursor(collectionKey: Long, cursorPosition:Buffer)(func: (Buffer, EntryRecord.Buffer)=>Boolean) = {
+  def collectionCursor(collectionKey: Long, cursorPosition:Buffer, endCursorPosition:Buffer)(func: (Buffer, EntryRecord.Buffer)=>Boolean) = {
     val ro = new ReadOptions
     ro.fillCache(true)
     ro.verifyChecksums(verifyChecksums)
     val start = encodeEntryKey(ENTRY_PREFIX, collectionKey, cursorPosition)
-    val end = encodeLongKey(ENTRY_PREFIX, collectionKey+1)
+    val end = encodeEntryKey(ENTRY_PREFIX, collectionKey, endCursorPosition)
     might_fail_using_index {
       index.cursorRange(start, end, ro) { case (key, value) =>
         func(key.buffer.moveHead(9), EntryRecord.FACTORY.parseUnframed(value))

http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 451bc04..52a785a 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -35,7 +35,8 @@ import org.apache.activemq.leveldb.util.Log
 import org.apache.activemq.store.PList.PListIterator
 import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
 import org.fusesource.hawtdispatch;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore
+import org.apache.activemq.store.IndexListener.MessageContext
 
 object LevelDBStore extends Log {
   val DEFAULT_DIRECTORY = new File("LevelDB");
@@ -245,7 +246,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
       val (msgs, acks) = db.getXAActions(transaction.xacontainer_id)
       transaction.xarecovery = (msgs, acks.map(_.ack))
       for ( msg <- msgs ) {
-        transaction.add(createMessageStore(msg.getDestination), new IndexListener.MessageContext(null, msg, null), false);
+        transaction.add(createMessageStore(msg.getDestination), null, msg, false);
       }
       for ( record <- acks ) {
         var ack = record.ack
@@ -348,27 +349,27 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
       }
     }
 
-  def add(store:LevelDBStore#LevelDBMessageStore, messageContext:IndexListener.MessageContext, delay:Boolean) = {
+  def add(store:LevelDBStore#LevelDBMessageStore, context: ConnectionContext, message: Message, delay:Boolean) = {
       commitActions += new TransactionAction() {
         def commit(uow:DelayableUOW) = {
           if( prepared ) {
-            uow.dequeue(xacontainer_id, messageContext.message.getMessageId)
+            uow.dequeue(xacontainer_id, message.getMessageId)
           }
-          var copy = messageContext.message.getMessageId.copy()
+          var copy = message.getMessageId.copy()
           copy.setEntryLocator(null)
-          messageContext.message.setMessageId(copy)
-          store.doAdd(uow, messageContext, delay)
+          message.setMessageId(copy)
+          store.doAdd(uow, context, message, delay)
         }
 
         def prepare(uow:DelayableUOW) = {
           // add it to the xa container instead of the actual store container.
-          uow.enqueue(xacontainer_id, xaseqcounter.incrementAndGet, messageContext.message, delay)
-          xarecovery._1 += messageContext.message
+          uow.enqueue(xacontainer_id, xaseqcounter.incrementAndGet, message, delay)
+          xarecovery._1 += message
         }
 
         def rollback(uow:DelayableUOW) = {
           if( prepared ) {
-            uow.dequeue(xacontainer_id, messageContext.message.getMessageId)
+            uow.dequeue(xacontainer_id, message.getMessageId)
           }
         }
 
@@ -671,23 +672,29 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
     val lastSeq: AtomicLong = new AtomicLong(0)
     protected var cursorPosition: Long = 0
     val preparedAcks = new HashSet[MessageId]()
-
+    val pendingCursorAdds = new LinkedList[Long]()
     lastSeq.set(db.getLastQueueEntrySeq(key))
 
     def cursorResetPosition = 0L
 
-    def doAdd(uow: DelayableUOW, messageContext:IndexListener.MessageContext, delay:Boolean): CountDownFuture[AnyRef] = {
+    def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
       check_running
-      val seq = lastSeq.incrementAndGet()
-      messageContext.message.incrementReferenceCount()
+      message.incrementReferenceCount()
       uow.addCompleteListener({
-        messageContext.message.decrementReferenceCount()
+        message.decrementReferenceCount()
       })
-      val future = uow.enqueue(key, seq, messageContext.message, delay)
-      if (indexListener != null) {
-        indexListener.onAdd(messageContext)
+      val sequence = lastSeq.synchronized {
+        val seq = lastSeq.incrementAndGet()
+        message.getMessageId.setFutureOrSequenceLong(seq);
+        if (indexListener != null) {
+          pendingCursorAdds.synchronized { pendingCursorAdds.add(seq) }
+          indexListener.onAdd(new MessageContext(context, message, new Runnable {
+            def run(): Unit = pendingCursorAdds.synchronized { pendingCursorAdds.remove(seq) }
+          }))
+        }
+        seq
       }
-      future
+      uow.enqueue(key, sequence, message, delay)
     }
 
     override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
@@ -695,11 +702,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
       check_running
       message.getMessageId.setEntryLocator(null)
       if(  message.getTransactionId!=null ) {
-        transaction(message.getTransactionId).add(this, new IndexListener.MessageContext(context, message, null), delay)
+        transaction(message.getTransactionId).add(this, context, message, delay)
         DONE
       } else {
         withUow { uow=>
-          doAdd(uow, new IndexListener.MessageContext(context, message, null), delay)
+          doAdd(uow, context, message, delay)
         }
       }
     }
@@ -759,9 +766,13 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
       return db.collectionIsEmpty(key)
     }
 
+    def getCursorPendingLimit: Long = {
+      pendingCursorAdds.synchronized { Option(pendingCursorAdds.peek).getOrElse(Long.MaxValue) }
+    }
+
     def recover(listener: MessageRecoveryListener): Unit = {
       check_running
-      cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorResetPosition)
+      cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorResetPosition, getCursorPendingLimit)
     }
 
     def resetBatching: Unit = {
@@ -770,11 +781,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
 
     def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
       check_running
-      cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, maxReturned)
+      cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, getCursorPendingLimit, maxReturned)
     }
 
     override def setBatch(id: MessageId): Unit = {
-      cursorPosition = db.queuePosition(id) + 1
+      cursorPosition = Math.min(getCursorPendingLimit, db.queuePosition(id)) + 1
     }
 
   }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
index 131f807..cfd6534 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
@@ -89,7 +89,9 @@ public class AMQ5266SingleDestTest {
     @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
     public static Iterable<Object[]> parameters() {
         return Arrays.asList(new Object[][]{
-                {1000,  80,  80,   1024*1024*1,  true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
+               {1000,  40,  40,   1024*1024*1,  true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
+               {1000,  40,  40,   1024*1024*1,  true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
+               {1000,  40,  40,   1024*1024*1,  true, TestSupport.PersistenceAdapterChoice.JDBC, false},
         });
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
index 300bec1..f7409dd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
@@ -37,6 +37,7 @@ import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.RegionBroker;
@@ -66,7 +67,6 @@ public class AMQ5266StarvedConsumerTest {
     static Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class);
     String activemqURL;
     BrokerService brokerService;
-    private EmbeddedDataSource dataSource;
 
     public int messageSize = 1000;
 
@@ -86,16 +86,22 @@ public class AMQ5266StarvedConsumerTest {
     public boolean useCache = true;
 
     @Parameterized.Parameter(5)
-    public boolean useDefaultStore = false;
+    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
 
     @Parameterized.Parameter(6)
     public boolean optimizeDispatch = false;
     private  AtomicBoolean didNotReceive = new AtomicBoolean(false);
 
-    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
+    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
     public static Iterable<Object[]> parameters() {
         return Arrays.asList(new Object[][]{
-                {1000, 40,  5,   1024*1024,  false,  false, true},
+                {1000, 40,  5,   1024*1024,  false, TestSupport.PersistenceAdapterChoice.KahaDB, true},
+                {1000, 40,  5,   1024*1024,  false, TestSupport.PersistenceAdapterChoice.LevelDB, true},
+                {1000, 40,  5,   1024*1024,  false, TestSupport.PersistenceAdapterChoice.JDBC, true},
+
+                {500, 20,  20,   1024*1024,  false, TestSupport.PersistenceAdapterChoice.KahaDB, true},
+                {500, 20,  20,   1024*1024,  false, TestSupport.PersistenceAdapterChoice.LevelDB, true},
+                {500, 20,  20,   1024*1024,  false, TestSupport.PersistenceAdapterChoice.JDBC, true},
         });
     }
 
@@ -104,21 +110,7 @@ public class AMQ5266StarvedConsumerTest {
     @Before
     public void startBroker() throws Exception {
         brokerService = new BrokerService();
-
-        dataSource = new EmbeddedDataSource();
-        dataSource.setDatabaseName("target/derbyDb");
-        dataSource.setCreateDatabase("create");
-
-        JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
-        jdbcPersistenceAdapter.setDataSource(dataSource);
-        jdbcPersistenceAdapter.setUseLock(false);
-
-        if (!useDefaultStore) {
-            brokerService.setPersistenceAdapter(jdbcPersistenceAdapter);
-        } else {
-            KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
-            kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
-        }
+        TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
         brokerService.setDeleteAllMessagesOnStartup(true);
         brokerService.setUseJmx(false);
         brokerService.setAdvisorySupport(false);
@@ -149,10 +141,6 @@ public class AMQ5266StarvedConsumerTest {
         if (brokerService != null) {
             brokerService.stop();
         }
-        try {
-            dataSource.setShutdownDatabase("shutdown");
-            dataSource.getConnection();
-        } catch (Exception ignored) {}
     }
 
     CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() {
@@ -216,9 +204,6 @@ public class AMQ5266StarvedConsumerTest {
             try {
                 int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
                 LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
-                if (!useDefaultStore) {
-                    DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
-                }
                 Thread.sleep(10000);
             } catch (Exception e) {
             }
@@ -228,12 +213,6 @@ public class AMQ5266StarvedConsumerTest {
 
         consumer.shutdown();
 
-        TimeUnit.SECONDS.sleep(2);
-        LOG.info("DB Contents START");
-        if (!useDefaultStore) {
-            DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
-        }
-        LOG.info("DB Contents END");
 
         LOG.info("Consumer Stats:");
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
index efccefa..e180746 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
@@ -35,15 +35,12 @@ import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.derby.jdbc.EmbeddedDataSource;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,7 +62,6 @@ public class AMQ5266Test {
     static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class);
     String activemqURL = "tcp://localhost:61617";
     BrokerService brokerService;
-    private EmbeddedDataSource dataSource;
 
     public int messageSize = 1000;
 
@@ -85,28 +81,34 @@ public class AMQ5266Test {
     public boolean useCache = true;
 
     @Parameterized.Parameter(5)
-    public boolean useDefaultStore = false;
+    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
 
     @Parameterized.Parameter(6)
     public boolean optimizeDispatch = false;
 
-    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
+    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
     public static Iterable<Object[]> parameters() {
         return Arrays.asList(new Object[][]{
-                // jdbc
-                {1,    1,   1,   50*1024,   false, false, true},
-                {1000, 20,  5,   50*1024,   true,  false, false},
-                {100,  20,  5,   50*1024,   false, false, false},
-                {1000, 5,   20,  50*1024,   true,  false, false},
-                {1000, 20,  20,  1024*1024, true,  false, false},
-
-                // default store
-                {1,    1,   1,   50*1024,   false, true, true},
-                {100,  5,   5,   50*1024,   false, true, false},
-                {1000, 20,  5,   50*1024,   true,  true, false},
-                {100,  20,  5,   50*1024,   false, true, false},
-                {1000, 5,   20,  50*1024,   true,  true, false},
-                {1000, 20,  20,  1024*1024, true,  true, false},
+                {1,    1,   1,   50*1024,   false, TestSupport.PersistenceAdapterChoice.JDBC, true},
+                {1000, 20,  5,   50*1024,   true,  TestSupport.PersistenceAdapterChoice.JDBC, false},
+                {100,  20,  5,   50*1024,   false, TestSupport.PersistenceAdapterChoice.JDBC, false},
+                {1000, 5,   20,  50*1024,   true,  TestSupport.PersistenceAdapterChoice.JDBC, false},
+                {1000, 20,  20,  1024*1024, true,  TestSupport.PersistenceAdapterChoice.JDBC, false},
+
+                {1,    1,   1,   50*1024,   false, TestSupport.PersistenceAdapterChoice.KahaDB, true},
+                {100,  5,   5,   50*1024,   false, TestSupport.PersistenceAdapterChoice.KahaDB, false},
+                {1000, 20,  5,   50*1024,   true,  TestSupport.PersistenceAdapterChoice.KahaDB, false},
+                {100,  20,  5,   50*1024,   false, TestSupport.PersistenceAdapterChoice.KahaDB, false},
+                {1000, 5,   20,  50*1024,   true,  TestSupport.PersistenceAdapterChoice.KahaDB, false},
+                {1000, 20,  20,  1024*1024, true,  TestSupport.PersistenceAdapterChoice.KahaDB, false},
+
+                {1,    1,   1,   50*1024,   false, TestSupport.PersistenceAdapterChoice.LevelDB, true},
+                {100,  5,   5,   50*1024,   false, TestSupport.PersistenceAdapterChoice.LevelDB, false},
+                {1000, 20,  5,   50*1024,   true,  TestSupport.PersistenceAdapterChoice.LevelDB, false},
+                {100,  20,  5,   50*1024,   false, TestSupport.PersistenceAdapterChoice.LevelDB, false},
+                {1000, 5,   20,  50*1024,   true,  TestSupport.PersistenceAdapterChoice.LevelDB, false},
+                {1000, 20,  20,  1024*1024, true,  TestSupport.PersistenceAdapterChoice.LevelDB, false},
+
         });
     }
 
@@ -115,25 +117,10 @@ public class AMQ5266Test {
     @Before
     public void startBroker() throws Exception {
         brokerService = new BrokerService();
-
-        dataSource = new EmbeddedDataSource();
-        dataSource.setDatabaseName("target/derbyDb");
-        dataSource.setCreateDatabase("create");
-
-        JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
-        jdbcPersistenceAdapter.setDataSource(dataSource);
-        jdbcPersistenceAdapter.setUseLock(false);
-
-        if (!useDefaultStore) {
-            brokerService.setPersistenceAdapter(jdbcPersistenceAdapter);
-        } else {
-            KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
-            kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
-        }
+        TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
         brokerService.setDeleteAllMessagesOnStartup(true);
         brokerService.setUseJmx(false);
 
-
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();
         defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
@@ -159,10 +146,6 @@ public class AMQ5266Test {
         if (brokerService != null) {
             brokerService.stop();
         }
-        try {
-            dataSource.setShutdownDatabase("shutdown");
-            dataSource.getConnection();
-        } catch (Exception ignored) {}
     }
 
     @Test
@@ -211,9 +194,6 @@ public class AMQ5266Test {
             try {
                 int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
                 LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
-                if (!useDefaultStore) {
-                    DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
-                }
                 Thread.sleep(10000);
             } catch (Exception e) {
             }
@@ -223,13 +203,6 @@ public class AMQ5266Test {
 
         consumer.shutdown();
 
-        TimeUnit.SECONDS.sleep(2);
-        LOG.info("DB Contents START");
-        if (!useDefaultStore) {
-            DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
-        }
-        LOG.info("DB Contents END");
-
         LOG.info("Consumer Stats:");
 
         for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {