You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/10/23 17:30:11 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5266 tidy up leveldb impl with additional scenario tests
Repository: activemq
Updated Branches:
refs/heads/trunk 642cc4321 -> 3042797b4
https://issues.apache.org/jira/browse/AMQ-5266 tidy up leveldb impl with additional scenario tests
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3042797b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3042797b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3042797b
Branch: refs/heads/trunk
Commit: 3042797b41d04a682c7687c0589214f5a2f2e4ba
Parents: 642cc43
Author: gtully <ga...@gmail.com>
Authored: Thu Oct 23 16:28:33 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Oct 23 16:29:57 2014 +0100
----------------------------------------------------------------------
.../activemq/store/jdbc/JDBCMessageStore.java | 18 +++--
.../org/apache/activemq/leveldb/DBManager.scala | 16 +----
.../apache/activemq/leveldb/LevelDBClient.scala | 10 +--
.../apache/activemq/leveldb/LevelDBStore.scala | 59 ++++++++-------
.../activemq/bugs/AMQ5266SingleDestTest.java | 4 +-
.../bugs/AMQ5266StarvedConsumerTest.java | 43 +++--------
.../org/apache/activemq/bugs/AMQ5266Test.java | 75 +++++++-------------
7 files changed, 93 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 43daff2..75a68c7 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -21,7 +21,6 @@ import java.sql.SQLException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -73,7 +72,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
final Set<Long> recoveredAdditions = new LinkedHashSet<Long>();
protected ActiveMQMessageAudit audit;
- protected final List<Long> pendingAdditions = new LinkedList<Long>();
+ protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>();
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
super(destination);
@@ -131,7 +130,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
pendingAdditions.add(sequence);
c.onCompletion(new Runnable() {
public void run() {
- // message added to db
+ // jdbc close or jms commit - while futureOrSequenceLong==null ordered
+ // work will remain pending on the Queue
message.getMessageId().setFutureOrSequenceLong(sequence);
message.getMessageId().setEntryLocator(sequence);
}
@@ -341,6 +341,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
}
}
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + " recoverNext lastRecovered:" + lastRecoveredSequenceId.get() + ", minPending:" + minPendingSequeunceId());
+ }
adapter.doRecoverNextMessages(c, destination, minPendingSequeunceId(), lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
@@ -376,7 +379,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
*/
public void resetBatching() {
if (LOG.isTraceEnabled()) {
- LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
+ LOG.trace(this + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
}
lastRecoveredSequenceId.set(-1);
lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
@@ -394,7 +397,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
lastRecoveredPriority.set(Byte.MAX_VALUE -1);
}
if (LOG.isTraceEnabled()) {
- LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
+ LOG.trace(this + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
+ ", priority: " + lastRecoveredPriority.get());
}
}
@@ -403,4 +406,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
public void setPrioritizedMessages(boolean prioritizedMessages) {
super.setPrioritizedMessages(prioritizedMessages);
}
+
+ @Override
+ public String toString() {
+ return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index 09c1378..b0051cc 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -104,7 +104,6 @@ class CountDownFuture[T <: AnyRef]() extends ListenableFuture[T] {
var value:T = _
var error:Throwable = _
var listener:Runnable = _
- var id:MessageId = _
def cancel(mayInterruptIfRunning: Boolean) = false
def isCancelled = false
@@ -116,9 +115,6 @@ class CountDownFuture[T <: AnyRef]() extends ListenableFuture[T] {
def set(v:T) = {
value = v
- if (id != null) {
- id.setFutureOrSequenceLong(id.getEntryLocator.asInstanceOf[EntryLocator].seq)
- }
latch.countDown()
fireListener
}
@@ -330,14 +326,6 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
val entry = QueueEntryRecord(id, queueKey, queueSeq)
assert(id.getEntryLocator == null)
id.setEntryLocator(EntryLocator(queueKey, queueSeq))
- if (message.getTransactionId!=null) {
- // why does future not get set in tx?
- id.setFutureOrSequenceLong(queueSeq)
- } else {
- id.setFutureOrSequenceLong(countDownFuture)
- message.setRecievedByDFBridge(true)
- countDownFuture.id = id
- }
val a = this.synchronized {
if( !delay )
@@ -741,10 +729,10 @@ class DBManager(val parent:LevelDBStore) {
client.collectionIsEmpty(key)
}
- def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, max:Long=Long.MaxValue) = {
+ def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, endPos:Long=Long.MaxValue, max:Long=Long.MaxValue) = {
var lastmsgid:MessageId = null
var count = 0L
- client.queueCursor(key, startPos) { msg =>
+ client.queueCursor(key, startPos, endPos) { msg =>
if( !preparedAcks.contains(msg.getMessageId) && listener.recoverMessage(msg) ) {
lastmsgid = msg.getMessageId
count += 1
http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index 3d6178b..64bbcee 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -1255,8 +1255,8 @@ class LevelDBClient(store: LevelDBStore) {
return rc
}
- def queueCursor(collectionKey: Long, seq:Long)(func: (Message)=>Boolean) = {
- collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
+ def queueCursor(collectionKey: Long, seq:Long, endSeq:Long)(func: (Message)=>Boolean) = {
+ collectionCursor(collectionKey, encodeLong(seq), encodeLong(endSeq)) { (key, value) =>
val seq = decodeLong(key)
var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
val msg = getMessage(locator)
@@ -1273,7 +1273,7 @@ class LevelDBClient(store: LevelDBStore) {
}
def transactionCursor(collectionKey: Long)(func: (AnyRef)=>Boolean) = {
- collectionCursor(collectionKey, encodeLong(0)) { (key, value) =>
+ collectionCursor(collectionKey, encodeLong(0), encodeLong(Long.MaxValue)) { (key, value) =>
val seq = decodeLong(key)
if( value.getMeta != null ) {
@@ -1336,12 +1336,12 @@ class LevelDBClient(store: LevelDBStore) {
store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[Message]
}
- def collectionCursor(collectionKey: Long, cursorPosition:Buffer)(func: (Buffer, EntryRecord.Buffer)=>Boolean) = {
+ def collectionCursor(collectionKey: Long, cursorPosition:Buffer, endCursorPosition:Buffer)(func: (Buffer, EntryRecord.Buffer)=>Boolean) = {
val ro = new ReadOptions
ro.fillCache(true)
ro.verifyChecksums(verifyChecksums)
val start = encodeEntryKey(ENTRY_PREFIX, collectionKey, cursorPosition)
- val end = encodeLongKey(ENTRY_PREFIX, collectionKey+1)
+ val end = encodeEntryKey(ENTRY_PREFIX, collectionKey, endCursorPosition)
might_fail_using_index {
index.cursorRange(start, end, ro) { case (key, value) =>
func(key.buffer.moveHead(9), EntryRecord.FACTORY.parseUnframed(value))
http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 451bc04..52a785a 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -35,7 +35,8 @@ import org.apache.activemq.leveldb.util.Log
import org.apache.activemq.store.PList.PListIterator
import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
import org.fusesource.hawtdispatch;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore
+import org.apache.activemq.store.IndexListener.MessageContext
object LevelDBStore extends Log {
val DEFAULT_DIRECTORY = new File("LevelDB");
@@ -245,7 +246,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
val (msgs, acks) = db.getXAActions(transaction.xacontainer_id)
transaction.xarecovery = (msgs, acks.map(_.ack))
for ( msg <- msgs ) {
- transaction.add(createMessageStore(msg.getDestination), new IndexListener.MessageContext(null, msg, null), false);
+ transaction.add(createMessageStore(msg.getDestination), null, msg, false);
}
for ( record <- acks ) {
var ack = record.ack
@@ -348,27 +349,27 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
}
- def add(store:LevelDBStore#LevelDBMessageStore, messageContext:IndexListener.MessageContext, delay:Boolean) = {
+ def add(store:LevelDBStore#LevelDBMessageStore, context: ConnectionContext, message: Message, delay:Boolean) = {
commitActions += new TransactionAction() {
def commit(uow:DelayableUOW) = {
if( prepared ) {
- uow.dequeue(xacontainer_id, messageContext.message.getMessageId)
+ uow.dequeue(xacontainer_id, message.getMessageId)
}
- var copy = messageContext.message.getMessageId.copy()
+ var copy = message.getMessageId.copy()
copy.setEntryLocator(null)
- messageContext.message.setMessageId(copy)
- store.doAdd(uow, messageContext, delay)
+ message.setMessageId(copy)
+ store.doAdd(uow, context, message, delay)
}
def prepare(uow:DelayableUOW) = {
// add it to the xa container instead of the actual store container.
- uow.enqueue(xacontainer_id, xaseqcounter.incrementAndGet, messageContext.message, delay)
- xarecovery._1 += messageContext.message
+ uow.enqueue(xacontainer_id, xaseqcounter.incrementAndGet, message, delay)
+ xarecovery._1 += message
}
def rollback(uow:DelayableUOW) = {
if( prepared ) {
- uow.dequeue(xacontainer_id, messageContext.message.getMessageId)
+ uow.dequeue(xacontainer_id, message.getMessageId)
}
}
@@ -671,23 +672,29 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
val lastSeq: AtomicLong = new AtomicLong(0)
protected var cursorPosition: Long = 0
val preparedAcks = new HashSet[MessageId]()
-
+ val pendingCursorAdds = new LinkedList[Long]()
lastSeq.set(db.getLastQueueEntrySeq(key))
def cursorResetPosition = 0L
- def doAdd(uow: DelayableUOW, messageContext:IndexListener.MessageContext, delay:Boolean): CountDownFuture[AnyRef] = {
+ def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
check_running
- val seq = lastSeq.incrementAndGet()
- messageContext.message.incrementReferenceCount()
+ message.incrementReferenceCount()
uow.addCompleteListener({
- messageContext.message.decrementReferenceCount()
+ message.decrementReferenceCount()
})
- val future = uow.enqueue(key, seq, messageContext.message, delay)
- if (indexListener != null) {
- indexListener.onAdd(messageContext)
+ val sequence = lastSeq.synchronized {
+ val seq = lastSeq.incrementAndGet()
+ message.getMessageId.setFutureOrSequenceLong(seq);
+ if (indexListener != null) {
+ pendingCursorAdds.synchronized { pendingCursorAdds.add(seq) }
+ indexListener.onAdd(new MessageContext(context, message, new Runnable {
+ def run(): Unit = pendingCursorAdds.synchronized { pendingCursorAdds.remove(seq) }
+ }))
+ }
+ seq
}
- future
+ uow.enqueue(key, sequence, message, delay)
}
override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
@@ -695,11 +702,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
check_running
message.getMessageId.setEntryLocator(null)
if( message.getTransactionId!=null ) {
- transaction(message.getTransactionId).add(this, new IndexListener.MessageContext(context, message, null), delay)
+ transaction(message.getTransactionId).add(this, context, message, delay)
DONE
} else {
withUow { uow=>
- doAdd(uow, new IndexListener.MessageContext(context, message, null), delay)
+ doAdd(uow, context, message, delay)
}
}
}
@@ -759,9 +766,13 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
return db.collectionIsEmpty(key)
}
+ def getCursorPendingLimit: Long = {
+ pendingCursorAdds.synchronized { Option(pendingCursorAdds.peek).getOrElse(Long.MaxValue) }
+ }
+
def recover(listener: MessageRecoveryListener): Unit = {
check_running
- cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorResetPosition)
+ cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorResetPosition, getCursorPendingLimit)
}
def resetBatching: Unit = {
@@ -770,11 +781,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
check_running
- cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, maxReturned)
+ cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, getCursorPendingLimit, maxReturned)
}
override def setBatch(id: MessageId): Unit = {
- cursorPosition = db.queuePosition(id) + 1
+ cursorPosition = Math.min(getCursorPendingLimit, db.queuePosition(id)) + 1
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
index 131f807..cfd6534 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
@@ -89,7 +89,9 @@ public class AMQ5266SingleDestTest {
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
public static Iterable<Object[]> parameters() {
return Arrays.asList(new Object[][]{
- {1000, 80, 80, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
+ {1000, 40, 40, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
+ {1000, 40, 40, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
+ {1000, 40, 40, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.JDBC, false},
});
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
index 300bec1..f7409dd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
@@ -37,6 +37,7 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
@@ -66,7 +67,6 @@ public class AMQ5266StarvedConsumerTest {
static Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class);
String activemqURL;
BrokerService brokerService;
- private EmbeddedDataSource dataSource;
public int messageSize = 1000;
@@ -86,16 +86,22 @@ public class AMQ5266StarvedConsumerTest {
public boolean useCache = true;
@Parameterized.Parameter(5)
- public boolean useDefaultStore = false;
+ public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
@Parameterized.Parameter(6)
public boolean optimizeDispatch = false;
private AtomicBoolean didNotReceive = new AtomicBoolean(false);
- @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
+ @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
public static Iterable<Object[]> parameters() {
return Arrays.asList(new Object[][]{
- {1000, 40, 5, 1024*1024, false, false, true},
+ {1000, 40, 5, 1024*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true},
+ {1000, 40, 5, 1024*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true},
+ {1000, 40, 5, 1024*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},
+
+ {500, 20, 20, 1024*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true},
+ {500, 20, 20, 1024*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true},
+ {500, 20, 20, 1024*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},
});
}
@@ -104,21 +110,7 @@ public class AMQ5266StarvedConsumerTest {
@Before
public void startBroker() throws Exception {
brokerService = new BrokerService();
-
- dataSource = new EmbeddedDataSource();
- dataSource.setDatabaseName("target/derbyDb");
- dataSource.setCreateDatabase("create");
-
- JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
- jdbcPersistenceAdapter.setDataSource(dataSource);
- jdbcPersistenceAdapter.setUseLock(false);
-
- if (!useDefaultStore) {
- brokerService.setPersistenceAdapter(jdbcPersistenceAdapter);
- } else {
- KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
- kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
- }
+ TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setUseJmx(false);
brokerService.setAdvisorySupport(false);
@@ -149,10 +141,6 @@ public class AMQ5266StarvedConsumerTest {
if (brokerService != null) {
brokerService.stop();
}
- try {
- dataSource.setShutdownDatabase("shutdown");
- dataSource.getConnection();
- } catch (Exception ignored) {}
}
CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() {
@@ -216,9 +204,6 @@ public class AMQ5266StarvedConsumerTest {
try {
int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
- if (!useDefaultStore) {
- DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
- }
Thread.sleep(10000);
} catch (Exception e) {
}
@@ -228,12 +213,6 @@ public class AMQ5266StarvedConsumerTest {
consumer.shutdown();
- TimeUnit.SECONDS.sleep(2);
- LOG.info("DB Contents START");
- if (!useDefaultStore) {
- DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
- }
- LOG.info("DB Contents END");
LOG.info("Consumer Stats:");
http://git-wip-us.apache.org/repos/asf/activemq/blob/3042797b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
index efccefa..e180746 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
@@ -35,15 +35,12 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -65,7 +62,6 @@ public class AMQ5266Test {
static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class);
String activemqURL = "tcp://localhost:61617";
BrokerService brokerService;
- private EmbeddedDataSource dataSource;
public int messageSize = 1000;
@@ -85,28 +81,34 @@ public class AMQ5266Test {
public boolean useCache = true;
@Parameterized.Parameter(5)
- public boolean useDefaultStore = false;
+ public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
@Parameterized.Parameter(6)
public boolean optimizeDispatch = false;
- @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
+ @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
public static Iterable<Object[]> parameters() {
return Arrays.asList(new Object[][]{
- // jdbc
- {1, 1, 1, 50*1024, false, false, true},
- {1000, 20, 5, 50*1024, true, false, false},
- {100, 20, 5, 50*1024, false, false, false},
- {1000, 5, 20, 50*1024, true, false, false},
- {1000, 20, 20, 1024*1024, true, false, false},
-
- // default store
- {1, 1, 1, 50*1024, false, true, true},
- {100, 5, 5, 50*1024, false, true, false},
- {1000, 20, 5, 50*1024, true, true, false},
- {100, 20, 5, 50*1024, false, true, false},
- {1000, 5, 20, 50*1024, true, true, false},
- {1000, 20, 20, 1024*1024, true, true, false},
+ {1, 1, 1, 50*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},
+ {1000, 20, 5, 50*1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false},
+ {100, 20, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, false},
+ {1000, 5, 20, 50*1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false},
+ {1000, 20, 20, 1024*1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false},
+
+ {1, 1, 1, 50*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true},
+ {100, 5, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false},
+ {1000, 20, 5, 50*1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
+ {100, 20, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false},
+ {1000, 5, 20, 50*1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
+ {1000, 20, 20, 1024*1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
+
+ {1, 1, 1, 50*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true},
+ {100, 5, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false},
+ {1000, 20, 5, 50*1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
+ {100, 20, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false},
+ {1000, 5, 20, 50*1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
+ {1000, 20, 20, 1024*1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
+
});
}
@@ -115,25 +117,10 @@ public class AMQ5266Test {
@Before
public void startBroker() throws Exception {
brokerService = new BrokerService();
-
- dataSource = new EmbeddedDataSource();
- dataSource.setDatabaseName("target/derbyDb");
- dataSource.setCreateDatabase("create");
-
- JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
- jdbcPersistenceAdapter.setDataSource(dataSource);
- jdbcPersistenceAdapter.setUseLock(false);
-
- if (!useDefaultStore) {
- brokerService.setPersistenceAdapter(jdbcPersistenceAdapter);
- } else {
- KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
- kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
- }
+ TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setUseJmx(false);
-
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
@@ -159,10 +146,6 @@ public class AMQ5266Test {
if (brokerService != null) {
brokerService.stop();
}
- try {
- dataSource.setShutdownDatabase("shutdown");
- dataSource.getConnection();
- } catch (Exception ignored) {}
}
@Test
@@ -211,9 +194,6 @@ public class AMQ5266Test {
try {
int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
- if (!useDefaultStore) {
- DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
- }
Thread.sleep(10000);
} catch (Exception e) {
}
@@ -223,13 +203,6 @@ public class AMQ5266Test {
consumer.shutdown();
- TimeUnit.SECONDS.sleep(2);
- LOG.info("DB Contents START");
- if (!useDefaultStore) {
- DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
- }
- LOG.info("DB Contents END");
-
LOG.info("Consumer Stats:");
for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {