You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/02/22 19:51:54 UTC

svn commit: r1073453 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/jdbc/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Tue Feb 22 18:51:54 2011
New Revision: 1073453

URL: http://svn.apache.org/viewvc?rev=1073453&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3188 - Full table scan for durable subs in jdbc store when priority enabled; very slow with large message backlog
Added more state to the topic message store so that it can ask the db for a single priority at a time, which is an indexed query. This avoids a full table scan.
The send rate with active durable subs vs inactive durable subs is now in the region of 6x, down from 40x. A validation test is included.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Feb 22 18:51:54 2011
@@ -131,7 +131,7 @@ public class DurableTopicSubscription ex
                     topic.activate(context, this);
                 }
             }
-            synchronized (pending) {
+            synchronized (pendingLock) {
                 pending.setSystemUsage(memoryManager);
                 pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
                 pending.setMaxAuditDepth(getMaxAuditDepth());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Feb 22 18:51:54 2011
@@ -64,7 +64,7 @@ public abstract class PrefetchSubscripti
     private int maxProducersToAudit=32;
     private int maxAuditDepth=2048;
     protected final SystemUsage usageManager;
-    private final Object pendingLock = new Object();
+    protected final Object pendingLock = new Object();
     private final Object dispatchLock = new Object();
     private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Tue Feb 22 18:51:54 2011
@@ -45,7 +45,7 @@ public abstract class AbstractPendingMes
     protected boolean enableAudit=true;
     protected ActiveMQMessageAudit audit;
     protected boolean useCache=true;
-    protected boolean cacheEnabled=true;
+    private boolean cacheEnabled=true;
     private boolean started=false;
     protected MessageReference last = null;
     protected final boolean prioritizedMessages;
@@ -329,7 +329,11 @@ public abstract class AbstractPendingMes
 
     }
 
-    public boolean isCacheEnabled() {
+    public synchronized boolean isCacheEnabled() {
         return cacheEnabled;
     }
+
+    public synchronized void setCacheEnabled(boolean val) {
+        cacheEnabled = val;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Tue Feb 22 18:51:54 2011
@@ -45,7 +45,7 @@ public abstract class AbstractStoreCurso
         this.regionDestination=destination;
         if (this.prioritizedMessages) {
             this.batchList= new PrioritizedPendingList();
-        }else {
+        } else {
             this.batchList = new OrderedPendingList();
         }
     }
@@ -58,7 +58,7 @@ public abstract class AbstractStoreCurso
             resetBatch();
             this.size = getStoreSize();
             this.storeHasMessages=this.size > 0;
-            cacheEnabled = !this.storeHasMessages&&useCache;
+            setCacheEnabled(!this.storeHasMessages&&useCache);
         } 
     }
     
@@ -95,8 +95,7 @@ public abstract class AbstractStoreCurso
              * it will be a duplicate - but should be ignored
              */
             if (LOG.isTraceEnabled()) {
-                LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
-                        + " cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
+                LOG.trace(this + " - cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
             }
         }
         return recovered;
@@ -108,7 +107,7 @@ public abstract class AbstractStoreCurso
             try {
                 fillBatch();
             } catch (Exception e) {
-                LOG.error("Failed to fill batch", e);
+                LOG.error(this + " - Failed to fill batch", e);
                 throw new RuntimeException(e);
             }
         }
@@ -145,7 +144,7 @@ public abstract class AbstractStoreCurso
             try {
                 fillBatch();
             } catch (Exception e) {
-                LOG.error("Failed to fill batch", e);
+                LOG.error(this + " - Failed to fill batch", e);
                 throw new RuntimeException(e);
             }
         }
@@ -169,24 +168,22 @@ public abstract class AbstractStoreCurso
     
     public final synchronized void addMessageLast(MessageReference node) throws Exception {
         if (hasSpace()) {
-            if (!cacheEnabled && size==0 && isStarted() && useCache) {
+            if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
-                            + " enabling cache for empty store " + node.getMessageId());
+                    LOG.trace(this + " - enabling cache for empty store " + node.getMessageId());
                 }
-                cacheEnabled=true;
+                setCacheEnabled(true);
             }
-            if (cacheEnabled) {
+            if (isCacheEnabled()) {
                 recoverMessage(node.getMessage(),true);
                 lastCachedId = node.getMessageId();
             }
-        } else if (cacheEnabled) {
-            cacheEnabled=false;
+        } else if (isCacheEnabled()) {
+            setCacheEnabled(false);
             // sync with store on disabling the cache
             if (lastCachedId != null) {
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
-                            + " disabling cache on size:" + size
+                    LOG.trace(this + " - disabling cache"
                             + ", lastCachedId: " + lastCachedId
                             + " current node Id: " + node.getMessageId());
                 }
@@ -203,7 +200,7 @@ public abstract class AbstractStoreCurso
 
     
     public final synchronized void addMessageFirst(MessageReference node) throws Exception {
-        cacheEnabled=false;
+        setCacheEnabled(false);
         size++;
     }
 
@@ -221,7 +218,7 @@ public abstract class AbstractStoreCurso
     
     public final synchronized void remove(MessageReference node) {
         size--;
-        cacheEnabled=false;
+        setCacheEnabled(false);
         batchList.remove(node);
     }
     
@@ -240,7 +237,7 @@ public abstract class AbstractStoreCurso
         batchList.clear();
         clearIterator(false);
         batchResetNeeded = true;
-        this.cacheEnabled=false;
+        setCacheEnabled(false);
     }
 
     @Override
@@ -251,8 +248,7 @@ public abstract class AbstractStoreCurso
 
     protected final synchronized void fillBatch() {
         if (LOG.isTraceEnabled()) {
-            LOG.trace("fillBatch - batchResetNeeded=" + batchResetNeeded
-                    + ", hasMessages=" + this.storeHasMessages + ", size=" + this.size + ", cacheEnabled=" + this.cacheEnabled);
+            LOG.trace(this + " - fillBatch");
         }
         if (batchResetNeeded) {
             resetBatch();
@@ -263,7 +259,7 @@ public abstract class AbstractStoreCurso
             try {
                 doFillBatch();
             } catch (Exception e) {
-                LOG.error("Failed to fill batch", e);
+                LOG.error(this + " - Failed to fill batch", e);
                 throw new RuntimeException(e);
             }
             if (!this.batchList.isEmpty() || !hadSpace) {
@@ -290,7 +286,11 @@ public abstract class AbstractStoreCurso
         }
         return size;
     }
-    
+
+    public String toString() {
+        return regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
+                    + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled();
+    }
     
     protected abstract void doFillBatch() throws Exception;
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Tue Feb 22 18:51:54 2011
@@ -199,7 +199,7 @@ public class FilePendingMessageCursor ex
                     if (hasSpace() || this.store == null) {
                         memoryList.add(node);
                         node.incrementReferenceCount();
-                        cacheEnabled = true;
+                        setCacheEnabled(true);
                         return true;
                     }
                 }
@@ -247,7 +247,7 @@ public class FilePendingMessageCursor ex
                     if (hasSpace()) {
                         memoryList.addFirst(node);
                         node.incrementReferenceCount();
-                        cacheEnabled = true;
+                        setCacheEnabled(true);
                         return;
                     }
                 }
@@ -428,7 +428,7 @@ public class FilePendingMessageCursor ex
 
             }
             memoryList.clear();
-            cacheEnabled = false;
+            setCacheEnabled(false);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Feb 22 18:51:54 2011
@@ -187,15 +187,15 @@ public class StoreDurableSubscriberCurso
                 Destination dest = msg.getRegionDestination();
                 TopicStorePrefetch tsp = topics.get(dest);
                 if (tsp != null) {
-                    // cache can be come high priority cache for immediate dispatch
+                    // cache can become high priority cache for immediate dispatch
                     final int priority = msg.getPriority();
-                    if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.cacheEnabled) {
+                    if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.isCacheEnabled()) {
                         if (priority > tsp.getCurrentLowestPriority()) {
                             if (LOG.isTraceEnabled()) {
                                 LOG.trace("enabling cache for cursor on high priority message " + priority
                                         + ", current lowest: " + tsp.getCurrentLowestPriority());
                             }
-                            tsp.cacheEnabled = true;
+                            tsp.setCacheEnabled(true);
                             cacheCurrentLowestPriority = tsp.getCurrentLowestPriority();
                         }
                     } else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) {
@@ -206,7 +206,7 @@ public class StoreDurableSubscriberCurso
                                     + priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority()
                                     + " cache lowest: " + cacheCurrentLowestPriority);
                         }
-                        tsp.cacheEnabled = false;
+                        tsp.setCacheEnabled(false);
                         cacheCurrentLowestPriority = UNKNOWN;
                     }
                     tsp.addMessageLast(node);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Tue Feb 22 18:51:54 2011
@@ -297,7 +297,7 @@ public class StoreQueueCursor extends Ab
 
     @Override
     public boolean isCacheEnabled() {
-        cacheEnabled = isUseCache();
+        boolean cacheEnabled = isUseCache();
         if (cacheEnabled) {
             if (persistent != null) {
                 cacheEnabled &= persistent.isCacheEnabled();
@@ -305,6 +305,7 @@ public class StoreQueueCursor extends Ab
             if (nonPersistent != null) {
                 cacheEnabled &= nonPersistent.isCacheEnabled();
             }
+            setCacheEnabled(cacheEnabled);
         }
         return cacheEnabled;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Tue Feb 22 18:51:54 2011
@@ -132,6 +132,6 @@ class TopicStorePrefetch extends Abstrac
 
     @Override
     public String toString() {
-        return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")";
+        return "TopicStorePrefetch(" + clientId + "," + subscriberName + ")" + super.toString();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Tue Feb 22 18:51:54 2011
@@ -40,6 +40,25 @@ import org.slf4j.LoggerFactory;
  */
 public class JDBCMessageStore extends AbstractMessageStore {
 
+    class Duration {
+        static final int LIMIT = 100;
+        final long start = System.currentTimeMillis();
+        final String name;
+
+        Duration(String name) {
+            this.name = name;
+        }
+        void end() {
+            end(null);
+        }
+        void end(Object o) {
+            long duration = System.currentTimeMillis() - start;
+
+            if (duration > LIMIT) {
+                System.err.println(name + " took a long time: " + duration + "ms " + o);
+            }
+        }
+    }
     private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
     protected final WireFormat wireFormat;
     protected final JDBCAdapter adapter;
@@ -58,7 +77,6 @@ public class JDBCMessageStore extends Ab
     }
     
     public void addMessage(ConnectionContext context, Message message) throws IOException {
-
         MessageId messageId = message.getMessageId();
         if (audit != null && audit.isDuplicate(message)) {
             if (LOG.isDebugEnabled()) {
@@ -90,6 +108,10 @@ public class JDBCMessageStore extends Ab
         } finally {
             c.close();
         }
+        onAdd(sequenceId, message.getPriority());
+    }
+
+    protected void onAdd(long sequenceId, byte priority) {
     }
 
     public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Tue Feb 22 18:51:54 2011
@@ -18,10 +18,10 @@ package org.apache.activemq.store.jdbc;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.ConnectionContext;
@@ -102,22 +102,120 @@ public class JDBCTopicMessageStore exten
         }
     }
 
-    private class LastRecovered {
-        long sequence = 0;
-        byte priority = 9;
-
-        public void update(long sequence, Message msg) {
-            this.sequence = sequence;
-            this.priority = msg.getPriority();
+    private class LastRecovered implements Iterable<LastRecoveredEntry> {
+        LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
+        LastRecovered() {
+            for (int i=0; i<perPriority.length; i++) {
+                perPriority[i] = new LastRecoveredEntry(i);
+            }
+        }
+
+        public void updateStored(long sequence, int priority) {
+            perPriority[priority].stored = sequence;
+        }
+
+        public LastRecoveredEntry defaultPriority() {
+            return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
+        }
+
+        public String toString() {
+            return Arrays.deepToString(perPriority);
+        }
+
+        public Iterator<LastRecoveredEntry> iterator() {
+            return new PriorityIterator();
+        }
+
+        class PriorityIterator implements Iterator<LastRecoveredEntry> {
+            int current = 9;
+            public boolean hasNext() {
+                for (int i=current; i>=0; i--) {
+                    if (perPriority[i].hasMessages()) {
+                        current = i;
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            public LastRecoveredEntry next() {
+                return perPriority[current];
+            }
+
+            public void remove() {
+                throw new RuntimeException("not implemented");
+            }
+        }
+    }
+
+    private class LastRecoveredEntry {
+        final int priority;
+        long recovered = 0;
+        long stored = Integer.MAX_VALUE;
+
+        public LastRecoveredEntry(int priority) {
+            this.priority = priority;
         }
 
         public String toString() {
-            return "" + sequence + ":" + priority;
+            return priority + "-" + stored + ":" + recovered;
+        }
+
+        public void exhausted() {
+            stored = recovered;
+        }
+
+        public boolean hasMessages() {
+            return stored > recovered;
+        }
+    }
+
+    class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
+        final MessageRecoveryListener delegate;
+        final int maxMessages;
+        LastRecoveredEntry lastRecovered;
+        int recoveredCount;
+        int recoveredMarker;
+
+        public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
+            this.delegate = delegate;
+            this.maxMessages = maxMessages;
+        }
+
+        public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
+            if (delegate.hasSpace()) {
+                Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+                msg.getMessageId().setBrokerSequenceId(sequenceId);
+                if (delegate.recoverMessage(msg)) {
+                    lastRecovered.recovered = sequenceId;
+                    recoveredCount++;
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        public boolean recoverMessageReference(String reference) throws Exception {
+            return delegate.recoverMessageReference(new MessageId(reference));
+        }
+
+        public void setLastRecovered(LastRecoveredEntry lastRecovered) {
+            this.lastRecovered = lastRecovered;
+            recoveredMarker = recoveredCount;
+        }
+
+        public boolean complete() {
+            return  !delegate.hasSpace() || recoveredCount == maxMessages;
+        }
+
+        public boolean stalled() {
+            return recoveredMarker == recoveredCount;
         }
     }
 
     public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
             throws Exception {
+        //Duration duration = new Duration("recoverNextMessages");
         TransactionContext c = persistenceAdapter.getTransactionContext();
 
         String key = getSubscriptionKey(clientId, subscriptionName);
@@ -125,38 +223,38 @@ public class JDBCTopicMessageStore exten
            subscriberLastRecoveredMap.put(key, new LastRecovered());
         }
         final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);        
+        LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
         try {
-            JDBCMessageRecoveryListener jdbcListener = new JDBCMessageRecoveryListener() {
-                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
-                    if (listener.hasSpace()) {
-                        Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
-                        msg.getMessageId().setBrokerSequenceId(sequenceId);
-                        if (listener.recoverMessage(msg)) {
-                            lastRecovered.update(sequenceId, msg);
-                            return true;
-                        }
-                    }
-                    return false;
-                }
-
-                public boolean recoverMessageReference(String reference) throws Exception {
-                    return listener.recoverMessageReference(new MessageId(reference));
-                }
-
-            };
             if (LOG.isTraceEnabled()) {
                 LOG.trace(key + " existing last recovered: " + lastRecovered);
             }
             if (isPrioritizedMessages()) {
-                adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
-                        lastRecovered.sequence, lastRecovered.priority, maxReturned, jdbcListener);
+                Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
+                for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
+                    LastRecoveredEntry entry = it.next();
+                    recoveredAwareListener.setLastRecovered(entry);
+                    //Duration microDuration = new Duration("recoverNextMessages:loop");
+                    adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
+                        entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
+                    //microDuration.end(entry);
+                    if (recoveredAwareListener.stalled()) {
+                        if (recoveredAwareListener.complete()) {
+                            break;
+                        } else {
+                            entry.exhausted();
+                        }
+                    }
+                }
             } else {
+                LastRecoveredEntry last = lastRecovered.defaultPriority();
+                recoveredAwareListener.setLastRecovered(last);
                 adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
-                        lastRecovered.sequence, 0, maxReturned, jdbcListener);
+                        last.recovered, 0, maxReturned, recoveredAwareListener);
             }
             if (LOG.isTraceEnabled()) {
                 LOG.trace(key + " last recovered: " + lastRecovered);
             }
+            //duration.end();
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
         } finally {
@@ -168,6 +266,14 @@ public class JDBCTopicMessageStore exten
         subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
     }
 
+    protected void onAdd(long sequenceId, byte priority) {
+        // update last recovered state
+        for (LastRecovered last : subscriberLastRecoveredMap.values()) {
+            last.updateStored(sequenceId, priority);
+        }
+    }
+
+
     public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -223,6 +329,7 @@ public class JDBCTopicMessageStore exten
     }
 
     public int getMessageCount(String clientId, String subscriberName) throws IOException {
+        //Duration duration = new Duration("getMessageCount");
         int result = 0;
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -236,6 +343,7 @@ public class JDBCTopicMessageStore exten
         if (LOG.isTraceEnabled()) {
             LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
         }
+        //duration.end();
         return result;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Tue Feb 22 18:51:54 2011
@@ -281,13 +281,13 @@ public class Statements {
     
     public String getFindDurableSubMessagesByPriorityStatement() {
         if (findDurableSubMessagesByPriorityStatement == null) {
-            findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
-                                              + getFullAckTableName() + " D "
+            findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M,"
+                                              + " " + getFullAckTableName() + " D"
                                               + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
                                               + " AND M.CONTAINER=D.CONTAINER"
                                               + " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID"
-                                              + " AND ( (M.ID > ?) OR (M.PRIORITY < ?) )" 
-                                              + " ORDER BY M.PRIORITY DESC, M.ID";
+                                              + " AND M.ID > ? AND M.PRIORITY = ?"
+                                              + " ORDER BY M.ID";
         }
         return findDurableSubMessagesByPriorityStatement;
     }    

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java Tue Feb 22 18:51:54 2011
@@ -23,15 +23,19 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -43,32 +47,28 @@ import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
+//import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.util.MessageIdList;
 import org.apache.activemq.util.Wait;
+//import org.apache.commons.dbcp.BasicDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ConcurrentProducerDurableConsumerTest extends TestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentProducerDurableConsumerTest.class);
-    private int consumerCount = 1;
+    private int consumerCount = 5;
     BrokerService broker;
     protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
-    protected Map<MessageConsumer, MessageIdList> consumers = new HashMap<MessageConsumer, MessageIdList>();
+    protected Map<MessageConsumer, TimedMessageListener> consumers = new HashMap<MessageConsumer, TimedMessageListener>();
     protected MessageIdList allMessagesList = new MessageIdList();
     private int messageSize = 1024;
 
-    public void testPlaceHolder() throws Exception {
-    }
-
-    public void x_initCombosForTestSendRateWithActivatingConsumers() throws Exception {
+    public void initCombosForTestSendRateWithActivatingConsumers() throws Exception {
         addCombinationValues("defaultPersistenceAdapter",
                 new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC, PersistenceAdapterChoice.MEM});
     }
 
-    public void x_testSendRateWithActivatingConsumers() throws Exception {
+    public void testSendRateWithActivatingConsumers() throws Exception {
         final Destination destination = createDestination();
         final ConnectionFactory factory = createConnectionFactory();
         startInactiveConsumers(factory, destination);
@@ -78,12 +78,12 @@ public class ConcurrentProducerDurableCo
         MessageProducer producer = createMessageProducer(session, destination);
 
         // preload the durable consumers
-        double[] inactiveConsumerStats = produceMessages(destination, 200, 100, session, producer, null);
+        double[] inactiveConsumerStats = produceMessages(destination, 500, 10, session, producer, null);
         LOG.info("With inactive consumers: ave: " + inactiveConsumerStats[1]
                 + ", max: " + inactiveConsumerStats[0] + ", multiplier: " + (inactiveConsumerStats[0]/inactiveConsumerStats[1]));
 
-        // periodically start a durable sub that is has a backlog
-        final int consumersToActivate = 1;
+        // periodically start a durable sub that has a backlog
+        final int consumersToActivate = 5;
         final Object addConsumerSignal = new Object();
         Executors.newCachedThreadPool(new ThreadFactory() {
             @Override
@@ -96,16 +96,15 @@ public class ConcurrentProducerDurableCo
                 try {
                     MessageConsumer consumer = null;
                     for (int i = 0; i < consumersToActivate; i++) {
-                        LOG.info("Waiting for add signal");
+                        LOG.info("Waiting for add signal from producer...");
                         synchronized (addConsumerSignal) {
                             addConsumerSignal.wait(30 * 60 * 1000);
                         }
+                        TimedMessageListener listener = new TimedMessageListener();
                         consumer = createDurableSubscriber(factory.createConnection(), destination, "consumer" + (i + 1));
                         LOG.info("Created consumer " + consumer);
-                        MessageIdList list = new MessageIdList();
-                        list.setParent(allMessagesList);
-                        consumer.setMessageListener(list);
-                        consumers.put(consumer, list);
+                        consumer.setMessageListener(listener);
+                        consumers.put(consumer, listener);
                     }
                 } catch (Exception e) {
                     LOG.error("failed to start consumer", e);
@@ -114,18 +113,44 @@ public class ConcurrentProducerDurableCo
         });
 
 
-        double[] stats  = produceMessages(destination, 20, 100, session, producer, addConsumerSignal);
+        double[] statsWithActive = produceMessages(destination, 300, 10, session, producer, addConsumerSignal);
 
-        LOG.info(" with concurrent activate, ave: " + stats[1] + ", max: " + stats[0] + ", multiplier: " + (stats[0]/stats[1]));
-        assertTrue("max (" + stats[0] + ") within reasonable multiplier of ave (" + stats[1] + ")",
-                stats[0] < 5 * stats[1]);
+        LOG.info(" with concurrent activate, ave: " + statsWithActive[1] + ", max: " + statsWithActive[0] + ", multiplier: " + (statsWithActive[0]/ statsWithActive[1]));
+
+        while(consumers.size() < consumersToActivate) {
+            TimeUnit.SECONDS.sleep(2);
+        }
 
+        long timeToFirstAccumulator = 0;
+        for (TimedMessageListener listener : consumers.values()) {
+            long time = listener.getFirstReceipt();
+            timeToFirstAccumulator += time;
+            LOG.info("Time to first " + time);
+        }
+        LOG.info("Ave time to first message =" + timeToFirstAccumulator/consumers.size());
+
+        for (TimedMessageListener listener : consumers.values()) {
+            LOG.info("Ave batch receipt time: " + listener.waitForReceivedLimit(5000) + " max receipt: " + listener.maxReceiptTime);
+        }
+
+        //assertTrue("max (" + statsWithActive[0] + ") within reasonable multiplier of ave (" + statsWithActive[1] + ")",
+        //        statsWithActive[0] < 5 * statsWithActive[1]);
+
+        // compare no active to active
+        LOG.info("Ave send time with active: " + statsWithActive[1]
+                + " as multiplier of ave with none active: " + inactiveConsumerStats[1]
+                + ", multiplier=" + (statsWithActive[1]/inactiveConsumerStats[1]));
+
+        assertTrue("Ave send time with active: " + statsWithActive[1]
+                + " within reasonable multpler of ave with none active: " + inactiveConsumerStats[1]
+                + ", multiplier " + (statsWithActive[1]/inactiveConsumerStats[1]),
+                statsWithActive[1] < 15 * inactiveConsumerStats[1]);
     }
 
 
     public void x_initCombosForTestSendWithInactiveAndActiveConsumers() throws Exception {
         addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC, PersistenceAdapterChoice.MEM});
+                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
     }
 
     public void x_testSendWithInactiveAndActiveConsumers() throws Exception {
@@ -150,7 +175,7 @@ public class ConcurrentProducerDurableCo
 
         LOG.info("With consumer: " + withConsumerStats[1] + " , with noConsumer: " + noConsumerStats[1]
                 + ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1]));
-        final int reasonableMultiplier = 4; // not so reasonable, but on slow disks it can be
+        final int reasonableMultiplier = 15; // not so reasonable but improving
         assertTrue("max X times as slow with consumer: " + withConsumerStats[1] + ", with no Consumer: "
                 + noConsumerStats[1] + ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1]),
                 withConsumerStats[1] < noConsumerStats[1] * reasonableMultiplier);
@@ -188,9 +213,8 @@ public class ConcurrentProducerDurableCo
     protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
         MessageConsumer consumer;
         for (int i = 0; i < consumerCount; i++) {
+            TimedMessageListener list = new TimedMessageListener();
             consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1));
-            MessageIdList list = new MessageIdList();
-            list.setParent(allMessagesList);
             consumer.setMessageListener(list);
             consumers.put(consumer, list);
         }
@@ -212,39 +236,44 @@ public class ConcurrentProducerDurableCo
      * @throws Exception
      */
     private double[] produceMessages(Destination destination,
-                                     int toSend,
-                                     int numIterations,
+                                     final int toSend,
+                                     final int numIterations,
                                      Session session,
                                      MessageProducer producer,
                                      Object addConsumerSignal) throws Exception {
         long start;
         long count = 0;
-        double max = 0, sum = 0;
+        double batchMax = 0, max = 0, sum = 0;
         for (int i=0; i<numIterations; i++) {
             start = System.currentTimeMillis();
             for (int j=0; j < toSend; j++) {
+                long singleSendstart = System.currentTimeMillis();
                 TextMessage msg = createTextMessage(session, "" + j);
                 producer.send(msg);
-                if (++count % 300 == 0) {
+                max = Math.max(max, (System.currentTimeMillis() - singleSendstart));
+                if (++count % 500 == 0) {
                     if (addConsumerSignal != null) {
                         synchronized (addConsumerSignal) {
                             addConsumerSignal.notifyAll();
-                            LOG.info("Signaled add consumer");
+                            LOG.info("Signalled add consumer");
                         }
                     }
                 }
+                ;
                 if (count % 5000 == 0) {
-                    LOG.info("Sent " + count);
+                    LOG.info("Sent " + count + ", singleSendMax:" + max);
                 }
 
             }
             long duration = System.currentTimeMillis() - start;
-            max = Math.max(max, duration);
+            batchMax = Math.max(batchMax, duration);
             sum += duration;
+            LOG.info("Iteration " + i + ", sent " + toSend + ", time: "
+                    + duration + ", batchMax:" + batchMax + ", singleSendMax:" + max);
         }
 
-        LOG.info("Sent: " + toSend * numIterations + ", max send time: " + max);
-        return new double[]{max, sum/numIterations};
+        LOG.info("Sent: " + toSend * numIterations + ", batchMax: " + batchMax + " singleSendMax: " + max);
+        return new double[]{batchMax, sum/numIterations};
     }
 
     protected TextMessage createTextMessage(Session session, String initText) throws Exception {
@@ -297,12 +326,44 @@ public class ConcurrentProducerDurableCo
 
         PolicyEntry policy = new PolicyEntry();
         policy.setPrioritizedMessages(true);
+        policy.setMaxPageSize(500);
+
         PolicyMap policyMap = new PolicyMap();
         policyMap.setDefaultEntry(policy);
         brokerService.setDestinationPolicy(policyMap);
 
-        //setPersistenceAdapter(brokerService, PersistenceAdapterChoice.JDBC);
-        setDefaultPersistenceAdapter(brokerService);
+        if (false) {
+              // external mysql works a lot faster
+              //
+//            JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+//            BasicDataSource ds = new BasicDataSource();
+//            com.mysql.jdbc.Driver d = new com.mysql.jdbc.Driver();
+//            ds.setDriverClassName("com.mysql.jdbc.Driver");
+//            ds.setUrl("jdbc:mysql://localhost/activemq?relaxAutoCommit=true");
+//            ds.setMaxActive(200);
+//            ds.setUsername("root");
+//            ds.setPassword("");
+//            ds.setPoolPreparedStatements(true);
+//            jdbc.setDataSource(ds);
+//            brokerService.setPersistenceAdapter(jdbc);
+
+/* add mysql bits to the pom in the testing dependencies
+<dependency>
+    <groupId>mysql</groupId>
+    <artifactId>mysql-connector-java</artifactId>
+    <version>5.1.10</version>
+    <scope>test</scope>
+</dependency>
+<dependency>
+    <groupId>commons-dbcp</groupId>
+    <artifactId>commons-dbcp</artifactId>
+    <version>1.2.2</version>
+    <scope>test</scope>
+</dependency>
+             */
+        } else {
+            setDefaultPersistenceAdapter(brokerService);
+        }
         return brokerService;
     }
 
@@ -311,6 +372,8 @@ public class ConcurrentProducerDurableCo
         ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
         prefetchPolicy.setAll(1);
         factory.setPrefetchPolicy(prefetchPolicy);
+
+        factory.setDispatchAsync(true);
         return factory;
     }
 
@@ -318,4 +381,55 @@ public class ConcurrentProducerDurableCo
         return suite(ConcurrentProducerDurableConsumerTest.class);
     }
 
+    class TimedMessageListener implements MessageListener { // gathers receipt-timing statistics for a single consumer
+        final int batchSize = 1000; // number of messages per logged batch
+        CountDownLatch firstReceiptLatch = new CountDownLatch(1); // released when the first message arrives
+        long mark = System.currentTimeMillis(); // creation time, then the time of the previous receipt
+        long firstReceipt = 0L; // ms between creation and the first message
+        long receiptAccumulator = 0; // sum of all gaps between receipts, in ms
+        long batchReceiptAccumulator = 0; // sum of gaps within the current batch, in ms
+        long maxReceiptTime = 0; // largest single gap between receipts, in ms
+        AtomicLong count = new AtomicLong(0); // messages received so far
+
+        @Override
+        public void onMessage(Message message) {
+            final long current = System.currentTimeMillis();
+            final long duration = current - mark;
+            // duration is added to receiptAccumulator exactly once (below); adding it here as well doubled every reported average
+            allMessagesList.onMessage(message);
+            if (count.incrementAndGet() == 1) {
+                firstReceipt = duration;
+                firstReceiptLatch.countDown();
+                LOG.info("First receipt in " + firstReceipt + "ms");
+            } else if (count.get() % batchSize == 0) {
+                LOG.info("Consumed " + batchSize + " in " + batchReceiptAccumulator + "ms");
+                batchReceiptAccumulator=0;
+            }
+            maxReceiptTime = Math.max(maxReceiptTime, duration);
+            receiptAccumulator += duration;
+            batchReceiptAccumulator += duration;
+            mark = current;
+        }
+
+        long getMessageCount() {
+            return count.get();
+        }
+
+        long getFirstReceipt() throws Exception { // waits up to 30s for the first message; returns 0 if none arrived in time
+            firstReceiptLatch.await(30, TimeUnit.SECONDS);
+            return firstReceipt;
+        }
+
+        public long waitForReceivedLimit(long limit) throws Exception { // polls until limit messages have arrived (30 min cap), then returns the average ms per batch
+            final long expiry = System.currentTimeMillis() + 30*60*1000;
+            while (count.get() < limit) {
+                if (System.currentTimeMillis() > expiry) {
+                    throw new RuntimeException("Expired waiting for X messages, " + limit);
+                }
+                TimeUnit.SECONDS.sleep(2);
+            }
+            return receiptAccumulator / Math.max(1, limit / batchSize); // count at least one batch so limit < batchSize cannot divide by zero
+        }
+    }
+
 }