You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/04/20 12:39:26 UTC

svn commit: r1095352 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activemq/store/jdbc/...

Author: gtully
Date: Wed Apr 20 10:39:25 2011
New Revision: 1095352

URL: http://svn.apache.org/viewvc?rev=1095352&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3288 - JDBC persistence adapter, intermittent performance degradation when a durable subscriber of priority messages falls behind. Store recovery was sometimes ignoring the batch limit and would recover until the memory limit was reached. Resolved issue with out-of-order delivery and the immediatePriorityDispatch policy around memory limit boundaries. Resolved issue with overeager cleanup when priority and non-priority destinations are mixed. Removed potential full table scan from cleanup. Resolved issue with null message ref (prefetch=0) and expiry. Added a bunch of corresponding tests.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Apr 20 10:39:25 2011
@@ -207,9 +207,11 @@ public class DurableTopicSubscription ex
     
     protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
         MessageDispatch md = super.createMessageDispatch(node, message);
-        Integer count = redeliveredMessages.get(node.getMessageId());
-        if (count != null) {
-            md.setRedeliveryCounter(count.intValue());
+        if (node != QueueMessageReference.NULL_MESSAGE) {
+            Integer count = redeliveredMessages.get(node.getMessageId());
+            if (count != null) {
+                md.setRedeliveryCounter(count.intValue());
+            }
         }
         return md;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java Wed Apr 20 10:39:25 2011
@@ -114,7 +114,7 @@ final class NullMessageReference impleme
     }
 
     public boolean isExpired() {
-        throw new RuntimeException("not implemented");
+        return false;
     }
 
     public boolean isPersistent() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Apr 20 10:39:25 2011
@@ -447,7 +447,7 @@ public class Topic extends BaseDestinati
                 @Override
                 public void afterCommit() throws Exception {
                     // It could take while before we receive the commit
-                    // operration.. by that time the message could have
+                    // operation.. by that time the message could have
                     // expired..
                     if (broker.isExpired(message)) {
                         getDestinationStatistics().getExpired().increment();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Wed Apr 20 10:39:25 2011
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class);
     protected final Destination regionDestination;
-    private final PendingList batchList;
+    protected final PendingList batchList;
     private Iterator<MessageReference> iterator = null;
     protected boolean batchResetNeeded = true;
     private boolean storeHasMessages = false;
@@ -102,7 +102,7 @@ public abstract class AbstractStoreCurso
     }
     
     
-    public final void reset() {
+    public final synchronized void reset() {
         if (batchList.isEmpty()) {
             try {
                 fillBatch();
@@ -185,7 +185,7 @@ public abstract class AbstractStoreCurso
                 if (LOG.isTraceEnabled()) {
                     LOG.trace(this + " - disabling cache"
                             + ", lastCachedId: " + lastCachedId
-                            + " current node Id: " + node.getMessageId());
+                            + " current node Id: " + node.getMessageId() + " batchList size: " + batchList.size());
                 }
                 setBatch(lastCachedId);
                 lastCachedId = null;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Wed Apr 20 10:39:25 2011
@@ -290,15 +290,15 @@ public class FilePendingMessageCursor ex
      */
     @Override
     public synchronized MessageReference next() {
-        Message message = (Message) iter.next();
-        last = message;
+        MessageReference reference = iter.next();
+        last = reference;
         if (!isDiskListEmpty()) {
             // got from disk
-            message.setRegionDestination(regionDestination);
-            message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+            reference.getMessage().setRegionDestination(regionDestination);
+            reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage());
         }
-        message.incrementReferenceCount();
-        return message;
+        reference.incrementReferenceCount();
+        return reference;
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Wed Apr 20 10:39:25 2011
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
 public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
 
     private static final Logger LOG = LoggerFactory.getLogger(StoreDurableSubscriberCursor.class);
-    private static final int UNKNOWN = -1;
     private final String clientId;
     private final String subscriberName;
     private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
@@ -50,7 +49,6 @@ public class StoreDurableSubscriberCurso
     private final PendingMessageCursor nonPersistent;
     private PendingMessageCursor currentCursor;
     private final DurableTopicSubscription subscription;
-    private int cacheCurrentLowestPriority = UNKNOWN;
     private boolean immediatePriorityDispatch = true;
     /**
      * @param broker Broker for this cursor
@@ -125,6 +123,7 @@ public class StoreDurableSubscriberCurso
             tsp.setMessageAudit(getMessageAudit());
             tsp.setEnableAudit(isEnableAudit());
             tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
+            tsp.setUseCache(isUseCache());
             topics.put(destination, tsp);
             storePrefetches.add(tsp);
             if (isStarted()) {
@@ -196,29 +195,17 @@ public class StoreDurableSubscriberCurso
                 Destination dest = msg.getRegionDestination();
                 TopicStorePrefetch tsp = topics.get(dest);
                 if (tsp != null) {
-                    // cache can become high priority cache for immediate dispatch
-                    final int priority = msg.getPriority();
-                    if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.isCacheEnabled()) {
-                        if (priority > tsp.getCurrentLowestPriority()) {
+                    tsp.addMessageLast(node);
+                    if (prioritizedMessages && immediatePriorityDispatch && tsp.isPaging()) {
+                        if (msg.getPriority() > tsp.getLastRecoveredPriority()) {
+                            tsp.recoverMessage(node.getMessage(), true);
                             if (LOG.isTraceEnabled()) {
-                                LOG.trace("enabling cache for cursor on high priority message " + priority
-                                        + ", current lowest: " + tsp.getCurrentLowestPriority());
+                                LOG.trace("cached high priority (" + msg.getPriority() + ") message:" +
+                                    msg.getMessageId() + ", current paged batch priority: " +
+                                    tsp.getLastRecoveredPriority() + ", cache size:" + tsp.batchList.size());
                             }
-                            tsp.setCacheEnabled(true);
-                            cacheCurrentLowestPriority = tsp.getCurrentLowestPriority();
-                        }
-                    } else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) {
-                        // go to the store to get next priority message as lower priority messages may be recovered
-                        // already and need to acked sequence order
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("disabling/clearing cache for cursor on lower priority message "
-                                    + priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority()
-                                    + " cache lowest: " + cacheCurrentLowestPriority);
                         }
-                        tsp.setCacheEnabled(false);
-                        cacheCurrentLowestPriority = UNKNOWN;
                     }
-                    tsp.addMessageLast(node);
                 }
             }
 
@@ -330,7 +317,6 @@ public class StoreDurableSubscriberCurso
         for (PendingMessageCursor tsp : storePrefetches) {
             tsp.gc();
         }
-        cacheCurrentLowestPriority = UNKNOWN;
     }
 
     @Override

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Apr 20 10:39:25 2011
@@ -38,8 +38,7 @@ class TopicStorePrefetch extends Abstrac
     private final String clientId;
     private final String subscriberName;
     private final Subscription subscription;
-    private int currentLowestPriority;
-    
+    private byte lastRecoveredPriority = 9;
     /**
      * @param topic
      * @param clientId
@@ -53,15 +52,6 @@ class TopicStorePrefetch extends Abstrac
         this.subscriberName = subscriberName;
         this.maxProducersToAudit=32;
         this.maxAuditDepth=10000;
-        resetCurrentLowestPriority();
-    }
-
-    private void resetCurrentLowestPriority() {
-        currentLowestPriority = 9;
-    }
-
-    public synchronized int getCurrentLowestPriority() {
-        return currentLowestPriority;
     }
 
     public boolean recoverMessageReference(MessageId messageReference) throws Exception {
@@ -80,13 +70,13 @@ class TopicStorePrefetch extends Abstrac
         messageEvaluationContext.setMessageReference(message);
         if (this.subscription.matches(message, messageEvaluationContext)) {
             recovered = super.recoverMessage(message, cached);
-            if (recovered) {
-                currentLowestPriority = Math.min(currentLowestPriority, message.getPriority());                
+            if (recovered && !cached) {
+                lastRecoveredPriority = message.getPriority();
             }
         }
         return recovered;      
     }
-    
+
     @Override
     protected synchronized int getStoreSize() {
         try {
@@ -100,14 +90,9 @@ class TopicStorePrefetch extends Abstrac
     @Override
     protected synchronized boolean isStoreEmpty() {
         try {
-            boolean empty = this.store.isEmpty();
-            if (empty) {
-                resetCurrentLowestPriority();
-            }
-            return empty;
-            
+            return this.store.isEmpty();
         } catch (Exception e) {
-            LOG.error("Failed to get message count", e);
+            LOG.error("Failed to determine if store is empty", e);
             throw new RuntimeException(e);
         }
     }
@@ -119,17 +104,19 @@ class TopicStorePrefetch extends Abstrac
     }
 
     @Override
-    public synchronized void gc() {
-        super.gc();
-        resetCurrentLowestPriority();
-    }
-    
-    @Override
     protected void doFillBatch() throws Exception {
         this.store.recoverNextMessages(clientId, subscriberName,
                 maxBatchSize, this);
     }
 
+    public byte getLastRecoveredPriority() {
+        return lastRecoveredPriority;
+    }
+
+    public final boolean isPaging() {
+        return !isCacheEnabled() && !batchList.isEmpty();
+    }
+
     @Override
     public String toString() {
         return "TopicStorePrefetch(" + clientId + "," + subscriberName + ")" + super.toString();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java Wed Apr 20 10:39:25 2011
@@ -34,6 +34,7 @@ import org.apache.activemq.kaha.Store;
  */
 public class StorePendingDurableSubscriberMessageStoragePolicy implements PendingDurableSubscriberMessageStoragePolicy {
     boolean immediatePriorityDispatch = true;
+    boolean useCache = true;
 
     public boolean isImmediatePriorityDispatch() {
         return immediatePriorityDispatch;
@@ -42,6 +43,7 @@ public class StorePendingDurableSubscrib
     /**
      * Ensure that new higher priority messages will get an immediate dispatch
      * rather than wait for the end of the current cursor batch.
+     * Useful when there is a large message backlog and intermittent high priority messages.
      *
      * @param immediatePriorityDispatch
      */
@@ -49,6 +51,14 @@ public class StorePendingDurableSubscrib
         this.immediatePriorityDispatch = immediatePriorityDispatch;
     }
 
+    public boolean isUseCache() {
+        return useCache;
+    }
+
+    public void setUseCache(boolean useCache) {
+        this.useCache = useCache;
+    }
+
     /**
      * Retrieve the configured pending message storage cursor;
      * @param broker 
@@ -61,6 +71,7 @@ public class StorePendingDurableSubscrib
      */
     public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, DurableTopicSubscription sub) {
         StoreDurableSubscriberCursor cursor = new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub);
+        cursor.setUseCache(isUseCache());
         cursor.setImmediatePriorityDispatch(isImmediatePriorityDispatch());
         return cursor;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Wed Apr 20 10:39:25 2011
@@ -70,7 +70,7 @@ public interface JDBCAdapter {
 
     void doDeleteSubscription(TransactionContext c, ActiveMQDestination destinationName, String clientId, String subscriptionName) throws SQLException, IOException;
 
-    void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException;
+    void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException;
 
     long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Wed Apr 20 10:39:25 2011
@@ -101,7 +101,8 @@ public class JDBCMessageStore extends Ab
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {      
-            adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(), message.getPriority());
+            adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(),
+                    this.isPrioritizedMessages() ? message.getPriority() : 0);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Wed Apr 20 10:39:25 2011
@@ -334,8 +334,7 @@ public class JDBCPersistenceAdapter exte
         try {
             LOG.debug("Cleaning up old messages.");
             c = getTransactionContext();
-            getAdapter().doDeleteOldMessages(c, false);
-            getAdapter().doDeleteOldMessages(c, true);
+            getAdapter().doDeleteOldMessages(c);
         } catch (IOException e) {
             LOG.warn("Old message cleanup failed due to: " + e, e);
         } catch (SQLException e) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Wed Apr 20 10:39:25 2011
@@ -183,11 +183,11 @@ public class JDBCTopicMessageStore exten
         }
 
         public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
-            if (delegate.hasSpace()) {
+            if (delegate.hasSpace() && recoveredCount < maxMessages) {
                 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
                 msg.getMessageId().setBrokerSequenceId(sequenceId);
+                lastRecovered.recovered = sequenceId;
                 if (delegate.recoverMessage(msg)) {
-                    lastRecovered.recovered = sequenceId;
                     recoveredCount++;
                     return true;
                 }
@@ -236,7 +236,7 @@ public class JDBCTopicMessageStore exten
                     //Duration microDuration = new Duration("recoverNextMessages:loop");
                     adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
                         entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
-                    //microDuration.end(entry);
+                    //microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount));
                     if (recoveredAwareListener.stalled()) {
                         if (recoveredAwareListener.complete()) {
                             break;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Wed Apr 20 10:39:25 2011
@@ -56,7 +56,6 @@ public class Statements {
     private String findAllDestinationsStatement;
     private String removeAllMessagesStatement;
     private String removeAllSubscriptionsStatement;
-    private String deleteOldMessagesStatement;
     private String[] createSchemaStatements;
     private String[] dropSchemaStatements;
     private String lockCreateStatement;
@@ -379,34 +378,17 @@ public class Statements {
     public String getDeleteOldMessagesStatementWithPriority() {
         if (deleteOldMessagesStatementWithPriority == null) {
             deleteOldMessagesStatementWithPriority = "DELETE FROM " + getFullMessageTableName()
-                                         + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
-                                         + " OR (ID <= "
+                                         + " WHERE (PRIORITY=? AND ID <= "
                                          + "     ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
                                          + "       FROM " + getFullAckTableName() + " WHERE "
                                          +          getFullAckTableName() + ".CONTAINER="
                                          +          getFullMessageTableName() + ".CONTAINER"
-                                         + "        AND " + getFullAckTableName() + ".PRIORITY=" + getFullMessageTableName() + ".PRIORITY )"
+                                         + "        AND " + getFullAckTableName() + ".PRIORITY=?)"
                                          + "   )";
         }
         return deleteOldMessagesStatementWithPriority;
     }
 
-    public String getDeleteOldMessagesStatement() {
-        if (deleteOldMessagesStatement == null) {
-            deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
-                                         + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
-                                         + " OR (ID <= "
-                                         + "     ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
-                                         + "       FROM " + getFullAckTableName() + " WHERE "
-                                         +          getFullAckTableName() + ".CONTAINER="
-                                         +          getFullMessageTableName() + ".CONTAINER )"
-                                         + "   )";
-
-        }
-        return deleteOldMessagesStatement;
-    }
-
-
     public String getLockCreateStatement() {
         if (lockCreateStatement == null) {
             lockCreateStatement = "SELECT * FROM " + getFullLockTableName();
@@ -654,12 +636,8 @@ public class Statements {
         this.createSchemaStatements = createSchemaStatments;
     }
 
-    public void setDeleteOldMessagesStatement(String deleteOldMessagesStatment) {
-        this.deleteOldMessagesStatement = deleteOldMessagesStatment;
-    }
-
-    public void setDeleteOldMessagesStatementWithPriority(String deleteOldMessagesStatmentWithPriority) {
-        this.deleteOldMessagesStatementWithPriority = deleteOldMessagesStatmentWithPriority;
+    public void setDeleteOldMessagesStatementWithPriority(String deleteOldMessagesStatementWithPriority) {
+        this.deleteOldMessagesStatementWithPriority = deleteOldMessagesStatementWithPriority;
     }
 
     public void setDeleteSubscriptionStatement(String deleteSubscriptionStatment) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java Wed Apr 20 10:39:25 2011
@@ -54,7 +54,6 @@ public class AxionJDBCAdapter extends St
                 + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))"
         };
         statements.setCreateSchemaStatements(createStatements);
-        statements.setDeleteOldMessagesStatement("DELETE FROM " + statements.getFullMessageTableName() + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)");
         statements.setLongDataType("LONG");
         statements.setSequenceDataType("LONG");
         

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Wed Apr 20 10:39:25 2011
@@ -539,7 +539,7 @@ public class DefaultJDBCAdapter implemen
         cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
-            s.setMaxRows(maxRows);
+            s.setMaxRows(Math.max(maxReturned * 2, maxRows));
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
@@ -739,20 +739,18 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException {
+    int priorityIterator = 0;
+    public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
         PreparedStatement s = null;
         cleanupExclusiveLock.writeLock().lock();
         try {
-            if (isPrioritizedMessages) {
-                LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
-                s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
-            } else {
-                LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
-                s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
-            }
-            s.setLong(1, System.currentTimeMillis());
+            LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
+            s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
+            int priority = priorityIterator++%10;
+            s.setInt(1, priority);
+            s.setInt(2, priority);
             int i = s.executeUpdate();
-            LOG.debug("Deleted " + i + " old message(s).");
+            LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
         } finally {
             cleanupExclusiveLock.writeLock().unlock();
             close(s);
@@ -1005,7 +1003,7 @@ public class DefaultJDBCAdapter implemen
             } catch (Throwable ignore) {
             }
         }
-    }  */  
+    }  */
 
     public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
             throws SQLException, IOException {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Wed Apr 20 10:39:25 2011
@@ -33,6 +33,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -75,10 +76,19 @@ abstract public class MessagePriorityTes
         StorePendingDurableSubscriberMessageStoragePolicy durableSubPending =
                 new StorePendingDurableSubscriberMessageStoragePolicy();
         durableSubPending.setImmediatePriorityDispatch(immediatePriorityDispatch);
+        durableSubPending.setUseCache(useCache);
         policy.setPendingDurableSubscriberPolicy(durableSubPending);
         PolicyMap policyMap = new PolicyMap();
         policyMap.put(new ActiveMQQueue("TEST"), policy);
         policyMap.put(new ActiveMQTopic("TEST"), policy);
+
+        // do not process expired for one test
+        PolicyEntry ignoreExpired = new PolicyEntry();
+        SharedDeadLetterStrategy ignoreExpiredStrategy = new SharedDeadLetterStrategy();
+        ignoreExpiredStrategy.setProcessExpired(false);
+        ignoreExpired.setDeadLetterStrategy(ignoreExpiredStrategy);
+        policyMap.put(new ActiveMQTopic("TEST_CLEANUP_NO_PRIORITY"), ignoreExpired);
+
         broker.setDestinationPolicy(policyMap);
         broker.start();
         broker.waitUntilStarted();
@@ -305,7 +315,7 @@ abstract public class MessagePriorityTes
         int lowLowCount = 0;
         for (int i=0; i<numToProduce; i++) {
             Message msg = sub.receive(15000);
-            LOG.info("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() + "-" + msg.getJMSPriority() : null));
+            LOG.info("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority() : null));
             assertNotNull("Message " + i + " was null", msg);
             assertEquals("Message " + i + " has wrong priority", LOW_PRI+1, msg.getJMSPriority());
             assertTrue("not duplicate ", dups[i] == 0);
@@ -352,5 +362,178 @@ abstract public class MessagePriorityTes
             assertEquals("Message " + i + " has wrong priority", LOW_PRI, msg.getJMSPriority());
         }
     }
-    
+
+
+    public void initCombosForTestHighPriorityDeliveryInterleaved() { // combos for testHighPriorityDeliveryInterleaved (paired by name, presumably by the combination test support)
+        addCombinationValues("useCache", new Object[] {Boolean.TRUE, Boolean.FALSE}); // both cache settings
+    }
+
+    public void testHighPriorityDeliveryInterleaved() throws Exception {
+        // A message produced while the backlog is being consumed must still be received in priority order.
+        // get zero prefetch
+        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+        prefetch.setAll(0);
+        factory.setPrefetchPolicy(prefetch);
+        conn.close();
+        conn = factory.createConnection();
+        conn.setClientID("priority");
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
+        final String subName = "priorityDisconnect";
+        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
+        sub.close();
+
+        ProducerThread producerThread = new ProducerThread(topic, 1, HIGH_PRI);
+        producerThread.run();
+
+        producerThread.setMessagePriority(HIGH_PRI -1);
+        producerThread.setMessageCount(1);
+        producerThread.run();
+
+        producerThread.setMessagePriority(LOW_PRI);
+        producerThread.setMessageCount(1);
+        producerThread.run();
+        LOG.info("Ordered priority messages sent");
+
+        sub = sess.createDurableSubscriber(topic, subName);
+        // expected order: HIGH_PRI, HIGH_PRI-1, LOW_PRI+1 (sent after the first receive), LOW_PRI
+
+        Message msg = sub.receive(15000);
+        assertNotNull("Message was null", msg);
+        LOG.info("received " + msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority());
+        assertEquals("Message has wrong priority", HIGH_PRI, msg.getJMSPriority());
+
+        producerThread.setMessagePriority(LOW_PRI+1);
+        producerThread.setMessageCount(1);
+        producerThread.run();
+
+        msg = sub.receive(15000);
+        assertNotNull("Message was null", msg);
+        LOG.info("received " + msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority());
+        assertEquals("second highest priority", HIGH_PRI -1, msg.getJMSPriority());
+
+        msg = sub.receive(15000);
+        assertNotNull("Message was null", msg);
+        LOG.info("received hi? : " + msg);
+        assertEquals("interleaved message priority", LOW_PRI +1, msg.getJMSPriority());
+
+        msg = sub.receive(15000);
+        assertNotNull("Message was null", msg);
+        LOG.info("received hi? : " + msg);
+        assertEquals("low priority", LOW_PRI, msg.getJMSPriority());
+
+        msg = sub.receive(4000);
+        assertNull("No further message expected", msg);
+    }
+
+    // immediatePriorityDispatch is only relevant when cache is exhausted
+    public void initCombosForTestHighPriorityDeliveryThroughBackLog() { // combos for testHighPriorityDeliveryThroughBackLog (paired by name, presumably)
+        addCombinationValues("useCache", new Object[] {Boolean.FALSE}); // cache off only
+        addCombinationValues("immediatePriorityDispatch", new Object[] {Boolean.TRUE}); // immediate priority dispatch on
+    }
+
+    public void testHighPriorityDeliveryThroughBackLog() throws Exception {
+        // immediatePriorityDispatch=true: a high priority message sent mid-backlog must be the next one received
+        // get zero prefetch
+        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+        prefetch.setAll(0);
+        factory.setPrefetchPolicy(prefetch);
+        conn.close();
+        conn = factory.createConnection();
+        conn.setClientID("priority");
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
+        final String subName = "priorityDisconnect";
+        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
+        sub.close();
+
+        ProducerThread producerThread = new ProducerThread(topic, 600, LOW_PRI);
+        producerThread.run();
+
+        // consume the first half of the low priority backlog before the high priority message is sent
+        sub = sess.createDurableSubscriber(topic, subName);
+        int count = 0;
+
+        for (;count < 300; count++) {
+            Message msg = sub.receive(15000);
+            assertNotNull("Message was null", msg);
+            assertEquals("low priority", LOW_PRI, msg.getJMSPriority());
+        }
+
+        producerThread.setMessagePriority(HIGH_PRI);
+        producerThread.setMessageCount(1);
+        producerThread.run();
+
+        Message msg = sub.receive(15000);
+        assertNotNull("Message was null", msg);
+        assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
+
+        for (;count < 600; count++) {
+            msg = sub.receive(15000);
+            assertNotNull("Message was null", msg);
+            assertEquals("low priority", LOW_PRI, msg.getJMSPriority());
+        }
+    }
+
+
+    public void initCombosForTestHighPriorityNonDeliveryThroughBackLog() { // combos for testHighPriorityNonDeliveryThroughBackLog (paired by name, presumably)
+        addCombinationValues("useCache", new Object[] {Boolean.FALSE}); // cache off only
+        addCombinationValues("immediatePriorityDispatch", new Object[] {Boolean.FALSE}); // immediate priority dispatch off
+    }
+
+    public void testHighPriorityNonDeliveryThroughBackLog() throws Exception {
+        // immediatePriorityDispatch=false: the high priority message is expected only after 100 more low priority ones
+        // get zero prefetch
+        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+        prefetch.setAll(0);
+        factory.setPrefetchPolicy(prefetch);
+        conn.close();
+        conn = factory.createConnection();
+        conn.setClientID("priority");
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
+        final String subName = "priorityDisconnect";
+        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
+        sub.close();
+
+        ProducerThread producerThread = new ProducerThread(topic, 600, LOW_PRI);
+        producerThread.run();
+
+        // consume the first half of the low priority backlog before the high priority message is sent
+        sub = sess.createDurableSubscriber(topic, subName);
+        int count = 0;
+
+        for (;count < 300; count++) {
+            Message msg = sub.receive(15000);
+            assertNotNull("Message was null", msg);
+            assertEquals("low priority", LOW_PRI, msg.getJMSPriority());
+        }
+
+        producerThread.setMessagePriority(HIGH_PRI);
+        producerThread.setMessageCount(1);
+        producerThread.run();
+
+        for (;count < 400; count++) {
+            Message msg = sub.receive(15000);
+            assertNotNull("Message was null", msg);
+            assertEquals("low priority", LOW_PRI, msg.getJMSPriority());
+        }
+
+        Message msg = sub.receive(15000);
+        assertNotNull("Message was null", msg);
+        assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
+
+        for (;count < 600; count++) {
+            msg = sub.receive(15000);
+            assertNotNull("Message was null", msg);
+            assertEquals("low priority", LOW_PRI, msg.getJMSPriority());
+        }
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Wed Apr 20 10:39:25 2011
@@ -17,9 +17,13 @@
 
 package org.apache.activemq.store.jdbc;
 
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Vector;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -30,9 +34,11 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.TopicSubscriber;
 import junit.framework.Test;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.MessagePriorityTest;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.ThreadTracker;
 import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,10 +48,11 @@ public class JDBCMessagePriorityTest ext
 
     private static final Logger LOG = LoggerFactory.getLogger(JDBCMessagePriorityTest.class);
     EmbeddedDataSource dataSource;
+    JDBCPersistenceAdapter jdbc;
 
     @Override
     protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
-        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        jdbc = new JDBCPersistenceAdapter();
         dataSource = new EmbeddedDataSource();
         dataSource.setDatabaseName("derbyDb");
         dataSource.setCreateDatabase("create");
@@ -136,10 +143,12 @@ public class JDBCMessagePriorityTest ext
         final int maxPriority = 5;
 
         final AtomicInteger[] messageCounts = new AtomicInteger[maxPriority];
+        final long[] messageIds = new long[maxPriority];
         Vector<ProducerThread> producers = new Vector<ProducerThread>();
         for (int priority = 0; priority < maxPriority; priority++) {
             producers.add(new ProducerThread(topic, MSG_NUM, priority));
             messageCounts[priority] = new AtomicInteger(0);
+            messageIds[priority] = 1l;
         }
 
         for (ProducerThread producer : producers) {
@@ -154,9 +163,12 @@ public class JDBCMessagePriorityTest ext
             LOG.debug("received i=" + i + ", m=" + (msg != null ?
                     msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
                     : null));
+            assertNotNull("Message " + i + " was null, counts: " + Arrays.toString(messageCounts), msg);
             assertNull("no duplicate message failed on : " + msg.getJMSMessageID(), dups.put(msg.getJMSMessageID(), subName));
-            assertNotNull("Message " + i + " was null", msg);
             messageCounts[msg.getJMSPriority()].incrementAndGet();
+            assertEquals("message is in order : " + msg,
+                    messageIds[msg.getJMSPriority()],((ActiveMQMessage)msg).getMessageId().getProducerSequenceId());
+            messageIds[msg.getJMSPriority()]++;
             if (i > 0 && i % closeFrequency == 0) {
                 LOG.info("Closing durable sub.. on: " + i + ", counts: " + Arrays.toString(messageCounts));
                 sub.close();
@@ -273,6 +285,162 @@ public class JDBCMessagePriorityTest ext
         assertEquals("got all messages", TO_SEND * 2, count.get());
     }
 
+    public void testCleanupPriorityDestination() throws Exception {
+        // Cleanup may only delete what the durable sub has acked; the unacked lower priority message must survive.
+        assertEquals("no messages pending", 0, messageTableCount());
+        ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
+        final String subName = "priorityConcurrent";
+        Connection consumerConn = factory.createConnection();
+        try {
+            consumerConn.setClientID("subName");
+            consumerConn.start();
+            Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, subName);
+            sub.close();
+
+            MessageProducer messageProducer = sess.createProducer(topic);
+            Message message = sess.createTextMessage();
+            message.setJMSPriority(2);
+            messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(), 0);
+            message.setJMSPriority(5);
+            messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(), 0);
+            assertEquals("two messages pending", 2, messageTableCount());
+            sub = consumerSession.createDurableSubscriber(topic, subName);
+            message = sub.receive(5000);
+            assertNotNull("got high priority message", message);
+            assertEquals("got high priority", 5, message.getJMSPriority());
+            waitForAck(5);
+            // ten passes: presumably each cleanup call sweeps a single priority level - TODO confirm
+            for (int i=0; i<10; i++) {
+                jdbc.cleanup();
+            }
+            assertEquals("one message pending", 1, messageTableCount());
+
+            message = sub.receive(5000);
+            assertNotNull("got low priority message", message);
+            assertEquals("got low priority", 2, message.getJMSPriority());
+            waitForAck(2);
+            for (int i=0; i<10; i++) {
+                jdbc.cleanup();
+            }
+            assertEquals("no messages pending", 0, messageTableCount());
+        } finally {
+            consumerConn.close(); // previously leaked: the consumer connection was never closed
+        }
+    }
+
+
+    public void testCleanupNonPriorityDestination() throws Exception {
+        // Cleanup check on a topic where one stored message expires before the durable sub reconnects.
+        assertEquals("no messages pending", 0, messageTableCount());
+        ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST_CLEANUP_NO_PRIORITY");
+        final String subName = "subName";
+        Connection consumerConn = factory.createConnection();
+        try {
+            consumerConn.setClientID("subName");
+            consumerConn.start();
+            Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, subName);
+            sub.close();
+
+            MessageProducer messageProducer = sess.createProducer(topic);
+            Message message = sess.createTextMessage("ToExpire");
+            messageProducer.send(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, 4000);
+
+            message = sess.createTextMessage("A");
+            messageProducer.send(message);
+            message = sess.createTextMessage("B");
+            messageProducer.send(message);
+
+            assertEquals("three messages pending", 3, messageTableCount());
+
+            // let first message expire
+            TimeUnit.SECONDS.sleep(5);
+
+            sub = consumerSession.createDurableSubscriber(topic, subName);
+            message = sub.receive(5000);
+            assertNotNull("got message", message);
+            LOG.info("Got: " + message);
+            waitForAck(0, 1);
+
+            for (int i=0; i<10; i++) {
+                jdbc.cleanup();
+            }
+            assertEquals("one message pending", 1, messageTableCount());
+
+            message = sub.receive(5000);
+            assertNotNull("got message two", message);
+            LOG.info("Got: " + message);
+            waitForAck(0, 2);
+            for (int i=0; i<10; i++) {
+                jdbc.cleanup();
+            }
+            assertEquals("no messages pending", 0, messageTableCount());
+        } finally {
+            consumerConn.close(); // previously leaked: the consumer connection was never closed
+        }
+    }
+
+    // Returns the row count of ACTIVEMQ_MSGS, or -1 if the COUNT query produced no row.
+    // Statement and connection are both closed explicitly rather than relying on the connection close alone.
+    private int messageTableCount() throws Exception {
+        java.sql.Connection c = dataSource.getConnection();
+        try {
+            PreparedStatement s = c.prepareStatement("SELECT COUNT(*) FROM ACTIVEMQ_MSGS");
+            try {
+                ResultSet rs = s.executeQuery();
+                return rs.next() ? rs.getInt(1) : -1;
+            } finally {
+                s.close(); // also closes the statement's ResultSet
+            }
+        } finally {
+            c.close();
+        }
+    }
+
+    private void waitForAck(final int priority) throws Exception { // waits for any recorded ack at this priority
+        waitForAck(priority, 0); // minId 0: satisfied once LAST_ACKED_ID exceeds zero
+    }
+
+    // Waits (via Wait.waitFor) until ACTIVEMQ_ACKS reports LAST_ACKED_ID > minId for the given priority.
+    private void waitForAck(final int priority, final int minId) throws Exception {
+        assertTrue("got ack for " + priority, Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                java.sql.Connection c = dataSource.getConnection();
+                try {
+                    PreparedStatement s = c.prepareStatement("SELECT LAST_ACKED_ID FROM ACTIVEMQ_ACKS WHERE PRIORITY=?");
+                    try {
+                        s.setInt(1, priority);
+                        ResultSet rs = s.executeQuery();
+                        return (rs.next() ? rs.getLong(1) : 0L) > minId; // first row only; no row counts as 0
+                    } finally {
+                        s.close(); // also closes the statement's ResultSet
+                    }
+                } finally {
+                    c.close();
+                }
+            }
+        }));
+    }
+
+    private int messageTableDump() throws Exception { // NOTE(review): no caller in the visible hunks - confirm it is used
+        int count = -1; // returned unchanged when the table has no rows
+        java.sql.Connection c = dataSource.getConnection();
+        try {
+            PreparedStatement s = c.prepareStatement("SELECT * FROM ACTIVEMQ_MSGS");
+            ResultSet rs = s.executeQuery();
+            if (rs.next()) {
+                count = rs.getInt(1); // despite the name nothing is dumped: this is column 1 of the first row only
+            }
+        } finally {
+            if (c!=null) {
+                c.close(); // statement and result set are not closed separately
+            }
+        }
+        return count;
+    }
+
     public static Test suite() {
         return suite(JDBCMessagePriorityTest.class);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java?rev=1095352&r1=1095351&r2=1095352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java Wed Apr 20 10:39:25 2011
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -48,6 +49,8 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 //import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.util.MessageIdList;
 import org.apache.activemq.util.Wait;
 //import org.apache.commons.dbcp.BasicDataSource;
@@ -113,7 +116,7 @@ public class ConcurrentProducerDurableCo
         });
 
 
-        double[] statsWithActive = produceMessages(destination, 300, 10, session, producer, addConsumerSignal);
+        double[] statsWithActive = produceMessages(destination, 500, 10, session, producer, addConsumerSignal);
 
         LOG.info(" with concurrent activate, ave: " + statsWithActive[1] + ", max: " + statsWithActive[0] + ", multiplier: " + (statsWithActive[0]/ statsWithActive[1]));
 
@@ -130,7 +133,7 @@ public class ConcurrentProducerDurableCo
         LOG.info("Ave time to first message =" + timeToFirstAccumulator/consumers.size());
 
         for (TimedMessageListener listener : consumers.values()) {
-            LOG.info("Ave batch receipt time: " + listener.waitForReceivedLimit(5000) + " max receipt: " + listener.maxReceiptTime);
+            LOG.info("Ave batch receipt time: " + listener.waitForReceivedLimit(10000) + " max receipt: " + listener.maxReceiptTime);
         }
 
         //assertTrue("max (" + statsWithActive[0] + ") within reasonable multiplier of ave (" + statsWithActive[1] + ")",
@@ -249,7 +252,9 @@ public class ConcurrentProducerDurableCo
             for (int j=0; j < toSend; j++) {
                 long singleSendstart = System.currentTimeMillis();
                 TextMessage msg = createTextMessage(session, "" + j);
-                producer.send(msg);
+                // rotate
+                int priority = ((int)count%10);
+                producer.send(msg, DeliveryMode.PERSISTENT, priority, 0);
                 max = Math.max(max, (System.currentTimeMillis() - singleSendstart));
                 if (++count % 500 == 0) {
                     if (addConsumerSignal != null) {
@@ -328,6 +333,12 @@ public class ConcurrentProducerDurableCo
         policy.setPrioritizedMessages(true);
         policy.setMaxPageSize(500);
 
+        StorePendingDurableSubscriberMessageStoragePolicy durableSubPending =
+                new StorePendingDurableSubscriberMessageStoragePolicy();
+        durableSubPending.setImmediatePriorityDispatch(true);
+        durableSubPending.setUseCache(true);
+        policy.setPendingDurableSubscriberPolicy(durableSubPending);
+
         PolicyMap policyMap = new PolicyMap();
         policyMap.setDefaultEntry(policy);
         brokerService.setDestinationPolicy(policyMap);
@@ -390,19 +401,27 @@ public class ConcurrentProducerDurableCo
         long batchReceiptAccumulator = 0;
         long maxReceiptTime = 0;
         AtomicLong count = new AtomicLong(0);
+        Map<Integer, MessageIdList> messageLists = new ConcurrentHashMap<Integer, MessageIdList>(new HashMap<Integer, MessageIdList>());
 
         @Override
         public void onMessage(Message message) {
             final long current = System.currentTimeMillis();
             final long duration = current - mark;
             receiptAccumulator += duration;
-            allMessagesList.onMessage(message);
+            int priority = 0;
+            try {
+                priority = message.getJMSPriority();
+            } catch (JMSException ignored) {}
+            if (!messageLists.containsKey(priority)) {
+                messageLists.put(priority, new MessageIdList());
+            }
+            messageLists.get(priority).onMessage(message);
             if (count.incrementAndGet() == 1) {
                 firstReceipt = duration;
                 firstReceiptLatch.countDown();
                 LOG.info("First receipt in " + firstReceipt + "ms");
             } else if (count.get() % batchSize == 0) {
-                LOG.info("Consumed " + batchSize + " in " + batchReceiptAccumulator + "ms");
+                LOG.info("Consumed " + count.get() + " in " + batchReceiptAccumulator + "ms" + ", priority:" + priority);
                 batchReceiptAccumulator=0;
             }
             maxReceiptTime = Math.max(maxReceiptTime, duration);
@@ -427,9 +446,36 @@ public class ConcurrentProducerDurableCo
                     throw new RuntimeException("Expired waiting for X messages, " + limit);
                 }
                 TimeUnit.SECONDS.sleep(2);
+                String missing = findFirstMissingMessage();
+                if (missing != null) {
+                    LOG.info("first missing = " + missing);
+                    throw new RuntimeException("We have a missing message. " + missing);
+                }
+
             }
             return receiptAccumulator/(limit/batchSize);
         }
+
+        // Walks each priority's received ids in list order and reports the first sequence gap found.
+        // Returns null when every step between consecutive producer sequence ids is exactly 1 or 10.
+        private String findFirstMissingMessage() {
+            final MessageId current = new MessageId();
+            for (MessageIdList idsForPriority : messageLists.values()) {
+                MessageId previous = null;
+                for (String id : idsForPriority.getMessageIds()) {
+                    current.setValue(id);
+                    if (previous != null) {
+                        // a step of 10 is tolerated too - presumably because produceMessages rotates priority with count % 10
+                        final long step = current.getProducerSequenceId() - previous.getProducerSequenceId();
+                        if (step != 1 && step != 10) {
+                            return "Missing next after: " + previous + ", got: " + current;
+                        }
+                    }
+                    previous = current.copy();
+                }
+            }
+            return null;
+        }
     }
 
 }