You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/03/01 12:55:22 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-4495 - revisit. Reinstate check for space on pagein, so that highWaterMark is respected and full state is not reached, hence pfc is not triggered in error

Repository: activemq
Updated Branches:
  refs/heads/master 473b3284d -> d8cf54b0a


https://issues.apache.org/jira/browse/AMQ-4495 - revisit. Reinstate check for space on pagein, so that highWaterMark is respected and full state is not reached, hence pfc is not triggered in error


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d8cf54b0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d8cf54b0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d8cf54b0

Branch: refs/heads/master
Commit: d8cf54b0a9eee4b86db1ffef2cb3dd1171067307
Parents: 473b328
Author: gtully <ga...@gmail.com>
Authored: Tue Mar 1 11:41:37 2016 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Mar 1 11:44:41 2016 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |   4 +-
 .../region/cursors/AbstractStoreCursor.java     |   4 +-
 .../region/cursors/QueueStorePrefetch.java      |  11 +-
 .../broker/region/cursors/StoreQueueCursor.java |   2 +-
 .../region/cursors/TopicStorePrefetch.java      |  15 +-
 .../store/jdbc/JDBCMessageRecoveryListener.java |   1 +
 .../activemq/store/jdbc/JDBCMessageStore.java   |  37 ++--
 .../store/jdbc/JDBCTopicMessageStore.java       |   8 +
 .../store/jdbc/adapter/DefaultJDBCAdapter.java  |  13 +-
 .../activemq/store/kahadb/KahaDBStore.java      |   2 +-
 .../org/apache/activemq/leveldb/DBManager.scala |   2 +-
 .../StoreQueueCursorNoDuplicateTest.java        |  14 +-
 .../cursors/StoreQueueCursorOrderTest.java      |  10 +-
 .../org/apache/activemq/bugs/AMQ4930Test.java   |   2 +-
 .../activemq/usecases/MemoryLimitPfcTest.java   | 213 +++++++++++++++++++
 .../activemq/usecases/MemoryLimitTest.java      |  13 +-
 .../activemq/usecases/QueueBrowsingTest.java    |   4 +-
 17 files changed, 280 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 34817a0..96a22ec 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -636,8 +636,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             if (isProducerFlowControl() && context.isProducerFlowControl()) {
                 if (warnOnProducerFlowControl) {
                     warnOnProducerFlowControl = false;
-                    LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
-                                    memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
+                    LOG.info("Usage Manager Memory Limit ({}) reached (%{}) on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
+                                    memoryUsage.getLimit(), memoryUsage.getPercentUsage(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
                 }
 
                 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 06bae97..d84379d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -48,8 +48,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     private static int SYNC_ADD = 0;
     private static int ASYNC_ADD = 1;
     final MessageId[] lastCachedIds = new MessageId[2];
-    protected boolean hadSpace = false;
-
 
 
     protected AbstractStoreCursor(Destination destination) {
@@ -401,7 +399,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             resetBatch();
             this.batchResetNeeded = false;
         }
-        if (this.batchList.isEmpty() && this.size >0) {
+        if (this.batchList.isEmpty() && this.size >0 && hasSpace()) {
             try {
                 doFillBatch();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index b10b2e2..dacae78 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -38,16 +38,14 @@ import org.slf4j.LoggerFactory;
 class QueueStorePrefetch extends AbstractStoreCursor {
     private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
     private final MessageStore store;
-    private final Broker broker;
 
     /**
      * Construct it
      * @param queue
      */
-    public QueueStorePrefetch(Queue queue, Broker broker) {
+    public QueueStorePrefetch(Queue queue) {
         super(queue);
         this.store = queue.getMessageStore();
-        this.broker = broker;
 
     }
 
@@ -115,11 +113,8 @@ class QueueStorePrefetch extends AbstractStoreCursor {
 
     @Override
     protected void doFillBatch() throws Exception {
-        hadSpace = this.hasSpace();
-        if (!broker.getBrokerService().isPersistent() || hadSpace) {
-            this.store.recoverNextMessages(this.maxBatchSize, this);
-            dealWithDuplicates(); // without the index lock
-        }
+        this.store.recoverNextMessages(this.maxBatchSize, this);
+        dealWithDuplicates(); // without the index lock
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
index 7f26b43..e6de82e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
@@ -47,7 +47,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         super((queue != null ? queue.isPrioritizedMessages():false));
         this.broker=broker;
         this.queue = queue;
-        this.persistent = new QueueStorePrefetch(queue, broker);
+        this.persistent = new QueueStorePrefetch(queue);
         currentCursor = persistent;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index 35ec3ed..1a6a851 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -40,7 +40,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
     private final String subscriberName;
     private final Subscription subscription;
     private byte lastRecoveredPriority = 9;
-    private boolean storeHasMessages = false;
 
     /**
      * @param topic
@@ -56,7 +55,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         this.maxProducersToAudit=32;
         this.maxAuditDepth=10000;
         resetSize();
-        this.storeHasMessages=this.size > 0;
     }
 
     @Override
@@ -73,11 +71,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         //this.messageSize.addSize(node.getMessage().getSize());
     }
 
-    @Override
-    public final synchronized boolean addMessageLast(MessageReference node) throws Exception {
-        this.storeHasMessages = super.addMessageLast(node);
-        return this.storeHasMessages;
-    }
 
     @Override
     public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
@@ -90,7 +83,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
             if (recovered && !cached) {
                 lastRecoveredPriority = message.getPriority();
             }
-            storeHasMessages = true;
         }
         return recovered;
     }
@@ -134,13 +126,8 @@ class TopicStorePrefetch extends AbstractStoreCursor {
 
     @Override
     protected void doFillBatch() throws Exception {
-        // avoid repeated  trips to the store if there is nothing of interest
-        this.storeHasMessages = false;
         this.store.recoverNextMessages(clientId, subscriberName,
                 maxBatchSize, this);
-        if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) {
-            this.storeHasMessages = true;
-        }
     }
 
     public byte getLastRecoveredPriority() {
@@ -158,6 +145,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
 
     @Override
     public String toString() {
-        return "TopicStorePrefetch(" + clientId + "," + subscriberName + ",storeHasMessages=" + this.storeHasMessages +") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
+        return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
index 07f4816..5ade773 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
@@ -24,4 +24,5 @@ package org.apache.activemq.store.jdbc;
 public interface JDBCMessageRecoveryListener {
     boolean recoverMessage(long sequenceId, byte[] message) throws Exception;
     boolean recoverMessageReference(String reference) throws Exception;
+    boolean hasSpace();
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 175002a..27313f4 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -279,6 +279,10 @@ public class JDBCMessageStore extends AbstractMessageStore {
                 public boolean recoverMessageReference(String reference) throws Exception {
                     return listener.recoverMessageReference(new MessageId(reference));
                 }
+
+                public boolean hasSpace() {
+                    return listener.hasSpace();
+                }
             });
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@@ -337,24 +341,25 @@ public class JDBCMessageStore extends AbstractMessageStore {
             adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(),
                     maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
 
-                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
-                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
-                        msg.getMessageId().setBrokerSequenceId(sequenceId);
-                        msg.getMessageId().setFutureOrSequenceLong(sequenceId);
-                        listener.recoverMessage(msg);
-                        trackLastRecovered(sequenceId, msg.getPriority());
-                        return true;
-                }
+                        public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
+                            Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
+                            msg.getMessageId().setBrokerSequenceId(sequenceId);
+                            msg.getMessageId().setFutureOrSequenceLong(sequenceId);
+                            listener.recoverMessage(msg);
+                            trackLastRecovered(sequenceId, msg.getPriority());
+                            return true;
+                        }
 
-                public boolean recoverMessageReference(String reference) throws Exception {
-                    if (listener.hasSpace()) {
-                        listener.recoverMessageReference(new MessageId(reference));
-                        return true;
-                    }
-                    return false;
-                }
+                        public boolean recoverMessageReference(String reference) throws Exception {
+                            listener.recoverMessageReference(new MessageId(reference));
+                            return true;
+                        }
 
-            });
+                        public boolean hasSpace() {
+                            return listener.hasSpace();
+                        }
+
+                    });
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
         } finally {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index 3bff9b2..7203f92 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -129,6 +129,10 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
                     return listener.recoverMessageReference(new MessageId(reference));
                 }
 
+                public boolean hasSpace() {
+                    return listener.hasSpace();
+                }
+
             });
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@@ -238,6 +242,10 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
             return false;
         }
 
+        public boolean hasSpace() {
+            return delegate.hasSpace();
+        }
+
         @Override
         public boolean recoverMessageReference(String reference) throws Exception {
             return delegate.recoverMessageReference(new MessageId(reference));

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index facf969..6fe83c8 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -37,7 +37,6 @@ import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.store.jdbc.JDBCAdapter;
 import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
-import org.apache.activemq.store.jdbc.JDBCMessageStore;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
 import org.apache.activemq.store.jdbc.Statements;
@@ -633,13 +632,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             rs = s.executeQuery();
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {
-                while (rs.next() && count < maxReturned) {
+                while (rs.next() && count < maxReturned && listener.hasSpace()) {
                     if (listener.recoverMessageReference(rs.getString(1))) {
                         count++;
                     }
                 }
             } else {
-                while (rs.next() && count < maxReturned) {
+                while (rs.next() && count < maxReturned && listener.hasSpace()) {
                     if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
                         count++;
                     }
@@ -670,13 +669,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             rs = s.executeQuery();
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {
-                while (rs.next() && count < maxReturned) {
+                while (rs.next() && count < maxReturned && listener.hasSpace() ) {
                     if (listener.recoverMessageReference(rs.getString(1))) {
                         count++;
                     }
                 }
             } else {
-                while (rs.next() && count < maxReturned) {
+                while (rs.next() && count < maxReturned  && listener.hasSpace()) {
                     if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
                         count++;
                     }
@@ -1144,7 +1143,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             rs = s.executeQuery();
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {
-                while (rs.next() && count < maxReturned) {
+                while (rs.next() && count < maxReturned && listener.hasSpace()) {
                     if (listener.recoverMessageReference(rs.getString(1))) {
                         count++;
                     } else {
@@ -1153,7 +1152,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                     }
                 }
             } else {
-                while (rs.next() && count < maxReturned) {
+                while (rs.next() && count < maxReturned && listener.hasSpace()) {
                     if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
                         count++;
                     } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index e1c1df4..69319a0 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -585,7 +585,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                             msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
                             listener.recoverMessage(msg);
                             counter++;
-                            if (counter >= maxReturned) {
+                            if (counter >= maxReturned || listener.hasSpace() == false) {
                                 break;
                             }
                         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index b0051cc..09fd350 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -737,7 +737,7 @@ class DBManager(val parent:LevelDBStore) {
         lastmsgid = msg.getMessageId
         count += 1
       }
-      count < max
+      count < max && listener.hasSpace
     }
     if( lastmsgid==null ) {
       startPos

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
index 2406e88..7680ca9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
@@ -27,6 +27,7 @@ import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -82,10 +83,14 @@ public class StoreQueueCursorNoDuplicateTest extends TestCase {
         queueMessageStore.start();
         queueMessageStore.registerIndexListener(null);
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
         SystemUsage systemUsage = new SystemUsage();
+
+        ActiveMQTextMessage sampleMessage = getMessage(0);
+        int unitSize = sampleMessage.getSize();
+
         // ensure memory limit is reached
-        systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2));
+        systemUsage.getMemoryUsage().setLimit(unitSize * count);
         underTest.setSystemUsage(systemUsage);
         underTest.setEnableAudit(false);
         underTest.start();
@@ -110,8 +115,11 @@ public class StoreQueueCursorNoDuplicateTest extends TestCase {
             ref.decrementReferenceCount();
             underTest.remove();
             LOG.info("Received message: {} with body: {}",
-                     ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+                    ref.getMessageId(), ((ActiveMQTextMessage) ref.getMessage()).getText());
             assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+
+            // memory store keeps a message ref that needs releasing to free usage
+            queueMessageStore.removeMessage(contextNotInTx, new MessageAck(ref.getMessage(), MessageAck.STANDARD_ACK_TYPE, 1));
         }
         underTest.release();
         assertEquals(count, dequeueCount);

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
index 90b8428..92c646b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -89,7 +89,7 @@ public class StoreQueueCursorOrderTest {
         queueMessageStore.start();
         queueMessageStore.registerIndexListener(null);
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
         SystemUsage systemUsage = new SystemUsage();
         // ensure memory limit is reached
         systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
@@ -154,7 +154,7 @@ public class StoreQueueCursorOrderTest {
         queueMessageStore.start();
         queueMessageStore.registerIndexListener(null);
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
         SystemUsage systemUsage = new SystemUsage();
         // ensure memory limit is reached
         systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
@@ -222,7 +222,7 @@ public class StoreQueueCursorOrderTest {
         queueMessageStore.start();
         queueMessageStore.registerIndexListener(null);
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
         SystemUsage systemUsage = new SystemUsage();
         // ensure memory limit is reached
         systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
@@ -299,7 +299,7 @@ public class StoreQueueCursorOrderTest {
         queueMessageStore.start();
         queueMessageStore.registerIndexListener(null);
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
         SystemUsage systemUsage = new SystemUsage();
         // ensure memory limit is reached
         systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 6));
@@ -392,7 +392,7 @@ public class StoreQueueCursorOrderTest {
         queueMessageStore.start();
         queueMessageStore.registerIndexListener(null);
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
         SystemUsage systemUsage = new SystemUsage();
         // ensure memory limit is reached
         systemUsage.getMemoryUsage().setLimit(messageBytesSize * 5);

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
index e65ad91..8f6fbb2 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
@@ -46,7 +46,7 @@ public class AMQ4930Test extends TestCase {
     protected void configureBroker() throws Exception {
         broker.setDeleteAllMessagesOnStartup(true);
         broker.setAdvisorySupport(false);
-        broker.getSystemUsage().getMemoryUsage().setLimit(1*1024*1024);
+        broker.getSystemUsage().getMemoryUsage().setLimit(100*1024*1024);
 
         PolicyMap pMap = new PolicyMap();
         PolicyEntry policy = new PolicyEntry();

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java
new file mode 100644
index 0000000..5b2dc23
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.ProducerThread;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Verifies that broker memory usage stays within the asserted bounds for each parameterized persistence adapter. */
+@RunWith(value = Parameterized.class)
+public class MemoryLimitPfcTest extends TestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryLimitPfcTest.class);
+    final String payload = new String(new byte[100 * 1024]);
+    protected BrokerService broker;
+
+    @Parameterized.Parameter
+    public PersistenceAdapterChoice persistenceAdapterChoice;
+
+    @Parameterized.Parameters(name="store={0}")
+    public static Iterable<Object[]> getTestParameters() {
+        return Arrays.asList(new Object[][]{{PersistenceAdapterChoice.KahaDB}, {PersistenceAdapterChoice.LevelDB}, {PersistenceAdapterChoice.JDBC}});
+    }
+
+    /** Builds a broker with a 1MB memory limit that uses the parameterized persistence adapter. */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.getSystemUsage().getMemoryUsage().setLimit(1 * 1024 * 1024); //1MB
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(0); // when this fires it will consume 2*pageSize mem which will throw the test
+        policyMap.put(new ActiveMQQueue(">"), policyEntry);
+        broker.setDestinationPolicy(policyMap);
+
+        LOG.info("Starting broker with persistenceAdapterChoice {}", persistenceAdapterChoice);
+        setPersistenceAdapter(broker, persistenceAdapterChoice);
+
+        return broker;
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /** Creates a thread that sends {@code count} bytes messages carrying the payload to {@code queue}. */
+    private ProducerThread bytesProducer(Session sess, Queue queue, int count) {
+        ProducerThread producer = new ProducerThread(sess, queue) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                BytesMessage bytesMessage = session.createBytesMessage();
+                bytesMessage.writeBytes(payload.getBytes());
+                return bytesMessage;
+            }
+        };
+        producer.setMessageCount(count);
+        return producer;
+    }
+
+    /** Creates a thread that sends {@code count} text messages carrying the payload to {@code queue}. */
+    private ProducerThread textProducer(Session sess, Queue queue, int count) {
+        ProducerThread producer = new ProducerThread(sess, queue) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                return session.createTextMessage(payload + "::" + i);
+            }
+        };
+        producer.setMessageCount(count);
+        return producer;
+    }
+
+    /** Receives and acknowledges one message; a receive timeout fails the test instead of causing an NPE. */
+    private void receiveAndAck(MessageConsumer consumer, int i) throws Exception {
+        Message msg = consumer.receive(5000);
+        if (msg == null) {
+            dumpAllThreads("NoMessage");
+        }
+        assertNotNull("Didn't receive message " + i, msg);
+        msg.acknowledge();
+    }
+
+    /** Receives and acknowledges {@code count} messages, logging broker memory usage after each receive. */
+    private void consumeAndLogUsage(MessageConsumer consumer, int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            Message msg = consumer.receive(5000);
+            LOG.info("% mem usage: {}", broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+            assertNotNull("Didn't receive message " + i, msg);
+            msg.acknowledge();
+        }
+    }
+
+    @Test(timeout = 120000)
+    public void testStopCachingDispatchNoPfc() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
+        factory.setOptimizeAcknowledge(true);
+        Connection conn = factory.createConnection();
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = sess.createQueue("STORE");
+            ProducerThread producer = bytesProducer(sess, queue, 200);
+            producer.start();
+            producer.join();
+            Thread.sleep(1000);
+
+            // assert usage stayed around the high watermark (70%); the checks tolerate up to 80%
+            final Destination dest = broker.getDestination((ActiveMQQueue) queue);
+            LOG.info("Destination usage: {}", dest.getMemoryUsage());
+            int percentUsage = dest.getMemoryUsage().getPercentUsage();
+            assertTrue("Should be at most 80% of limit but was: " + percentUsage, percentUsage <= 80);
+            LOG.info("Broker usage: {}", broker.getSystemUsage().getMemoryUsage());
+            int brokerPercentUsage = broker.getSystemUsage().getMemoryUsage().getPercentUsage();
+            assertTrue("Broker should be at most 80% of limit but was: " + brokerPercentUsage, brokerPercentUsage <= 80);
+
+            assertFalse("cache disabled", ((org.apache.activemq.broker.region.Queue) dest).getMessages().isCacheEnabled());
+
+            // consume one message
+            MessageConsumer consumer = sess.createConsumer(queue);
+            receiveAndAck(consumer, 0);
+
+            LOG.info("Destination usage after consume one: {}", dest.getMemoryUsage());
+
+            // ensure we can send more messages
+            ProducerThread secondProducer = bytesProducer(sess, queue, 100);
+            secondProducer.start();
+            secondProducer.join();
+
+            LOG.info("Broker usage: {}", broker.getSystemUsage().getMemoryUsage());
+            assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 100);
+
+            // let's make sure we can consume all messages
+            for (int i = 1; i < 300; i++) {
+                receiveAndAck(consumer, i);
+            }
+        } finally {
+            conn.close(); // always release the client connection, even when an assertion fails
+        }
+    }
+
+    @Test(timeout = 120000)
+    public void testConsumeFromTwoAfterPageInToOne() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
+        factory.setOptimizeAcknowledge(true);
+        Connection conn = factory.createConnection();
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            ProducerThread producer = textProducer(sess, sess.createQueue("STORE.1"), 20);
+            ProducerThread producer2 = textProducer(sess, sess.createQueue("STORE.2"), 20);
+            producer.start();
+            producer2.start();
+            producer.join();
+            producer2.join();
+
+            LOG.info("before consumer1, broker % mem usage: {}", broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+            consumeAndLogUsage(sess.createConsumer(sess.createQueue("STORE.1")), 10);
+            TimeUnit.SECONDS.sleep(2);
+            LOG.info("Before consumer2, Broker % mem usage: {}", broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+            consumeAndLogUsage(sess.createConsumer(sess.createQueue("STORE.2")), 10);
+        } finally {
+            conn.close(); // always release the client connection, even when an assertion fails
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
index 760876c..d3af604 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
@@ -133,18 +133,9 @@ public class MemoryLimitTest extends TestSupport {
         Message msg = consumer.receive(5000);
         msg.acknowledge();
 
-        // this should free some space and allow us to get new batch of messages in the memory
-        // exceeding the limit
-        assertTrue("Limit is exceeded", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                LOG.info("Destination usage: " + dest.getMemoryUsage());
-                return dest.getMemoryUsage().getPercentUsage() >= 200;
-            }
-        }));
-
+        assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71);
         LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
-        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() >= 200);
+        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71);
 
         // let's make sure we can consume all messages
         for (int i = 1; i < 2000; i++) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
index 29b6e72..05540a5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
@@ -182,7 +182,7 @@ public class QueueBrowsingTest {
 
     @Test
     public void testMemoryLimit() throws Exception {
-        broker.getSystemUsage().getMemoryUsage().setLimit(16 * 1024);
+        broker.getSystemUsage().getMemoryUsage().setLimit((maxPageSize + 10) * 4 * 1024);
 
         int messageToSend = 370;
 
@@ -211,6 +211,6 @@ public class QueueBrowsingTest {
         }
 
         browser.close();
-        assertTrue("got at least maxPageSize", received >= maxPageSize);
+        assertTrue("got at least maxPageSize, received: " + received, received >= maxPageSize);
     }
 }