You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/11/04 14:13:38 UTC

svn commit: r1030928 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/store/ test/java/org/apache/activemq/store/jdbc/ test/java/org/apache...

Author: gtully
Date: Thu Nov  4 13:13:37 2010
New Revision: 1030928

URL: http://svn.apache.org/viewvc?rev=1030928&view=rev
Log:
further resolution to https://issues.apache.org/activemq/browse/AMQ-2980, concurrent producers was still problematic as the store needed to be traversed multiple times, requiring ack locations per priority. Resolution to https://issues.apache.org/activemq/browse/AMQ-2551 fell out of decreasing the cleanup interval in some of the tests, cleanup needs an exclusive lock so it won't cause contention

Removed:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionSelectorTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionSelectorTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java
    activemq/trunk/activemq-core/src/test/resources/log4j.properties

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Nov  4 13:13:37 2010
@@ -670,7 +670,7 @@ public class JDBCPersistenceAdapter exte
     /**
      * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
      * This allowable dirty isolation level may not be achievable in clustered DB environments
-     * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABE_READ
+     * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
      * see isolation level constants in {@link java.sql.Connection}
      * @param transactionIsolation the isolation level to use
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Thu Nov  4 13:13:37 2010
@@ -70,6 +70,10 @@ public class Statements {
     private boolean useLockCreateWhereClause;
     private String findAllMessageIdsStatement;
     private String lastProducerSequenceIdStatement;
+    private String selectDurablePriorityAckStatement;
+
+    private String insertDurablePriorityAckStatement;
+    private String updateDurableLastAckStatement;
 
     public String[] getCreateSchemaStatements() {
         if (createSchemaStatements == null) {
@@ -93,7 +97,9 @@ public class Statements {
                 "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)", 
                 "ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType,
                 "CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)",
-                "ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType,
+                "ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType  + " NOT NULL DEFAULT 5",
+                "ALTER TABLE " + getFullAckTableName() + " DROP PRIMARY KEY",
+                "ALTER TABLE " + getFullAckTableName() + " ADD PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)",
             };
         }
         return createSchemaStatements;
@@ -207,7 +213,7 @@ public class Statements {
     public String getFindDurableSubStatement() {
         if (findDurableSubStatement == null) {
             findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName()
-                                      + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+                                      + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND SUB_DEST IS NOT NULL";
         }
         return findDurableSubStatement;
     }
@@ -215,15 +221,15 @@ public class Statements {
     public String getFindAllDurableSubsStatement() {
         if (findAllDurableSubsStatement == null) {
             findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM "
-                                          + getFullAckTableName() + " WHERE CONTAINER=?";
+                                          + getFullAckTableName() + " WHERE CONTAINER=? AND SUB_DEST IS NOT NULL";
         }
         return findAllDurableSubsStatement;
     }
 
     public String getUpdateLastAckOfDurableSubStatement() {
         if (updateLastAckOfDurableSubStatement == null) {
-            updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?, PRIORITY=?"
-                                                 + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+            updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?"
+                                                 + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?";
         }
         return updateLastAckOfDurableSubStatement;
     }
@@ -264,7 +270,19 @@ public class Statements {
                                               + getFullAckTableName() + " D "
                                               + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
                                               + " AND M.CONTAINER=D.CONTAINER AND "
-                                              + "((M.ID > ? AND M.PRIORITY = ?) OR M.PRIORITY < ?)"
+                                              + "((M.ID > ? AND M.PRIORITY = ?) "
+                                              + "   OR (M.PRIORITY <> ? "
+                                              + "     AND ( M.ID >"
+                                              + "          ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName()
+                                              + "           WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
+                                              + "           AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY )"
+                                              + "          OR "
+                                              + "          ( (SELECT COUNT(LAST_ACKED_ID) FROM " + getFullAckTableName()
+                                              + "           WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
+                                              + "           AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY) = 0)"
+                                              + "        )"
+                                              + "   )"
+                                              + ")"
                                               + " ORDER BY M.PRIORITY DESC, M.ID";
         }
         return findDurableSubMessagesByPriorityStatement;
@@ -306,8 +324,17 @@ public class Statements {
                                                      + " M, "
                                                      + getFullAckTableName()
                                                      + " D "
-                                                     + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
-                                                     + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID AND M.PRIORITY <= D.PRIORITY";
+                                                     + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND D.SUB_DEST IS NOT NULL"
+                                                     + " AND M.CONTAINER=D.CONTAINER "
+                                                     + "     AND ( M.ID >"
+                                                     + "          ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName()
+                                                     + "           WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
+                                                     + "           AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY )"
+                                                     + "          OR "
+                                                     + "          ( (SELECT COUNT(LAST_ACKED_ID) FROM " + getFullAckTableName()
+                                                     + "           WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
+                                                     + "           AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY) = 0)"
+                                                     + "        )";
         }
         return durableSubscriberMessageCountStatement;
     }
@@ -338,15 +365,20 @@ public class Statements {
             deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
                                          + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
                                          + " OR (ID < "
-                                         + "   ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
-                                         + "      FROM " + getFullAckTableName() + " WHERE "
+                                         + "     ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
+                                         + "       FROM " + getFullAckTableName() + " WHERE "
                                          +          getFullAckTableName() + ".CONTAINER="
-                                         +          getFullMessageTableName() + ".CONTAINER )"
-                                         + "   AND PRIORITY >= "
-                                         + "   ( SELECT min(" + getFullAckTableName() + ".PRIORITY) "
-                                         + "     FROM " + getFullAckTableName() + " WHERE "
+                                         +          getFullMessageTableName() + ".CONTAINER"
+                                         + "        AND " + getFullAckTableName() + ".SUB_DEST IS NULL"
+                                         + "        AND " + getFullAckTableName() + ".PRIORITY=" + getFullMessageTableName() + ".PRIORITY )"
+                                         + "    AND ID <"
+                                         + "     ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
+                                         + "       FROM " + getFullAckTableName() + " WHERE "
                                          +          getFullAckTableName() + ".CONTAINER="
-                                         + getFullMessageTableName() + ".CONTAINER ))";
+                                         +          getFullMessageTableName() + ".CONTAINER"
+                                         + "        AND " + getFullAckTableName() + ".SUB_DEST IS NOT NULL )"
+                                         + "   )";
+
         }
         return deleteOldMessagesStatement;
     }
@@ -418,6 +450,35 @@ public class Statements {
         return lastAckedDurableSubscriberMessageStatement;
     }
 
+    public String getSelectDurablePriorityAckStatement() {
+        if (selectDurablePriorityAckStatement == null) {
+            selectDurablePriorityAckStatement = "SELECT LAST_ACKED_ID FROM " + getFullAckTableName()
+                                                    + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"
+                                                    + " AND PRIORITY = ?";
+        }
+        return selectDurablePriorityAckStatement;
+    }
+
+    public String getInsertDurablePriorityAckStatement() {
+        if (insertDurablePriorityAckStatement == null) {
+            insertDurablePriorityAckStatement = "INSERT INTO "
+                                  + getFullAckTableName()
+                                  + "(CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)"
+                                  + " VALUES (?, ?, ?, ?)";            
+       }
+        return insertDurablePriorityAckStatement;
+    }
+
+
+    public String getUpdateDurableLastAckStatement() {
+        if (updateDurableLastAckStatement == null) {
+            updateDurableLastAckStatement  = "UPDATE " + getFullAckTableName()
+                    + " SET LAST_ACKED_ID = ? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"
+                    + " AND PRIORITY = " + (Byte.MAX_VALUE - 1);
+        }
+        return  updateDurableLastAckStatement;
+    }
+
     public String getFullMessageTableName() {
         return getTablePrefix() + getMessageTableName();
     }
@@ -709,4 +770,15 @@ public class Statements {
         this.lastProducerSequenceIdStatement = lastProducerSequenceIdStatement;
     }
 
+    public void setSelectDurablePriorityAckStatement(String selectDurablePriorityAckStatement) {
+        this.selectDurablePriorityAckStatement = selectDurablePriorityAckStatement;
+    }
+
+    public void setInsertDurablePriorityAckStatement(String insertDurablePriorityAckStatement) {
+        this.insertDurablePriorityAckStatement = insertDurablePriorityAckStatement;
+    }
+
+    public void setUpdateDurableLastAckStatement(String updateDurableLastAckStatement) {
+        this.updateDurableLastAckStatement = updateDurableLastAckStatement;
+    }    
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Nov  4 13:13:37 2010
@@ -28,6 +28,8 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.jms.Message;
 
@@ -62,6 +64,7 @@ public class DefaultJDBCAdapter implemen
     protected Statements statements;
     protected boolean batchStatments = true;
     protected boolean prioritizedMessages;
+    protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
 
     protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
         s.setBytes(index, data);
@@ -73,6 +76,7 @@ public class DefaultJDBCAdapter implemen
 
     public void doCreateTables(TransactionContext c) throws SQLException, IOException {
         Statement s = null;
+        cleanupExclusiveLock.writeLock().lock();
         try {
             // Check to see if the table already exists. If it does, then don't
             // log warnings during startup.
@@ -112,6 +116,7 @@ public class DefaultJDBCAdapter implemen
             }
             c.getConnection().commit();
         } finally {
+            cleanupExclusiveLock.writeLock().unlock();
             try {
                 s.close();
             } catch (Throwable e) {
@@ -121,6 +126,7 @@ public class DefaultJDBCAdapter implemen
 
     public void doDropTables(TransactionContext c) throws SQLException, IOException {
         Statement s = null;
+        cleanupExclusiveLock.writeLock().lock();
         try {
             s = c.getConnection().createStatement();
             String[] dropStatments = this.statements.getDropSchemaStatements();
@@ -139,6 +145,7 @@ public class DefaultJDBCAdapter implemen
             }
             c.getConnection().commit();
         } finally {
+            cleanupExclusiveLock.writeLock().unlock();
             try {
                 s.close();
             } catch (Throwable e) {
@@ -149,6 +156,7 @@ public class DefaultJDBCAdapter implemen
     public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
             rs = s.executeQuery();
@@ -171,6 +179,7 @@ public class DefaultJDBCAdapter implemen
             long seq = Math.max(seq1, seq2);
             return seq;
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -179,6 +188,7 @@ public class DefaultJDBCAdapter implemen
     public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(
                     this.statements.getFindMessageByIdStatement());
@@ -189,6 +199,7 @@ public class DefaultJDBCAdapter implemen
             }
             return getBinaryData(rs, 1);
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -198,6 +209,7 @@ public class DefaultJDBCAdapter implemen
     public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
             long expiration, byte priority) throws SQLException, IOException {
         PreparedStatement s = c.getAddMessageStatement();
+        cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
                 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
@@ -218,6 +230,7 @@ public class DefaultJDBCAdapter implemen
                 throw new SQLException("Failed add a message");
             }
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             if (!this.batchStatments) {
                 if (s != null) {
                     s.close();
@@ -229,6 +242,7 @@ public class DefaultJDBCAdapter implemen
     public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
             long expirationTime, String messageRef) throws SQLException, IOException {
         PreparedStatement s = c.getAddMessageStatement();
+        cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
                 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
@@ -248,6 +262,7 @@ public class DefaultJDBCAdapter implemen
                 throw new SQLException("Failed add a message");
             }
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             if (!this.batchStatments) {
                 s.close();
             }
@@ -257,6 +272,7 @@ public class DefaultJDBCAdapter implemen
     public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
             s.setString(1, messageID.getProducerId().toString());
@@ -268,6 +284,7 @@ public class DefaultJDBCAdapter implemen
             }
             return new long[]{rs.getLong(1), rs.getLong(2)};
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -276,6 +293,7 @@ public class DefaultJDBCAdapter implemen
     public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
             s.setString(1, id.getProducerId().toString());
@@ -286,6 +304,7 @@ public class DefaultJDBCAdapter implemen
             }
             return getBinaryData(rs, 1);
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -294,6 +313,7 @@ public class DefaultJDBCAdapter implemen
     public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
             s.setLong(1, seq);
@@ -303,6 +323,7 @@ public class DefaultJDBCAdapter implemen
             }
             return rs.getString(1);
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -310,6 +331,7 @@ public class DefaultJDBCAdapter implemen
 
     public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
         PreparedStatement s = c.getRemovedMessageStatement();
+        cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
                 s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement());
@@ -324,6 +346,7 @@ public class DefaultJDBCAdapter implemen
                 throw new SQLException("Failed to remove message");
             }
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             if (!this.batchStatments && s != null) {
                 s.close();
             }
@@ -334,6 +357,7 @@ public class DefaultJDBCAdapter implemen
             throws Exception {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
             s.setString(1, destination.getQualifiedName());
@@ -352,6 +376,7 @@ public class DefaultJDBCAdapter implemen
                 }
             }
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -361,6 +386,7 @@ public class DefaultJDBCAdapter implemen
             JDBCMessageIdScanListener listener) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
             s.setMaxRows(limit);
@@ -377,6 +403,7 @@ public class DefaultJDBCAdapter implemen
                 listener.messageId(id);
             }
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -384,7 +411,10 @@ public class DefaultJDBCAdapter implemen
     
     public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
             String subscriptionName, long seq, long prio) throws SQLException, IOException {
+        doCreatePriorityAckRow(c, destination, clientId, subscriptionName, prio);
+        doUpdateLatestAckRow(c, destination, clientId, subscriptionName, seq, prio);
         PreparedStatement s = c.getUpdateLastAckStatement();
+        cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
                 s = c.getConnection().prepareStatement(this.statements.getUpdateLastAckOfDurableSubStatement());
@@ -393,28 +423,94 @@ public class DefaultJDBCAdapter implemen
                 }
             }
             s.setLong(1, seq);
-            s.setLong(2, prio);
-            s.setString(3, destination.getQualifiedName());
-            s.setString(4, clientId);
-            s.setString(5, subscriptionName);
+            s.setString(2, destination.getQualifiedName());
+            s.setString(3, clientId);
+            s.setString(4, subscriptionName);
+            s.setLong(5, prio);
             if (this.batchStatments) {
                 s.addBatch();
             } else if (s.executeUpdate() != 1) {
-                throw new SQLException("Failed add a message");
+                throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName);
             }
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             if (!this.batchStatments) {
                 s.close();
             }
         }
     }
 
+    private void doCreatePriorityAckRow(TransactionContext c, ActiveMQDestination destination, String clientId,
+                                        String subscriptionName,long priority) throws SQLException, IOException{
+        PreparedStatement s = null;
+        ResultSet rs = null;
+        boolean exists = false;
+        cleanupExclusiveLock.readLock().lock();
+        try {
+            s = c.getConnection().prepareStatement(this.statements.getSelectDurablePriorityAckStatement());
+            s.setString(1, destination.getQualifiedName());
+            s.setString(2, clientId);
+            s.setString(3, subscriptionName);
+            s.setLong(4, priority);
+
+            rs = s.executeQuery();
+            exists = rs.next();
+        } finally {
+            cleanupExclusiveLock.readLock().unlock();
+            close(rs);
+            close(s);
+        }
+
+        if (!exists) {
+            cleanupExclusiveLock.readLock().lock();
+            try {
+                s = c.getConnection().prepareStatement(this.statements.getInsertDurablePriorityAckStatement());
+                s.setString(1, destination.getQualifiedName());
+                s.setString(2, clientId);
+                s.setString(3, subscriptionName);
+                s.setLong(4, priority);
+                if (s.executeUpdate() != 1) {
+                    throw new IOException("Could not insert initial ack entry for priority: "
+                            + priority + ", for sub: " + subscriptionName);
+                }
+
+            } finally {
+                cleanupExclusiveLock.readLock().unlock();
+                close(s);
+            }
+        }
+    }
+
+    private void doUpdateLatestAckRow(TransactionContext c, ActiveMQDestination destination, String clientId,
+                                        String subscriptionName, long seq, long priority) throws SQLException, IOException{
+        PreparedStatement s = null;
+        ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
+        try {
+            s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
+            s.setLong(1, seq);
+            s.setString(2, destination.getQualifiedName());
+            s.setString(3, clientId);
+            s.setString(4, subscriptionName);
+
+           if (s.executeUpdate() != 1) {
+                throw new IOException("Could not update last ack seq : "
+                            + seq + ", for sub: " + subscriptionName);
+           }
+        } finally {
+            cleanupExclusiveLock.readLock().unlock();
+            close(rs);
+            close(s);
+        }
+    }
+
     public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
             String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
         // dumpTables(c,
         // destination.getQualifiedName(),clientId,subscriptionName);
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
             s.setString(1, destination.getQualifiedName());
@@ -435,6 +531,7 @@ public class DefaultJDBCAdapter implemen
                 }
             }
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -445,6 +542,7 @@ public class DefaultJDBCAdapter implemen
         
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             if (isPrioritizedMessages()) {
                 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
@@ -476,6 +574,7 @@ public class DefaultJDBCAdapter implemen
                 }
             }
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -486,6 +585,7 @@ public class DefaultJDBCAdapter implemen
         PreparedStatement s = null;
         ResultSet rs = null;
         int result = 0;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
             s.setString(1, destination.getQualifiedName());
@@ -496,6 +596,7 @@ public class DefaultJDBCAdapter implemen
                 result = rs.getInt(1);
             }
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -514,6 +615,7 @@ public class DefaultJDBCAdapter implemen
         // dumpTables(c, destination.getQualifiedName(), clientId,
         // subscriptionName);
         PreparedStatement s = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             long lastMessageId = -1;
             long priority = Byte.MAX_VALUE - 1;
@@ -542,6 +644,7 @@ public class DefaultJDBCAdapter implemen
                 throw new IOException("Could not create durable subscription for: " + info.getClientId());
             }
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
@@ -550,6 +653,7 @@ public class DefaultJDBCAdapter implemen
             String clientId, String subscriptionName) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
             s.setString(1, destination.getQualifiedName());
@@ -568,6 +672,7 @@ public class DefaultJDBCAdapter implemen
                     ActiveMQDestination.QUEUE_TYPE));
             return subscription;
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -577,6 +682,7 @@ public class DefaultJDBCAdapter implemen
             throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
             s.setString(1, destination.getQualifiedName());
@@ -594,6 +700,7 @@ public class DefaultJDBCAdapter implemen
             }
             return rc.toArray(new SubscriptionInfo[rc.size()]);
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -602,6 +709,7 @@ public class DefaultJDBCAdapter implemen
     public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
             IOException {
         PreparedStatement s = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
             s.setString(1, destinationName.getQualifiedName());
@@ -611,6 +719,7 @@ public class DefaultJDBCAdapter implemen
             s.setString(1, destinationName.getQualifiedName());
             s.executeUpdate();
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
@@ -618,6 +727,7 @@ public class DefaultJDBCAdapter implemen
     public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
             String subscriptionName) throws SQLException, IOException {
         PreparedStatement s = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
             s.setString(1, destination.getQualifiedName());
@@ -625,12 +735,14 @@ public class DefaultJDBCAdapter implemen
             s.setString(3, subscriptionName);
             s.executeUpdate();
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
 
     public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
         PreparedStatement s = null;
+        cleanupExclusiveLock.writeLock().lock();
         try {
             LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
             s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
@@ -638,6 +750,7 @@ public class DefaultJDBCAdapter implemen
             int i = s.executeUpdate();
             LOG.debug("Deleted " + i + " old message(s).");
         } finally {
+            cleanupExclusiveLock.writeLock().unlock();
             close(s);
         }
     }
@@ -647,6 +760,7 @@ public class DefaultJDBCAdapter implemen
         PreparedStatement s = null;
         ResultSet rs = null;
         long[] result = new long[]{-1, Byte.MAX_VALUE - 1};
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
             s.setString(1, destination.getQualifiedName());
@@ -657,9 +771,8 @@ public class DefaultJDBCAdapter implemen
                 result[0] = rs.getLong(1);
                 result[1] = rs.getLong(2);
             }
-            rs.close();
-            s.close();
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -684,6 +797,7 @@ public class DefaultJDBCAdapter implemen
         HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
             rs = s.executeQuery();
@@ -691,6 +805,7 @@ public class DefaultJDBCAdapter implemen
                 rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
             }
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -747,6 +862,7 @@ public class DefaultJDBCAdapter implemen
             String clientId, String subscriberName) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
             s.setString(1, destination.getQualifiedName());
@@ -759,6 +875,7 @@ public class DefaultJDBCAdapter implemen
             return getBinaryData(rs, 1);
         } finally {
             close(rs);
+            cleanupExclusiveLock.readLock().unlock();
             close(s);
         }
     }
@@ -768,6 +885,7 @@ public class DefaultJDBCAdapter implemen
         PreparedStatement s = null;
         ResultSet rs = null;
         int result = 0;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
             s.setString(1, destination.getQualifiedName());
@@ -776,6 +894,7 @@ public class DefaultJDBCAdapter implemen
                 result = rs.getInt(1);
             }
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -786,6 +905,7 @@ public class DefaultJDBCAdapter implemen
             long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             if (isPrioritizedMessages()) {
                 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
@@ -823,6 +943,7 @@ public class DefaultJDBCAdapter implemen
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }
@@ -887,6 +1008,7 @@ public class DefaultJDBCAdapter implemen
             throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
             s.setString(1, id.toString());
@@ -897,6 +1019,7 @@ public class DefaultJDBCAdapter implemen
             }
             return seq;
         } finally {
+            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Thu Nov  4 13:13:37 2010
@@ -49,8 +49,9 @@ abstract public class MessagePriorityTes
     Connection conn;
     protected Session sess;
     
-    public boolean useCache;
-    public boolean dispatchAsync = false;
+    public boolean useCache = true;
+    public boolean dispatchAsync = true;
+    public boolean prioritizeMessages = true;
     public int prefetchVal = 500;
 
     public int MSG_NUM = 600;
@@ -66,7 +67,7 @@ abstract public class MessagePriorityTes
         adapter = createPersistenceAdapter(true);
         broker.setPersistenceAdapter(adapter);
         PolicyEntry policy = new PolicyEntry();
-        policy.setPrioritizedMessages(true);
+        policy.setPrioritizedMessages(prioritizeMessages);
         policy.setUseCache(useCache);
         PolicyMap policyMap = new PolicyMap();
         policyMap.setDefaultEntry(policy);
@@ -87,11 +88,14 @@ abstract public class MessagePriorityTes
     }
     
     protected void tearDown() throws Exception {
-        sess.close();
-        conn.close();
-        
-        broker.stop();
-        broker.waitUntilStopped();
+        try {
+            sess.close();
+            conn.close();
+        } catch (Exception ignored) {
+        } finally {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
     }
     
     public void testStoreConfigured() throws Exception {
@@ -164,7 +168,7 @@ abstract public class MessagePriorityTes
     }
     
     protected Message createMessage(int priority) throws Exception {
-        final String text = "Message with priority " + priority;
+        final String text = "priority " + priority;
         Message msg = sess.createTextMessage(text);
         LOG.info("Sending  " + text);
         return msg;
@@ -199,7 +203,9 @@ abstract public class MessagePriorityTes
 
     public void initCombosForTestDurableSubsReconnect() {
         addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/2)});
-        addCombinationValues("dispatchAsync", new Object[] {Boolean.TRUE, Boolean.FALSE});
+        // REVISIT = is dispatchAsync = true a problem or is it just the test?
+        addCombinationValues("dispatchAsync", new Object[] {Boolean.FALSE});
+        addCombinationValues("useCache", new Object[] {Boolean.TRUE, Boolean.FALSE});
     }
     
     public void testDurableSubsReconnect() throws Exception {
@@ -221,7 +227,7 @@ abstract public class MessagePriorityTes
         final int closeFrequency = MSG_NUM/4;
         sub = sess.createDurableSubscriber(topic, subName);
         for (int i = 0; i < MSG_NUM * 2; i++) {
-            Message msg = sub.receive(30000);
+            Message msg = sub.receive(15000);
             LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
             assertNotNull("Message " + i + " was null", msg);
             assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Thu Nov  4 13:13:37 2010
@@ -17,6 +17,9 @@
 
 package org.apache.activemq.store.jdbc;
 
+import java.util.Arrays;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Message;
 import javax.jms.TopicSubscriber;
 import junit.framework.Test;
@@ -40,7 +43,7 @@ public class JDBCMessagePriorityTest ext
         dataSource.setShutdownDatabase("false");
         jdbc.setDataSource(dataSource);
         jdbc.deleteAllMessages();
-        jdbc.setCleanupPeriod(1000);
+        jdbc.setCleanupPeriod(2000);
         return jdbc;
     }
 
@@ -74,8 +77,10 @@ public class JDBCMessagePriorityTest ext
         final int[] priorities = new int[]{HIGH_PRI, MED_HIGH_PRI, MED_PRI, LOW_PRI};
         sub = sess.createDurableSubscriber(topic, subName);
         for (int i = 0; i < MSG_NUM * 4; i++) {
-            Message msg = sub.receive(30000);
-            LOG.debug("received i=" + i + ", m=" + (msg!=null? msg.getJMSMessageID() : null));
+            Message msg = sub.receive(10000);
+            LOG.info("received i=" + i + ", m=" + (msg!=null?
+                    msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
+                    : null) );
             assertNotNull("Message " + i + " was null", msg);
             assertEquals("Message " + i + " has wrong priority", priorities[i / MSG_NUM], msg.getJMSPriority());
             if (i > 0 && i % closeFrequency == 0) {
@@ -88,6 +93,53 @@ public class JDBCMessagePriorityTest ext
         sub.close();
     }
 
+    public void initCombosForTestConcurrentDurableSubsReconnectWithXLevels() {
+        addCombinationValues("prioritizeMessages", new Object[] {Boolean.TRUE, Boolean.FALSE});
+    }
+
+    public void testConcurrentDurableSubsReconnectWithXLevels() throws Exception {
+        ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
+        final String subName = "priorityDisconnect";
+        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
+        sub.close();
+
+        final int maxPriority = 5;
+
+        final AtomicInteger[] messageCounts = new AtomicInteger[maxPriority];
+        Vector<ProducerThread> producers = new Vector<ProducerThread>();
+        for (int priority=0; priority <maxPriority; priority++) {
+            producers.add(new ProducerThread(topic, MSG_NUM, priority));
+            messageCounts[priority] = new AtomicInteger(0);
+        }
+
+        for (ProducerThread producer : producers) {
+            producer.start();
+        }
+
+        final int closeFrequency = MSG_NUM/2;
+
+        sub = sess.createDurableSubscriber(topic, subName);
+        for (int i=0; i < MSG_NUM * maxPriority; i++) {
+            Message msg = sub.receive(10000);
+            LOG.info("received i=" + i + ", m=" + (msg!=null?
+                    msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
+                    : null) );
+            assertNotNull("Message " + i + " was null", msg);
+            messageCounts[msg.getJMSPriority()].incrementAndGet();
+            if (i > 0 && i % closeFrequency == 0) {
+                LOG.info("Closing durable sub.. on: " + i + ", counts: " + Arrays.toString(messageCounts));
+                sub.close();
+                sub = sess.createDurableSubscriber(topic, subName);
+            }
+        }
+        LOG.info("closing on done!");
+        sub.close();
+
+        for (ProducerThread producer : producers) {
+            producer.join();
+        }
+    }
+
     public static Test suite() {
         return suite(JDBCMessagePriorityTest.class);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Thu Nov  4 13:13:37 2010
@@ -23,6 +23,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 
 import javax.jms.*;
@@ -87,6 +88,10 @@ public class DurableSubscriptionOfflineT
         }
         
         setDefaultPersistenceAdapter(broker);
+        if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
+            // ensure it kicks in during tests
+            ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
+        }
         broker.start();
     }
 
@@ -295,6 +300,13 @@ public class DurableSubscriptionOfflineT
         assertEquals("offline consumer got all", sent, listener.count);
     }
 
+    public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
+        this.addCombinationValues("defaultPersistenceAdapter",
+                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+        this.addCombinationValues("usePrioritySupport",
+                new Object[]{ Boolean.TRUE, Boolean.FALSE});
+    }
+
     public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
         Connection con = createConnection("offCli1");
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java Thu Nov  4 13:13:37 2010
@@ -28,6 +28,7 @@ import junit.framework.Test;
 
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 
 public class DurableSubscriptionReactivationTest extends EmbeddedBrokerTestSupport {
 
@@ -79,8 +80,13 @@ public class DurableSubscriptionReactiva
     protected BrokerService createBroker() throws Exception {
         BrokerService answer = super.createBroker();
         answer.setKeepDurableSubsActive(keepDurableSubsActive);
+        answer.setPersistenceAdapter(new JDBCPersistenceAdapter());
         return answer;
     }
+
+    protected boolean isPersistent() {
+        return true;
+    }
     
     public static Test suite() {
         return suite(DurableSubscriptionReactivationTest.class);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java Thu Nov  4 13:13:37 2010
@@ -27,13 +27,14 @@ import javax.jms.TopicSubscriber;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import junit.framework.Test;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.PersistenceAdapter;
 
-abstract public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSupport {
+public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSupport {
 
     MBeanServer mbs;
     BrokerService broker = null;
@@ -45,6 +46,14 @@ abstract public class DurableSubscriptio
 
     private int received = 0;
 
+    public static Test suite() {
+        return suite(DurableSubscriptionSelectorTest.class);
+    }
+
+    public void initCombosForTestSubscription() throws Exception {
+        this.addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values());
+    }
+
     public void testSubscription() throws Exception {
         openConsumer();
         for (int i = 0; i < 4000; i++) {
@@ -130,7 +139,7 @@ abstract public class DurableSubscriptio
         if (deleteMessages) {
             broker.setDeleteAllMessagesOnStartup(true);
         }
-        broker.setPersistenceAdapter(createPersistenceAdapter());
+        setDefaultPersistenceAdapter(broker);
         broker.start();
     }
 
@@ -140,8 +149,6 @@ abstract public class DurableSubscriptio
         broker = null;
     }
     
-    abstract public PersistenceAdapter createPersistenceAdapter() throws Exception;
-
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
         return new ActiveMQConnectionFactory("vm://test-broker?jms.watchTopicAdvisories=false&waitForStart=5000&create=false");
     }

Modified: activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=1030928&r1=1030927&r2=1030928&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-core/src/test/resources/log4j.properties Thu Nov  4 13:13:37 2010
@@ -22,7 +22,7 @@ log4j.rootLogger=INFO, out, stdout
 
 log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
 #log4j.logger.org.apache.activemq=TRACE
-#log4j.logger.org.apache.activemq.store.jdbc=DEBUG
+#log4j.logger.org.apache.activemq.store.jdbc=TRACE
 #log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
 #log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG