You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/11/04 14:13:38 UTC
svn commit: r1030928 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/jdbc/
main/java/org/apache/activemq/store/jdbc/adapter/
test/java/org/apache/activemq/store/
test/java/org/apache/activemq/store/jdbc/ test/java/org/apache...
Author: gtully
Date: Thu Nov 4 13:13:37 2010
New Revision: 1030928
URL: http://svn.apache.org/viewvc?rev=1030928&view=rev
Log:
Further resolution to https://issues.apache.org/activemq/browse/AMQ-2980: concurrent producers were still problematic as the store needed to be traversed multiple times, requiring ack locations per priority. Resolution to https://issues.apache.org/activemq/browse/AMQ-2551 fell out of decreasing the cleanup interval in some of the tests; cleanup needs an exclusive lock so it won't cause contention.
Removed:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionSelectorTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionSelectorTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java
activemq/trunk/activemq-core/src/test/resources/log4j.properties
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Nov 4 13:13:37 2010
@@ -670,7 +670,7 @@ public class JDBCPersistenceAdapter exte
/**
* set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
* This allowable dirty isolation level may not be achievable in clustered DB environments
- * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABE_READ
+ * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
* see isolation level constants in {@link java.sql.Connection}
* @param transactionIsolation the isolation level to use
*/
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Thu Nov 4 13:13:37 2010
@@ -70,6 +70,10 @@ public class Statements {
private boolean useLockCreateWhereClause;
private String findAllMessageIdsStatement;
private String lastProducerSequenceIdStatement;
+ private String selectDurablePriorityAckStatement;
+
+ private String insertDurablePriorityAckStatement;
+ private String updateDurableLastAckStatement;
public String[] getCreateSchemaStatements() {
if (createSchemaStatements == null) {
@@ -93,7 +97,9 @@ public class Statements {
"INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)",
"ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType,
"CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)",
- "ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType,
+ "ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType + " NOT NULL DEFAULT 5",
+ "ALTER TABLE " + getFullAckTableName() + " DROP PRIMARY KEY",
+ "ALTER TABLE " + getFullAckTableName() + " ADD PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)",
};
}
return createSchemaStatements;
@@ -207,7 +213,7 @@ public class Statements {
public String getFindDurableSubStatement() {
if (findDurableSubStatement == null) {
findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName()
- + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+ + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND SUB_DEST IS NOT NULL";
}
return findDurableSubStatement;
}
@@ -215,15 +221,15 @@ public class Statements {
public String getFindAllDurableSubsStatement() {
if (findAllDurableSubsStatement == null) {
findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM "
- + getFullAckTableName() + " WHERE CONTAINER=?";
+ + getFullAckTableName() + " WHERE CONTAINER=? AND SUB_DEST IS NOT NULL";
}
return findAllDurableSubsStatement;
}
public String getUpdateLastAckOfDurableSubStatement() {
if (updateLastAckOfDurableSubStatement == null) {
- updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?, PRIORITY=?"
- + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+ updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?"
+ + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?";
}
return updateLastAckOfDurableSubStatement;
}
@@ -264,7 +270,19 @@ public class Statements {
+ getFullAckTableName() + " D "
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND "
- + "((M.ID > ? AND M.PRIORITY = ?) OR M.PRIORITY < ?)"
+ + "((M.ID > ? AND M.PRIORITY = ?) "
+ + " OR (M.PRIORITY <> ? "
+ + " AND ( M.ID >"
+ + " ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName()
+ + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
+ + " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY )"
+ + " OR "
+ + " ( (SELECT COUNT(LAST_ACKED_ID) FROM " + getFullAckTableName()
+ + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
+ + " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY) = 0)"
+ + " )"
+ + " )"
+ + ")"
+ " ORDER BY M.PRIORITY DESC, M.ID";
}
return findDurableSubMessagesByPriorityStatement;
@@ -306,8 +324,17 @@ public class Statements {
+ " M, "
+ getFullAckTableName()
+ " D "
- + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
- + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID AND M.PRIORITY <= D.PRIORITY";
+ + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND D.SUB_DEST IS NOT NULL"
+ + " AND M.CONTAINER=D.CONTAINER "
+ + " AND ( M.ID >"
+ + " ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName()
+ + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
+ + " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY )"
+ + " OR "
+ + " ( (SELECT COUNT(LAST_ACKED_ID) FROM " + getFullAckTableName()
+ + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
+ + " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY) = 0)"
+ + " )";
}
return durableSubscriberMessageCountStatement;
}
@@ -338,15 +365,20 @@ public class Statements {
deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
+ " OR (ID < "
- + " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
- + " FROM " + getFullAckTableName() + " WHERE "
+ + " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
+ + " FROM " + getFullAckTableName() + " WHERE "
+ getFullAckTableName() + ".CONTAINER="
- + getFullMessageTableName() + ".CONTAINER )"
- + " AND PRIORITY >= "
- + " ( SELECT min(" + getFullAckTableName() + ".PRIORITY) "
- + " FROM " + getFullAckTableName() + " WHERE "
+ + getFullMessageTableName() + ".CONTAINER"
+ + " AND " + getFullAckTableName() + ".SUB_DEST IS NULL"
+ + " AND " + getFullAckTableName() + ".PRIORITY=" + getFullMessageTableName() + ".PRIORITY )"
+ + " AND ID <"
+ + " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
+ + " FROM " + getFullAckTableName() + " WHERE "
+ getFullAckTableName() + ".CONTAINER="
- + getFullMessageTableName() + ".CONTAINER ))";
+ + getFullMessageTableName() + ".CONTAINER"
+ + " AND " + getFullAckTableName() + ".SUB_DEST IS NOT NULL )"
+ + " )";
+
}
return deleteOldMessagesStatement;
}
@@ -418,6 +450,35 @@ public class Statements {
return lastAckedDurableSubscriberMessageStatement;
}
+ public String getSelectDurablePriorityAckStatement() {
+ if (selectDurablePriorityAckStatement == null) {
+ selectDurablePriorityAckStatement = "SELECT LAST_ACKED_ID FROM " + getFullAckTableName()
+ + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"
+ + " AND PRIORITY = ?";
+ }
+ return selectDurablePriorityAckStatement;
+ }
+
+ public String getInsertDurablePriorityAckStatement() {
+ if (insertDurablePriorityAckStatement == null) {
+ insertDurablePriorityAckStatement = "INSERT INTO "
+ + getFullAckTableName()
+ + "(CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)"
+ + " VALUES (?, ?, ?, ?)";
+ }
+ return insertDurablePriorityAckStatement;
+ }
+
+
+ public String getUpdateDurableLastAckStatement() {
+ if (updateDurableLastAckStatement == null) {
+ updateDurableLastAckStatement = "UPDATE " + getFullAckTableName()
+ + " SET LAST_ACKED_ID = ? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"
+ + " AND PRIORITY = " + (Byte.MAX_VALUE - 1);
+ }
+ return updateDurableLastAckStatement;
+ }
+
public String getFullMessageTableName() {
return getTablePrefix() + getMessageTableName();
}
@@ -709,4 +770,15 @@ public class Statements {
this.lastProducerSequenceIdStatement = lastProducerSequenceIdStatement;
}
+ public void setSelectDurablePriorityAckStatement(String selectDurablePriorityAckStatement) {
+ this.selectDurablePriorityAckStatement = selectDurablePriorityAckStatement;
+ }
+
+ public void setInsertDurablePriorityAckStatement(String insertDurablePriorityAckStatement) {
+ this.insertDurablePriorityAckStatement = insertDurablePriorityAckStatement;
+ }
+
+ public void setUpdateDurableLastAckStatement(String updateDurableLastAckStatement) {
+ this.updateDurableLastAckStatement = updateDurableLastAckStatement;
+ }
}
\ No newline at end of file
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Nov 4 13:13:37 2010
@@ -28,6 +28,8 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Message;
@@ -62,6 +64,7 @@ public class DefaultJDBCAdapter implemen
protected Statements statements;
protected boolean batchStatments = true;
protected boolean prioritizedMessages;
+ protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
s.setBytes(index, data);
@@ -73,6 +76,7 @@ public class DefaultJDBCAdapter implemen
public void doCreateTables(TransactionContext c) throws SQLException, IOException {
Statement s = null;
+ cleanupExclusiveLock.writeLock().lock();
try {
// Check to see if the table already exists. If it does, then don't
// log warnings during startup.
@@ -112,6 +116,7 @@ public class DefaultJDBCAdapter implemen
}
c.getConnection().commit();
} finally {
+ cleanupExclusiveLock.writeLock().unlock();
try {
s.close();
} catch (Throwable e) {
@@ -121,6 +126,7 @@ public class DefaultJDBCAdapter implemen
public void doDropTables(TransactionContext c) throws SQLException, IOException {
Statement s = null;
+ cleanupExclusiveLock.writeLock().lock();
try {
s = c.getConnection().createStatement();
String[] dropStatments = this.statements.getDropSchemaStatements();
@@ -139,6 +145,7 @@ public class DefaultJDBCAdapter implemen
}
c.getConnection().commit();
} finally {
+ cleanupExclusiveLock.writeLock().unlock();
try {
s.close();
} catch (Throwable e) {
@@ -149,6 +156,7 @@ public class DefaultJDBCAdapter implemen
public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
rs = s.executeQuery();
@@ -171,6 +179,7 @@ public class DefaultJDBCAdapter implemen
long seq = Math.max(seq1, seq2);
return seq;
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -179,6 +188,7 @@ public class DefaultJDBCAdapter implemen
public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(
this.statements.getFindMessageByIdStatement());
@@ -189,6 +199,7 @@ public class DefaultJDBCAdapter implemen
}
return getBinaryData(rs, 1);
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -198,6 +209,7 @@ public class DefaultJDBCAdapter implemen
public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
long expiration, byte priority) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement();
+ cleanupExclusiveLock.readLock().lock();
try {
if (s == null) {
s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
@@ -218,6 +230,7 @@ public class DefaultJDBCAdapter implemen
throw new SQLException("Failed add a message");
}
} finally {
+ cleanupExclusiveLock.readLock().unlock();
if (!this.batchStatments) {
if (s != null) {
s.close();
@@ -229,6 +242,7 @@ public class DefaultJDBCAdapter implemen
public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
long expirationTime, String messageRef) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement();
+ cleanupExclusiveLock.readLock().lock();
try {
if (s == null) {
s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
@@ -248,6 +262,7 @@ public class DefaultJDBCAdapter implemen
throw new SQLException("Failed add a message");
}
} finally {
+ cleanupExclusiveLock.readLock().unlock();
if (!this.batchStatments) {
s.close();
}
@@ -257,6 +272,7 @@ public class DefaultJDBCAdapter implemen
public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
s.setString(1, messageID.getProducerId().toString());
@@ -268,6 +284,7 @@ public class DefaultJDBCAdapter implemen
}
return new long[]{rs.getLong(1), rs.getLong(2)};
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -276,6 +293,7 @@ public class DefaultJDBCAdapter implemen
public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
s.setString(1, id.getProducerId().toString());
@@ -286,6 +304,7 @@ public class DefaultJDBCAdapter implemen
}
return getBinaryData(rs, 1);
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -294,6 +313,7 @@ public class DefaultJDBCAdapter implemen
public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
s.setLong(1, seq);
@@ -303,6 +323,7 @@ public class DefaultJDBCAdapter implemen
}
return rs.getString(1);
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -310,6 +331,7 @@ public class DefaultJDBCAdapter implemen
public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
PreparedStatement s = c.getRemovedMessageStatement();
+ cleanupExclusiveLock.readLock().lock();
try {
if (s == null) {
s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement());
@@ -324,6 +346,7 @@ public class DefaultJDBCAdapter implemen
throw new SQLException("Failed to remove message");
}
} finally {
+ cleanupExclusiveLock.readLock().unlock();
if (!this.batchStatments && s != null) {
s.close();
}
@@ -334,6 +357,7 @@ public class DefaultJDBCAdapter implemen
throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
s.setString(1, destination.getQualifiedName());
@@ -352,6 +376,7 @@ public class DefaultJDBCAdapter implemen
}
}
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -361,6 +386,7 @@ public class DefaultJDBCAdapter implemen
JDBCMessageIdScanListener listener) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
s.setMaxRows(limit);
@@ -377,6 +403,7 @@ public class DefaultJDBCAdapter implemen
listener.messageId(id);
}
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -384,7 +411,10 @@ public class DefaultJDBCAdapter implemen
public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq, long prio) throws SQLException, IOException {
+ doCreatePriorityAckRow(c, destination, clientId, subscriptionName, prio);
+ doUpdateLatestAckRow(c, destination, clientId, subscriptionName, seq, prio);
PreparedStatement s = c.getUpdateLastAckStatement();
+ cleanupExclusiveLock.readLock().lock();
try {
if (s == null) {
s = c.getConnection().prepareStatement(this.statements.getUpdateLastAckOfDurableSubStatement());
@@ -393,28 +423,94 @@ public class DefaultJDBCAdapter implemen
}
}
s.setLong(1, seq);
- s.setLong(2, prio);
- s.setString(3, destination.getQualifiedName());
- s.setString(4, clientId);
- s.setString(5, subscriptionName);
+ s.setString(2, destination.getQualifiedName());
+ s.setString(3, clientId);
+ s.setString(4, subscriptionName);
+ s.setLong(5, prio);
if (this.batchStatments) {
s.addBatch();
} else if (s.executeUpdate() != 1) {
- throw new SQLException("Failed add a message");
+ throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName);
}
} finally {
+ cleanupExclusiveLock.readLock().unlock();
if (!this.batchStatments) {
s.close();
}
}
}
+ private void doCreatePriorityAckRow(TransactionContext c, ActiveMQDestination destination, String clientId,
+ String subscriptionName,long priority) throws SQLException, IOException{
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ boolean exists = false;
+ cleanupExclusiveLock.readLock().lock();
+ try {
+ s = c.getConnection().prepareStatement(this.statements.getSelectDurablePriorityAckStatement());
+ s.setString(1, destination.getQualifiedName());
+ s.setString(2, clientId);
+ s.setString(3, subscriptionName);
+ s.setLong(4, priority);
+
+ rs = s.executeQuery();
+ exists = rs.next();
+ } finally {
+ cleanupExclusiveLock.readLock().unlock();
+ close(rs);
+ close(s);
+ }
+
+ if (!exists) {
+ cleanupExclusiveLock.readLock().lock();
+ try {
+ s = c.getConnection().prepareStatement(this.statements.getInsertDurablePriorityAckStatement());
+ s.setString(1, destination.getQualifiedName());
+ s.setString(2, clientId);
+ s.setString(3, subscriptionName);
+ s.setLong(4, priority);
+ if (s.executeUpdate() != 1) {
+ throw new IOException("Could not insert initial ack entry for priority: "
+ + priority + ", for sub: " + subscriptionName);
+ }
+
+ } finally {
+ cleanupExclusiveLock.readLock().unlock();
+ close(s);
+ }
+ }
+ }
+
+ private void doUpdateLatestAckRow(TransactionContext c, ActiveMQDestination destination, String clientId,
+ String subscriptionName, long seq, long priority) throws SQLException, IOException{
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
+ try {
+ s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
+ s.setLong(1, seq);
+ s.setString(2, destination.getQualifiedName());
+ s.setString(3, clientId);
+ s.setString(4, subscriptionName);
+
+ if (s.executeUpdate() != 1) {
+ throw new IOException("Could not update last ack seq : "
+ + seq + ", for sub: " + subscriptionName);
+ }
+ } finally {
+ cleanupExclusiveLock.readLock().unlock();
+ close(rs);
+ close(s);
+ }
+ }
+
public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
// dumpTables(c,
// destination.getQualifiedName(),clientId,subscriptionName);
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
s.setString(1, destination.getQualifiedName());
@@ -435,6 +531,7 @@ public class DefaultJDBCAdapter implemen
}
}
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -445,6 +542,7 @@ public class DefaultJDBCAdapter implemen
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
if (isPrioritizedMessages()) {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
@@ -476,6 +574,7 @@ public class DefaultJDBCAdapter implemen
}
}
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -486,6 +585,7 @@ public class DefaultJDBCAdapter implemen
PreparedStatement s = null;
ResultSet rs = null;
int result = 0;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
s.setString(1, destination.getQualifiedName());
@@ -496,6 +596,7 @@ public class DefaultJDBCAdapter implemen
result = rs.getInt(1);
}
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -514,6 +615,7 @@ public class DefaultJDBCAdapter implemen
// dumpTables(c, destination.getQualifiedName(), clientId,
// subscriptionName);
PreparedStatement s = null;
+ cleanupExclusiveLock.readLock().lock();
try {
long lastMessageId = -1;
long priority = Byte.MAX_VALUE - 1;
@@ -542,6 +644,7 @@ public class DefaultJDBCAdapter implemen
throw new IOException("Could not create durable subscription for: " + info.getClientId());
}
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
@@ -550,6 +653,7 @@ public class DefaultJDBCAdapter implemen
String clientId, String subscriptionName) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
s.setString(1, destination.getQualifiedName());
@@ -568,6 +672,7 @@ public class DefaultJDBCAdapter implemen
ActiveMQDestination.QUEUE_TYPE));
return subscription;
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -577,6 +682,7 @@ public class DefaultJDBCAdapter implemen
throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
s.setString(1, destination.getQualifiedName());
@@ -594,6 +700,7 @@ public class DefaultJDBCAdapter implemen
}
return rc.toArray(new SubscriptionInfo[rc.size()]);
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -602,6 +709,7 @@ public class DefaultJDBCAdapter implemen
public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
IOException {
PreparedStatement s = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
s.setString(1, destinationName.getQualifiedName());
@@ -611,6 +719,7 @@ public class DefaultJDBCAdapter implemen
s.setString(1, destinationName.getQualifiedName());
s.executeUpdate();
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
@@ -618,6 +727,7 @@ public class DefaultJDBCAdapter implemen
public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName) throws SQLException, IOException {
PreparedStatement s = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
s.setString(1, destination.getQualifiedName());
@@ -625,12 +735,14 @@ public class DefaultJDBCAdapter implemen
s.setString(3, subscriptionName);
s.executeUpdate();
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
PreparedStatement s = null;
+ cleanupExclusiveLock.writeLock().lock();
try {
LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
@@ -638,6 +750,7 @@ public class DefaultJDBCAdapter implemen
int i = s.executeUpdate();
LOG.debug("Deleted " + i + " old message(s).");
} finally {
+ cleanupExclusiveLock.writeLock().unlock();
close(s);
}
}
@@ -647,6 +760,7 @@ public class DefaultJDBCAdapter implemen
PreparedStatement s = null;
ResultSet rs = null;
long[] result = new long[]{-1, Byte.MAX_VALUE - 1};
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
s.setString(1, destination.getQualifiedName());
@@ -657,9 +771,8 @@ public class DefaultJDBCAdapter implemen
result[0] = rs.getLong(1);
result[1] = rs.getLong(2);
}
- rs.close();
- s.close();
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -684,6 +797,7 @@ public class DefaultJDBCAdapter implemen
HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
rs = s.executeQuery();
@@ -691,6 +805,7 @@ public class DefaultJDBCAdapter implemen
rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
}
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -747,6 +862,7 @@ public class DefaultJDBCAdapter implemen
String clientId, String subscriberName) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
s.setString(1, destination.getQualifiedName());
@@ -759,6 +875,7 @@ public class DefaultJDBCAdapter implemen
return getBinaryData(rs, 1);
} finally {
close(rs);
+ cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
@@ -768,6 +885,7 @@ public class DefaultJDBCAdapter implemen
PreparedStatement s = null;
ResultSet rs = null;
int result = 0;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
s.setString(1, destination.getQualifiedName());
@@ -776,6 +894,7 @@ public class DefaultJDBCAdapter implemen
result = rs.getInt(1);
}
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -786,6 +905,7 @@ public class DefaultJDBCAdapter implemen
long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
if (isPrioritizedMessages()) {
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
@@ -823,6 +943,7 @@ public class DefaultJDBCAdapter implemen
} catch (Exception e) {
e.printStackTrace();
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@@ -887,6 +1008,7 @@ public class DefaultJDBCAdapter implemen
throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
s.setString(1, id.toString());
@@ -897,6 +1019,7 @@ public class DefaultJDBCAdapter implemen
}
return seq;
} finally {
+ cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Thu Nov 4 13:13:37 2010
@@ -49,8 +49,9 @@ abstract public class MessagePriorityTes
Connection conn;
protected Session sess;
- public boolean useCache;
- public boolean dispatchAsync = false;
+ public boolean useCache = true;
+ public boolean dispatchAsync = true;
+ public boolean prioritizeMessages = true;
public int prefetchVal = 500;
public int MSG_NUM = 600;
@@ -66,7 +67,7 @@ abstract public class MessagePriorityTes
adapter = createPersistenceAdapter(true);
broker.setPersistenceAdapter(adapter);
PolicyEntry policy = new PolicyEntry();
- policy.setPrioritizedMessages(true);
+ policy.setPrioritizedMessages(prioritizeMessages);
policy.setUseCache(useCache);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(policy);
@@ -87,11 +88,14 @@ abstract public class MessagePriorityTes
}
protected void tearDown() throws Exception {
- sess.close();
- conn.close();
-
- broker.stop();
- broker.waitUntilStopped();
+ try {
+ sess.close();
+ conn.close();
+ } catch (Exception ignored) {
+ } finally {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
}
public void testStoreConfigured() throws Exception {
@@ -164,7 +168,7 @@ abstract public class MessagePriorityTes
}
protected Message createMessage(int priority) throws Exception {
- final String text = "Message with priority " + priority;
+ final String text = "priority " + priority;
Message msg = sess.createTextMessage(text);
LOG.info("Sending " + text);
return msg;
@@ -199,7 +203,9 @@ abstract public class MessagePriorityTes
public void initCombosForTestDurableSubsReconnect() {
addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/2)});
- addCombinationValues("dispatchAsync", new Object[] {Boolean.TRUE, Boolean.FALSE});
+ // REVISIT = is dispatchAsync = true a problem or is it just the test?
+ addCombinationValues("dispatchAsync", new Object[] {Boolean.FALSE});
+ addCombinationValues("useCache", new Object[] {Boolean.TRUE, Boolean.FALSE});
}
public void testDurableSubsReconnect() throws Exception {
@@ -221,7 +227,7 @@ abstract public class MessagePriorityTes
final int closeFrequency = MSG_NUM/4;
sub = sess.createDurableSubscriber(topic, subName);
for (int i = 0; i < MSG_NUM * 2; i++) {
- Message msg = sub.receive(30000);
+ Message msg = sub.receive(15000);
LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Thu Nov 4 13:13:37 2010
@@ -17,6 +17,9 @@
package org.apache.activemq.store.jdbc;
+import java.util.Arrays;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Message;
import javax.jms.TopicSubscriber;
import junit.framework.Test;
@@ -40,7 +43,7 @@ public class JDBCMessagePriorityTest ext
dataSource.setShutdownDatabase("false");
jdbc.setDataSource(dataSource);
jdbc.deleteAllMessages();
- jdbc.setCleanupPeriod(1000);
+ jdbc.setCleanupPeriod(2000);
return jdbc;
}
@@ -74,8 +77,10 @@ public class JDBCMessagePriorityTest ext
final int[] priorities = new int[]{HIGH_PRI, MED_HIGH_PRI, MED_PRI, LOW_PRI};
sub = sess.createDurableSubscriber(topic, subName);
for (int i = 0; i < MSG_NUM * 4; i++) {
- Message msg = sub.receive(30000);
- LOG.debug("received i=" + i + ", m=" + (msg!=null? msg.getJMSMessageID() : null));
+ Message msg = sub.receive(10000);
+ LOG.info("received i=" + i + ", m=" + (msg!=null?
+ msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
+ : null) );
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", priorities[i / MSG_NUM], msg.getJMSPriority());
if (i > 0 && i % closeFrequency == 0) {
@@ -88,6 +93,53 @@ public class JDBCMessagePriorityTest ext
sub.close();
}
+ public void initCombosForTestConcurrentDurableSubsReconnectWithXLevels() {
+ addCombinationValues("prioritizeMessages", new Object[] {Boolean.TRUE, Boolean.FALSE});
+ }
+
+ public void testConcurrentDurableSubsReconnectWithXLevels() throws Exception {
+ ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
+ final String subName = "priorityDisconnect";
+ TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
+ sub.close();
+
+ final int maxPriority = 5;
+
+ final AtomicInteger[] messageCounts = new AtomicInteger[maxPriority];
+ Vector<ProducerThread> producers = new Vector<ProducerThread>();
+ for (int priority=0; priority <maxPriority; priority++) {
+ producers.add(new ProducerThread(topic, MSG_NUM, priority));
+ messageCounts[priority] = new AtomicInteger(0);
+ }
+
+ for (ProducerThread producer : producers) {
+ producer.start();
+ }
+
+ final int closeFrequency = MSG_NUM/2;
+
+ sub = sess.createDurableSubscriber(topic, subName);
+ for (int i=0; i < MSG_NUM * maxPriority; i++) {
+ Message msg = sub.receive(10000);
+ LOG.info("received i=" + i + ", m=" + (msg!=null?
+ msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
+ : null) );
+ assertNotNull("Message " + i + " was null", msg);
+ messageCounts[msg.getJMSPriority()].incrementAndGet();
+ if (i > 0 && i % closeFrequency == 0) {
+ LOG.info("Closing durable sub.. on: " + i + ", counts: " + Arrays.toString(messageCounts));
+ sub.close();
+ sub = sess.createDurableSubscriber(topic, subName);
+ }
+ }
+ LOG.info("closing on done!");
+ sub.close();
+
+ for (ProducerThread producer : producers) {
+ producer.join();
+ }
+ }
+
public static Test suite() {
return suite(JDBCMessagePriorityTest.class);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Thu Nov 4 13:13:37 2010
@@ -23,6 +23,7 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import javax.jms.*;
@@ -87,6 +88,10 @@ public class DurableSubscriptionOfflineT
}
setDefaultPersistenceAdapter(broker);
+ if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
+ // ensure it kicks in during tests
+ ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
+ }
broker.start();
}
@@ -295,6 +300,13 @@ public class DurableSubscriptionOfflineT
assertEquals("offline consumer got all", sent, listener.count);
}
+ public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
+ this.addCombinationValues("defaultPersistenceAdapter",
+ new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+ this.addCombinationValues("usePrioritySupport",
+ new Object[]{ Boolean.TRUE, Boolean.FALSE});
+ }
+
public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
Connection con = createConnection("offCli1");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java Thu Nov 4 13:13:37 2010
@@ -28,6 +28,7 @@ import junit.framework.Test;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
public class DurableSubscriptionReactivationTest extends EmbeddedBrokerTestSupport {
@@ -79,8 +80,13 @@ public class DurableSubscriptionReactiva
protected BrokerService createBroker() throws Exception {
BrokerService answer = super.createBroker();
answer.setKeepDurableSubsActive(keepDurableSubsActive);
+ answer.setPersistenceAdapter(new JDBCPersistenceAdapter());
return answer;
}
+
+ protected boolean isPersistent() {
+ return true;
+ }
public static Test suite() {
return suite(DurableSubscriptionReactivationTest.class);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java Thu Nov 4 13:13:37 2010
@@ -27,13 +27,14 @@ import javax.jms.TopicSubscriber;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
-abstract public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSupport {
+public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSupport {
MBeanServer mbs;
BrokerService broker = null;
@@ -45,6 +46,14 @@ abstract public class DurableSubscriptio
private int received = 0;
+ public static Test suite() {
+ return suite(DurableSubscriptionSelectorTest.class);
+ }
+
+ public void initCombosForTestSubscription() throws Exception {
+ this.addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values());
+ }
+
public void testSubscription() throws Exception {
openConsumer();
for (int i = 0; i < 4000; i++) {
@@ -130,7 +139,7 @@ abstract public class DurableSubscriptio
if (deleteMessages) {
broker.setDeleteAllMessagesOnStartup(true);
}
- broker.setPersistenceAdapter(createPersistenceAdapter());
+ setDefaultPersistenceAdapter(broker);
broker.start();
}
@@ -140,8 +149,6 @@ abstract public class DurableSubscriptio
broker = null;
}
- abstract public PersistenceAdapter createPersistenceAdapter() throws Exception;
-
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://test-broker?jms.watchTopicAdvisories=false&waitForStart=5000&create=false");
}
Modified: activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-core/src/test/resources/log4j.properties Thu Nov 4 13:13:37 2010
@@ -22,7 +22,7 @@ log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
#log4j.logger.org.apache.activemq=TRACE
-#log4j.logger.org.apache.activemq.store.jdbc=DEBUG
+#log4j.logger.org.apache.activemq.store.jdbc=TRACE
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG