You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/07/22 17:46:19 UTC

svn commit: r966712 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/store/jdbc/ src/main/java/org/apache/activemq/store/jdbc/adapter/ src/test/java/org/apache/activemq/store/

Author: dejanb
Date: Thu Jul 22 15:46:18 2010
New Revision: 966712

URL: http://svn.apache.org/viewvc?rev=966712&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2843 - first stab at supporting message priority for durable subscriptions

Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=966712&r1=966711&r2=966712&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Thu Jul 22 15:46:18 2010
@@ -506,9 +506,6 @@
             <!-- breaks hudson: disable till we get a chance to give it the time that it needs - http://hudson.zones.apache.org/hudson/job/ActiveMQ/org.apache.activemq$activemq-core/199/testReport/org.apache.activemq.network/BrokerNetworkWithStuckMessagesTest/testBrokerNetworkWithStuckMessages/ -->
             <exclude>**/BrokerNetworkWithStuckMessagesTest.*</exclude>
             
-            <!-- exclude until implemented -->
-            <exclude>**/JDBCMessagePriorityTest.*</exclude>
-            
           </excludes>
         </configuration>
       </plugin>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=966712&r1=966711&r2=966712&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Thu Jul 22 15:46:18 2010
@@ -51,19 +51,19 @@ public interface JDBCAdapter {
 
     void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) throws Exception;
 
-    void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq) throws SQLException, IOException;
+    void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long prio) throws SQLException, IOException;
 
     void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, JDBCMessageRecoveryListener listener)
         throws Exception;
 
-    void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, int maxReturned,
+    void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long priority, int maxReturned,
                                JDBCMessageRecoveryListener listener) throws Exception;
 
     void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo subscriptionInfo, boolean retroactive) throws SQLException, IOException;
 
     SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
 
-    long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException;
+    long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException;
 
     void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=966712&r1=966711&r2=966712&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Thu Jul 22 15:46:18 2010
@@ -284,7 +284,7 @@ public class JDBCMessageStore extends Ab
         long result = -1;
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            result = adapter.getStoreSequenceId(c, destination, messageId);
+            result = adapter.getStoreSequenceId(c, destination, messageId)[0];
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=966712&r1=966711&r2=966712&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Thu Jul 22 15:46:18 2010
@@ -40,6 +40,7 @@ import org.apache.activemq.wireformat.Wi
 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
 
     private Map<String, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<String, AtomicLong>();
+    private Map<String, AtomicLong> subscriberLastPriorityMap = new ConcurrentHashMap<String, AtomicLong>();
 
     public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
         super(persistenceAdapter, adapter, wireFormat, topic, audit);
@@ -49,8 +50,8 @@ public class JDBCTopicMessageStore exten
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
-        	long seq = adapter.getStoreSequenceId(c, destination, messageId);
-            adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
+        	long[] res = adapter.getStoreSequenceId(c, destination, messageId);
+            adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
@@ -63,7 +64,6 @@ public class JDBCTopicMessageStore exten
      * @throws Exception
      */
     public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
-
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
             adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
@@ -91,14 +91,18 @@ public class JDBCTopicMessageStore exten
         TransactionContext c = persistenceAdapter.getTransactionContext();
         String subcriberId = getSubscriptionKey(clientId, subscriptionName);
         AtomicLong last = subscriberLastMessageMap.get(subcriberId);
+        AtomicLong priority = subscriberLastPriorityMap.get(subcriberId);
         if (last == null) {
             long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName);
             last = new AtomicLong(lastAcked);
             subscriberLastMessageMap.put(subcriberId, last);
+            priority = new AtomicLong(Byte.MAX_VALUE - 1);
+            subscriberLastMessageMap.put(subcriberId, priority);
         }
         final AtomicLong finalLast = last;
+        final AtomicLong finalPriority = priority;
         try {
-            adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, last.get(), maxReturned, new JDBCMessageRecoveryListener() {
+            adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, last.get(), priority.get(), maxReturned, new JDBCMessageRecoveryListener() {
 
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                     if (listener.hasSpace()) {
@@ -106,6 +110,7 @@ public class JDBCTopicMessageStore exten
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
                         listener.recoverMessage(msg);
                         finalLast.set(sequenceId);
+                        finalPriority.set(msg.getPriority());
                         return true;
                     }
                     return false;
@@ -120,13 +125,15 @@ public class JDBCTopicMessageStore exten
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
         } finally {
             c.close();
-            last.set(finalLast.get());
+            subscriberLastMessageMap.put(subcriberId, finalLast);
+            subscriberLastPriorityMap.put(subcriberId, finalPriority);
         }
     }
 
     public void resetBatching(String clientId, String subscriptionName) {
         String subcriberId = getSubscriptionKey(clientId, subscriptionName);
         subscriberLastMessageMap.remove(subcriberId);
+        subscriberLastPriorityMap.remove(subcriberId);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=966712&r1=966711&r2=966712&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Thu Jul 22 15:46:18 2010
@@ -52,6 +52,7 @@ public class Statements {
     private String deleteSubscriptionStatement;
     private String findAllDurableSubMessagesStatement;
     private String findDurableSubMessagesStatement;
+    private String findDurableSubMessagesByPriorityStatement;
     private String findAllDestinationsStatement;
     private String removeAllMessagesStatement;
     private String removeAllSubscriptionsStatement;
@@ -86,7 +87,7 @@ public class Statements {
                     + ", SUB_DEST " + stringIdDataType 
                     + ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
                     + " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
-                    + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", 
+                    + ", PRIORITY " + sequenceDataType + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", 
                 "CREATE TABLE " + getFullLockTableName() 
                     + "( ID " + longDataType + " NOT NULL, TIME " + longDataType 
                     + ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )",
@@ -130,7 +131,7 @@ public class Statements {
 
     public String getFindMessageSequenceIdStatement() {
         if (findMessageSequenceIdStatement == null) {
-            findMessageSequenceIdStatement = "SELECT ID FROM " + getFullMessageTableName()
+            findMessageSequenceIdStatement = "SELECT ID, PRIORITY FROM " + getFullMessageTableName()
                                              + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
         }
         return findMessageSequenceIdStatement;
@@ -195,8 +196,8 @@ public class Statements {
         if (createDurableSubStatement == null) {
             createDurableSubStatement = "INSERT INTO "
                                         + getFullAckTableName()
-                                        + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST) "
-                                        + "VALUES (?, ?, ?, ?, ?, ?)";
+                                        + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST, PRIORITY) "
+                                        + "VALUES (?, ?, ?, ?, ?, ?, ?)";
         }
         return createDurableSubStatement;
     }
@@ -219,7 +220,7 @@ public class Statements {
 
     public String getUpdateLastAckOfDurableSubStatement() {
         if (updateLastAckOfDurableSubStatement == null) {
-            updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?"
+            updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?, PRIORITY=?"
                                                  + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
         }
         return updateLastAckOfDurableSubStatement;
@@ -254,6 +255,18 @@ public class Statements {
         }
         return findDurableSubMessagesStatement;
     }
+    
+    public String getFindDurableSubMessagesByPriorityStatement() {
+        if (findDurableSubMessagesByPriorityStatement == null) {
+            findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+                                              + getFullAckTableName() + " D "
+                                              + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+                                              + " AND M.CONTAINER=D.CONTAINER AND "
+                                              + "((M.ID > ? AND M.PRIORITY = ?) OR M.PRIORITY < ?)"
+                                              + " ORDER BY M.PRIORITY DESC, M.ID";
+        }
+        return findDurableSubMessagesByPriorityStatement;
+    }    
 
     public String findAllDurableSubMessagesStatement() {
         if (findAllDurableSubMessagesStatement == null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=966712&r1=966711&r2=966712&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Jul 22 15:46:18 2010
@@ -17,8 +17,11 @@
 package org.apache.activemq.store.jdbc.adapter;
 
 import java.io.IOException;
+import java.io.PrintStream;
+import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -26,6 +29,8 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Set;
 
+import javax.jms.Message;
+
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
@@ -249,7 +254,7 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
+    public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
@@ -259,9 +264,9 @@ public class DefaultJDBCAdapter implemen
             s.setString(3, destination.getQualifiedName());
             rs = s.executeQuery();
             if (!rs.next()) {
-                return 0;
+                return new long[]{0,0};
             }
-            return rs.getLong(1);
+            return new long[]{rs.getLong(1), rs.getLong(2)};
         } finally {
             close(rs);
             close(s);
@@ -378,7 +383,7 @@ public class DefaultJDBCAdapter implemen
     }
     
     public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
-            String subscriptionName, long seq) throws SQLException, IOException {
+            String subscriptionName, long seq, long prio) throws SQLException, IOException {
         PreparedStatement s = c.getUpdateLastAckStatement();
         try {
             if (s == null) {
@@ -388,9 +393,10 @@ public class DefaultJDBCAdapter implemen
                 }
             }
             s.setLong(1, seq);
-            s.setString(2, destination.getQualifiedName());
-            s.setString(3, clientId);
-            s.setString(4, subscriptionName);
+            s.setLong(2, prio);
+            s.setString(3, destination.getQualifiedName());
+            s.setString(4, clientId);
+            s.setString(5, subscriptionName);
             if (this.batchStatments) {
                 s.addBatch();
             } else if (s.executeUpdate() != 1) {
@@ -435,16 +441,25 @@ public class DefaultJDBCAdapter implemen
     }
 
     public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
-            String subscriptionName, long seq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
+            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
+        
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
+            if (isPrioritizedMessages()) {
+                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
+            } else {
+                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
+            }
             s.setMaxRows(maxReturned);
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
             s.setLong(4, seq);
+            if (isPrioritizedMessages()) {
+                s.setLong(5, priority);
+                s.setLong(6, priority);
+            }
             rs = s.executeQuery();
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {
@@ -507,6 +522,7 @@ public class DefaultJDBCAdapter implemen
         PreparedStatement s = null;
         try {
             long lastMessageId = -1;
+            long priority = Byte.MAX_VALUE - 1;
             if (!retroactive) {
                 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
                 ResultSet rs = null;
@@ -527,6 +543,7 @@ public class DefaultJDBCAdapter implemen
             s.setString(4, info.getSelector());
             s.setLong(5, lastMessageId);
             s.setString(6, info.getSubscribedDestination().getQualifiedName());
+            s.setLong(7, priority);
             if (s.executeUpdate() != 1) {
                 throw new IOException("Could not create durable subscription for: " + info.getClientId());
             }
@@ -813,29 +830,61 @@ public class DefaultJDBCAdapter implemen
             close(s);
         }
     }
-    /*
-     * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
-     * subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
-     * "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID,
-     * D.LAST_ACKED_ID FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND
-     * D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" +" ORDER BY M.ID");
-     * s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
-     * printQuery(s,System.out); }
-     * 
-     * public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS",
-     * System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); }
-     * 
-     * private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
-     * printQuery(c.prepareStatement(query), out); }
-     * 
-     * private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
-     * 
-     * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<=
-     * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); }
-     * out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|");
-     * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
-     * try { s.close(); } catch (Throwable ignore) {} } }
-     */
+    
+/*    public void dumpTables(Connection c, String destinationName, String clientId, String
+      subscriptionName) throws SQLException { 
+        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 
+        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 
+        PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 
+                + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 
+                + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 
+                + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 
+                + " ORDER BY M.ID");
+      s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
+      printQuery(s,System.out); }
+
+    public void dumpTables(Connection c) throws SQLException {
+        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
+        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
+    }
+
+    private void printQuery(Connection c, String query, PrintStream out)
+            throws SQLException {
+        printQuery(c.prepareStatement(query), out);
+    }
+
+    private void printQuery(PreparedStatement s, PrintStream out)
+            throws SQLException {
+
+        ResultSet set = null;
+        try {
+            set = s.executeQuery();
+            ResultSetMetaData metaData = set.getMetaData();
+            for (int i = 1; i <= metaData.getColumnCount(); i++) {
+                if (i == 1)
+                    out.print("||");
+                out.print(metaData.getColumnName(i) + "||");
+            }
+            out.println();
+            while (set.next()) {
+                for (int i = 1; i <= metaData.getColumnCount(); i++) {
+                    if (i == 1)
+                        out.print("|");
+                    out.print(set.getString(i) + "|");
+                }
+                out.println();
+            }
+        } finally {
+            try {
+                set.close();
+            } catch (Throwable ignore) {
+            }
+            try {
+                s.close();
+            } catch (Throwable ignore) {
+            }
+        }
+    }  */  
 
     public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
             throws SQLException, IOException {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=966712&r1=966711&r2=966712&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Thu Jul 22 15:46:18 2010
@@ -177,7 +177,7 @@ abstract public class MessagePriorityTes
         sub = sess.createDurableSubscriber(topic, "priority");
         for (int i = 0; i < MSG_NUM * 2; i++) {
             Message msg = sub.receive(1000);
-            assertNotNull(msg);
+            assertNotNull("Message " + i + " was null", msg);
             assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
         }