You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/05/15 16:13:48 UTC

[1/2] qpid-broker-j git commit: Revert "QPID-7770 : [Java Broker] Avoid rapid oscillation of flow to disk state"

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master a5b0f7761 -> 501aa8919


Revert "QPID-7770 : [Java Broker] Avoid rapid oscillation of flow to disk state"

This reverts commit ac08557c89f29641f1b5a42332582379fb19eab6.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ba0e9c3f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ba0e9c3f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ba0e9c3f

Branch: refs/heads/master
Commit: ba0e9c3f5bd3ac8aa782f916327c37392c7f6236
Parents: a5b0f77
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri May 12 13:45:08 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon May 15 11:52:47 2017 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/server/model/Broker.java    |  10 --
 .../apache/qpid/server/model/BrokerImpl.java    |   8 --
 .../org/apache/qpid/server/model/Queue.java     |   9 +-
 .../apache/qpid/server/queue/AbstractQueue.java | 105 +++++++++++--------
 4 files changed, 60 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ba0e9c3f/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 5545f28..19e6de0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -85,13 +85,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
     @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
     long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.75 * (double) BrokerImpl.getMaxDirectMemorySize());
 
-
-    String BROKER_FLOW_TO_DISK_CESSATION_FRACTION = "broker.flowToDiskCessationFraction";
-    @SuppressWarnings("unused")
-    @ManagedContextDefault( name = BROKER_FLOW_TO_DISK_CESSATION_FRACTION,
-            description = "Fraction of the flow to disk threshold where flow to disk will cease.")
-    double DEFAULT_BROKER_FLOW_TO_DISK_CESSATION_FRACTION = 0.8;
-
     String COMPACT_MEMORY_THRESHOLD = "qpid.compact_memory_threshold";
     @SuppressWarnings("unused")
     @ManagedContextDefault(name = COMPACT_MEMORY_THRESHOLD)
@@ -375,9 +368,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
     @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start flowing incoming messages to disk.")
     long getFlowToDiskThreshold();
 
-    @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will cease flowing incoming messages to disk.")
-    long getFlowToDiskCessationThreshold();
-
     @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start considering to compact sparse buffers. Set to -1 to disable.")
     long getCompactMemoryThreshold();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ba0e9c3f/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index d7182bc..0840b66 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -150,7 +150,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     private long _compactMemoryThreshold;
     private long _compactMemoryInterval;
     private long _flowToDiskThreshold;
-    private long _flowToDiskCessationThreshold;
     private double _sparsityFraction;
     private long _lastDisposalCounter;
 
@@ -623,7 +622,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
         getEventLogger().message(BrokerMessages.MAX_MEMORY(heapMemory, directMemory));
 
         _flowToDiskThreshold = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
-        _flowToDiskCessationThreshold = (long) (getContextValue(Double.class, BROKER_FLOW_TO_DISK_CESSATION_FRACTION) * _flowToDiskThreshold);
         _compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD);
         _compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL);
 
@@ -888,12 +886,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     }
 
     @Override
-    public long getFlowToDiskCessationThreshold()
-    {
-        return _flowToDiskCessationThreshold;
-    }
-
-    @Override
     public long getNumberOfActivePooledBuffers()
     {
         return QpidByteBuffer.getNumberOfActivePooledBuffers();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ba0e9c3f/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 94d02ee..f2cbc25 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -210,11 +210,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
                           + "none is explicitly set")
     String DEFAULT_MESSAGE_DURABILTY = "DEFAULT";
 
-    String QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION = "queue.flowToDiskQueueCessationFraction";
-    @SuppressWarnings("unused")
-    @ManagedContextDefault( name = QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION,
-            description = "The fraction of this queue's 'target size' where the queue will cease flowing messages to disk")
-    double DEFAULT_QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION = 0.80;
+
 
     @ManagedAttribute( defaultValue = "${queue.defaultMessageDurability}" )
     MessageDurability getMessageDurability();
@@ -280,9 +276,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
             mandatory = true)
     OverflowPolicy getOverflowPolicy();
 
-    @DerivedAttribute(description = "Returns 'true' when the broker is flowing transient messages to disk")
-    boolean isFlowingToDisk();
-
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
     Collection<PublishingLink> getPublishingLinks();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ba0e9c3f/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 9a5f9d7..6deda67 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -262,14 +262,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     private boolean _closing;
     private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
     private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
-    private final AtomicBoolean _flowingToDisk = new AtomicBoolean();
     private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
     private AdvanceConsumersTask _queueHouseKeepingTask;
     private volatile int _bindingCount;
     private volatile OverflowPolicyHandler _overflowPolicyHandler;
-    private volatile long _brokerFlowToDiskThreshold;
-    private volatile long _brokerFlowToDiskLowerThreshold;
-    private volatile double _queueFlowToDiskCessationFraction;
+    private long _flowToDiskThreshold;
 
     private interface HoldMethod
     {
@@ -474,9 +471,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         }
 
         _mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
-        _queueFlowToDiskCessationFraction = getContextValue(Double.class, QUEUE_FLOW_TO_DISK_QUEUE_CESSATION_FRACTION);
-        _brokerFlowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
-        _brokerFlowToDiskLowerThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
+        _flowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
 
         if(_defaultFilters != null)
         {
@@ -1106,8 +1101,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             doEnqueue(message, action, enqueueRecord);
         }
 
-        _flowToDiskChecker.flowToDiskIfNecessary(message.getStoredMessage(),
-                                                 _queueStatistics.getQueueSizeIncludingHeader());
+        long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
+        _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
+                                                          _targetQueueSize.get());
     }
 
     public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
@@ -1774,7 +1770,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     public void checkCapacity()
     {
         _overflowPolicyHandler.checkOverflow();
-        _flowToDiskChecker.ceaseFlowToDiskIfNecessary();
     }
 
     void notifyConsumers(QueueEntry entry)
@@ -2037,7 +2032,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     {
         QueueEntryIterator queueListIterator = getEntries().iterator();
 
-        _flowToDiskChecker.ceaseFlowToDiskIfNecessary();
+        final long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
+        _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
 
         final Set<NotificationCheck> perMessageChecks = new HashSet<>();
         final Set<NotificationCheck> queueLevelChecks = new HashSet<>();
@@ -2084,9 +2080,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                         {
                             messageReference = msg.newReference();
                             cumulativeQueueSize += msg.getSizeIncludingHeader();
-                            _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize);
+                            _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize,
+                                                                     _targetQueueSize.get());
 
-                            for (NotificationCheck check : perMessageChecks)
+                            for(NotificationCheck check : perMessageChecks)
                             {
                                 checkForNotification(msg, listener, currentTime, thresholdTime, check);
                             }
@@ -2922,12 +2919,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     }
 
     @Override
-    public boolean isFlowingToDisk()
-    {
-        return _flowingToDisk.get();
-    }
-
-    @Override
     public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
     {
         if(clazz == org.apache.qpid.server.model.Consumer.class)
@@ -3340,43 +3331,65 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
     private class FlowToDiskChecker
     {
+        final AtomicBoolean _lastReportedFlowToDiskStatus = new AtomicBoolean(false);
 
-        void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long currentQueueSize)
+        void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize, final long targetQueueSize)
         {
-            final long targetQueueSize = _targetQueueSize.get();
-            final int allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
-            if ((currentQueueSize > targetQueueSize || allocatedDirectMemorySize > _brokerFlowToDiskThreshold) && storedMessage.isInMemory())
+            if ((estimatedQueueSize > targetQueueSize
+                 || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
+                && storedMessage.isInMemory())
             {
-                if (_flowingToDisk.compareAndSet(false, true))
-                {
-
-                    getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(getQueueDepthBytesIncludingHeader() / 1024.0,
-                                                                                            targetQueueSize / 1024.0,
-                                                                                            allocatedDirectMemorySize / 1024.0 / 1024.0,
-                                                                                            _brokerFlowToDiskThreshold
-                                                                                            / 1024.0 / 1024.0));
-                }
                 storedMessage.flowToDisk();
             }
         }
 
-        void ceaseFlowToDiskIfNecessary()
+        void flowToDiskAndReportIfNecessary(StoredMessage<?> storedMessage,
+                                            final long estimatedQueueSize,
+                                            final long targetQueueSize)
         {
-            final long currentQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
-            final long targetQueueSize = _targetQueueSize.get();
-            final long allocatedDirectMemorySize = (long) QpidByteBuffer.getAllocatedDirectMemorySize();
+            flowToDiskIfNecessary(storedMessage, estimatedQueueSize, targetQueueSize);
+            reportFlowToDiskStatusIfNecessary(estimatedQueueSize, targetQueueSize);
+        }
 
-            if (currentQueueSize <= (targetQueueSize * _queueFlowToDiskCessationFraction)
-                && allocatedDirectMemorySize <= _brokerFlowToDiskLowerThreshold)
+        void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize)
+        {
+            final int allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
+            if (estimatedQueueSize > targetQueueSize
+                || allocatedDirectMemorySize > _flowToDiskThreshold)
             {
-                if (_flowingToDisk.compareAndSet(true, false))
-                {
-                    getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(currentQueueSize / 1024.0,
-                                                                                              targetQueueSize / 1024.0,
-                                                                                              allocatedDirectMemorySize / 1024.0 / 1024.0,
-                                                                                              _brokerFlowToDiskThreshold
-                                                                                              / 1024.0 / 1024.0));
-                }
+                reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
+            }
+            else
+            {
+                reportFlowToDiskInactiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
+            }
+        }
+
+        private void reportFlowToDiskActiveIfNecessary(long estimatedQueueSize,
+                                                       long targetQueueSize,
+                                                       long allocatedDirectMemorySize,
+                                                       long flowToDiskThreshold)
+        {
+            if (!_lastReportedFlowToDiskStatus.getAndSet(true))
+            {
+                getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(estimatedQueueSize / 1024.0,
+                                                                                        targetQueueSize / 1024.0,
+                                                                                        allocatedDirectMemorySize / 1024.0 / 1024.0,
+                                                                                        flowToDiskThreshold / 1024.0 / 1024.0));
+            }
+        }
+
+        private void reportFlowToDiskInactiveIfNecessary(long estimatedQueueSize,
+                                                         long targetQueueSize,
+                                                         long allocatedDirectMemorySize,
+                                                         long flowToDiskThreshold)
+        {
+            if (_lastReportedFlowToDiskStatus.getAndSet(false))
+            {
+                getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(estimatedQueueSize / 1024.0,
+                                                                                          targetQueueSize / 1024.0,
+                                                                                          allocatedDirectMemorySize / 1024.0 / 1024.0,
+                                                                                          flowToDiskThreshold / 1024.0 / 1024.0));
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-broker-j git commit: QPID-7775: [Java Broker] Trigger flow to disk based on actual memory occupancy.

Posted by lq...@apache.org.
QPID-7775: [Java Broker] Trigger flow to disk based on actual memory occupancy.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/501aa891
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/501aa891
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/501aa891

Branch: refs/heads/master
Commit: 501aa8919c8cd4938b6fa26eef9e6746e267175b
Parents: ba0e9c3
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri May 12 16:21:14 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon May 15 17:13:27 2017 +0100

----------------------------------------------------------------------
 .../berkeleydb/AbstractBDBMessageStore.java     |  41 +++++-
 .../store/berkeleydb/BDBConfigurationStore.java |   1 +
 .../store/berkeleydb/BDBMessageStore.java       |   1 +
 .../server/logging/messages/QueueMessages.java  | 121 -----------------
 .../messages/Queue_logmessages.properties       |   6 +-
 .../message/AbstractServerMessageImpl.java      |   9 +-
 .../qpid/server/message/MessageReference.java   |   7 +-
 .../org/apache/qpid/server/model/Broker.java    |   6 +
 .../apache/qpid/server/model/BrokerImpl.java    |  18 +++
 .../org/apache/qpid/server/model/Queue.java     |   5 +-
 .../apache/qpid/server/queue/AbstractQueue.java | 113 ++--------------
 .../qpid/server/store/MemoryMessageStore.java   |  26 +++-
 .../apache/qpid/server/store/MessageStore.java  |   4 +
 .../qpid/server/store/NullMessageStore.java     |  12 ++
 .../server/virtualhost/AbstractVirtualHost.java | 134 ++++++++++++-------
 .../virtualhost/QueueManagingVirtualHost.java   |   8 ++
 .../server/store/TestMessageMetaDataType.java   |   6 +
 .../store/jdbc/AbstractJDBCMessageStore.java    |  34 ++++-
 18 files changed, 264 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index ce25a8f..6f1326e 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.sleepycat.bind.tuple.LongBinding;
@@ -100,6 +101,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     private boolean _limitBusted;
     private long _totalStoreSize;
     private final Random _lockConflictRandom = new Random();
+    private final AtomicLong _inMemorySize = new AtomicLong();
+    private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
 
     @Override
     public void upgradeStoreStructure() throws StoreException
@@ -147,6 +150,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     }
 
     @Override
+    public long getInMemorySize()
+    {
+        return _inMemorySize.get();
+    }
+
+    @Override
+    public long getBytesEvacuatedFromMemory()
+    {
+        return _bytesEvacuatedFromMemory.get();
+    }
+
+    @Override
     public boolean isPersistent()
     {
         return true;
@@ -167,6 +182,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     }
 
     @Override
+    public void closeMessageStore()
+    {
+        _inMemorySize.set(0);
+        _bytesEvacuatedFromMemory.set(0);
+    }
+
+    @Override
     public MessageStoreReader newMessageStoreReader()
     {
         return new BDBMessageStoreReader();
@@ -979,10 +1001,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             _data = data;
         }
 
-        public void clear()
+        public long clear()
         {
+            long bytesCleared = 0;
             if(_metaData != null)
             {
+                bytesCleared += _metaData.getStorableSize();
                 _metaData.clearEncodedForm();
                 _metaData = null;
             }
@@ -990,10 +1014,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             {
                 for(QpidByteBuffer buf : _data)
                 {
+                    bytesCleared += buf.remaining();
                     buf.dispose();
                 }
+                _data = null;
             }
-            _data = null;
+            return bytesCleared;
         }
 
         @Override
@@ -1037,7 +1063,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             {
                 _messageDataRef = new MessageDataSoftRef<>(metaData, null);
             }
+
             _contentSize = metaData.getContentSize();
+            _inMemorySize.addAndGet(metaData.getStorableSize());
         }
 
         @Override
@@ -1089,6 +1117,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         @Override
         public StoredMessage<T> allContentAdded()
         {
+            _inMemorySize.addAndGet(getContentSize());
             return this;
         }
 
@@ -1225,14 +1254,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             }
 
             final T metaData;
+            long bytesCleared = 0;
             if ((metaData =_messageDataRef.getMetaData()) != null)
             {
+                bytesCleared += metaData.getStorableSize();
                 metaData.dispose();
             }
 
             Collection<QpidByteBuffer> data = _messageDataRef.getData();
             if(data != null)
             {
+                bytesCleared += getContentSize();
                 _messageDataRef.setData(null);
                 for(QpidByteBuffer buf : data)
                 {
@@ -1240,6 +1272,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
                 }
             }
             _messageDataRef = null;
+            _inMemorySize.addAndGet(-bytesCleared);
         }
 
         @Override
@@ -1260,7 +1293,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                ((MessageDataSoftRef)_messageDataRef).clear();
+                final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+                _inMemorySize.addAndGet(-bytesCleared);
+                _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
             return true;
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
index 2e854f7..ca7b4dd 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
@@ -592,6 +592,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
         @Override
         public void closeMessageStore()
         {
+            super.closeMessageStore();
             _messageStoreOpen.set(false);
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index a5ebc68..83df5aa 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -83,6 +83,7 @@ public class BDBMessageStore extends AbstractBDBMessageStore
     @Override
     public void closeMessageStore()
     {
+        super.closeMessageStore();
         if (_messageStoreOpen.compareAndSet(true, false))
         {
             if (_environmentFacade != null)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
index e73e173..5df5b21 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
@@ -67,10 +67,8 @@ public class QueueMessages
     public static final String DROPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.dropped";
     public static final String OVERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.overfull";
     public static final String OPERATION_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.operation";
-    public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_active";
     public static final String UNDERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
     public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
-    public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_inactive";
 
     static
     {
@@ -79,10 +77,8 @@ public class QueueMessages
         LoggerFactory.getLogger(DROPPED_LOG_HIERARCHY);
         LoggerFactory.getLogger(OVERFULL_LOG_HIERARCHY);
         LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
-        LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
         LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
         LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
-        LoggerFactory.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY);
 
         _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Queue_logmessages", _currentLocale);
     }
@@ -384,64 +380,6 @@ public class QueueMessages
 
     /**
      * Log a Queue message of the Format:
-     * <pre>QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB</pre>
-     * Optional values are contained in [square brackets] and are numbered
-     * sequentially in the method call.
-     *
-     */
-    public static LogMessage FLOW_TO_DISK_ACTIVE(Number param1, Number param2, Number param3, Number param4)
-    {
-        String rawMessage = _messages.getString("FLOW_TO_DISK_ACTIVE");
-
-        final Object[] messageArguments = {param1, param2, param3, param4};
-        // Create a new MessageFormat to ensure thread safety.
-        // Sharing a MessageFormat and using applyPattern is not thread safe
-        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
-        final String message = formatter.format(messageArguments);
-
-        return new LogMessage()
-        {
-            public String toString()
-            {
-                return message;
-            }
-
-            public String getLogHierarchy()
-            {
-                return FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY;
-            }
-
-            @Override
-            public boolean equals(final Object o)
-            {
-                if (this == o)
-                {
-                    return true;
-                }
-                if (o == null || getClass() != o.getClass())
-                {
-                    return false;
-                }
-
-                final LogMessage that = (LogMessage) o;
-
-                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
-            }
-
-            @Override
-            public int hashCode()
-            {
-                int result = toString().hashCode();
-                result = 31 * result + getLogHierarchy().hashCode();
-                return result;
-            }
-        };
-    }
-
-    /**
-     * Log a Queue message of the Format:
      * <pre>QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
@@ -556,65 +494,6 @@ public class QueueMessages
         };
     }
 
-    /**
-     * Log a Queue message of the Format:
-     * <pre>QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB</pre>
-     * Optional values are contained in [square brackets] and are numbered
-     * sequentially in the method call.
-     *
-     */
-    public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2, Number param3, Number param4)
-    {
-        String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE");
-
-        final Object[] messageArguments = {param1, param2, param3, param4};
-        // Create a new MessageFormat to ensure thread safety.
-        // Sharing a MessageFormat and using applyPattern is not thread safe
-        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
-        final String message = formatter.format(messageArguments);
-
-        return new LogMessage()
-        {
-            public String toString()
-            {
-                return message;
-            }
-
-            public String getLogHierarchy()
-            {
-                return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY;
-            }
-
-            @Override
-            public boolean equals(final Object o)
-            {
-                if (this == o)
-                {
-                    return true;
-                }
-                if (o == null || getClass() != o.getClass())
-                {
-                    return false;
-                }
-
-                final LogMessage that = (LogMessage) o;
-
-                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
-            }
-
-            @Override
-            public int hashCode()
-            {
-                int result = toString().hashCode();
-                result = 31 * result + getLogHierarchy().hashCode();
-                return result;
-            }
-        };
-    }
-
-
     private QueueMessages()
     {
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
index 9061f0d..cd4ecdb 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
@@ -27,9 +27,9 @@ OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number},
 UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
 DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages
 
-# use similar number to the broker for similar topic
-FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
-FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
+# QUE-1014 (FLOW_TO_DISK_ACTIVE) and QUE-1015 (FLOW_TO_DISK_INACTIVE) are no longer in use
+#FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
+#FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
 
 # 0 - operation name
 OPERATION = QUE-1016 : Operation : {0}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index b60624a..978abc5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -213,11 +213,11 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
         private final UUID _resourceId;
         private volatile int _released;
 
-        private Reference(final AbstractServerMessageImpl<X, T> message)
+        private Reference(final AbstractServerMessageImpl<X, T> message) throws MessageDeletedException
         {
             this(message, null);
         }
-        private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource)
+        private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource) throws MessageDeletedException
         {
             _message = message;
             if(resource != null)
@@ -299,6 +299,11 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
             }
         }
 
+        @Override
+        public void close()
+        {
+            release();
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
index eda8550..dfe5d64 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
@@ -20,8 +20,9 @@
  */
 package org.apache.qpid.server.message;
 
-public interface MessageReference<M extends ServerMessage>
+public interface MessageReference<M extends ServerMessage> extends AutoCloseable
 {
-    public M getMessage();
-    public void release();
+    M getMessage();
+    void release();
+    void close();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 19e6de0..044f035 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -163,6 +163,9 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
     @ManagedContextDefault( name = "broker.housekeepingThreadCount")
     public static final int DEFAULT_HOUSEKEEPING_THREAD_COUNT = 2;
 
+    String QPID_BROKER_HOUSEKEEPING_CHECK_PERIOD = "qpid.broker.housekeepingCheckPeriod";
+    @ManagedContextDefault(name = QPID_BROKER_HOUSEKEEPING_CHECK_PERIOD)
+    long DEFAULT_BROKER_HOUSEKEEPING_CHECK_PERIOD = 30000L;
 
     @ManagedAttribute( defaultValue = "${broker.housekeepingThreadCount}")
     int getHousekeepingThreadCount();
@@ -377,6 +380,9 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
     @DerivedAttribute(description = "Minimum fraction of direct memory buffer that can be occupied before the buffer is considered for compaction")
     double getSparsityFraction();
 
+    @DerivedAttribute()
+    long getHousekeepingCheckPeriod();
+
     @ManagedOperation(changesConfiguredObjectState = false, nonModifying = true,
             description = "Force direct memory buffer compaction.")
     void compactMemory();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 0840b66..b9242dc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
@@ -152,6 +153,8 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     private long _flowToDiskThreshold;
     private double _sparsityFraction;
     private long _lastDisposalCounter;
+    private ScheduledFuture<?> _assignTargetSizeSchedulingFuture;
+    private long _housekeepingCheckPeriod;
 
     @ManagedObjectFactoryConstructor
     public BrokerImpl(Map<String, Object> attributes,
@@ -420,6 +423,9 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
                                                              getSystemTaskSubject("Housekeeping", _principal));
 
         scheduleDirectMemoryCheck();
+        _assignTargetSizeSchedulingFuture = scheduleHouseKeepingTask(getHousekeepingCheckPeriod(),
+                                                                     TimeUnit.MILLISECONDS,
+                                                                     this::assignTargetSizes);
 
         final PreferenceStoreUpdaterImpl updater = new PreferenceStoreUpdaterImpl();
         final Collection<PreferenceRecord> preferenceRecords = _preferenceStore.openAndLoad(updater);
@@ -624,6 +630,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
         _flowToDiskThreshold = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
         _compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD);
         _compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL);
+        _housekeepingCheckPeriod = getContextValue(Long.class, Broker.QPID_BROKER_HOUSEKEEPING_CHECK_PERIOD);
 
         if (SystemUtils.getProcessPid() != null)
         {
@@ -732,6 +739,11 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
             _reportingTimer.cancel();
         }
 
+        if (_assignTargetSizeSchedulingFuture != null)
+        {
+            _assignTargetSizeSchedulingFuture.cancel(true);
+        }
+
         shutdownHouseKeeping();
 
         stopPreferenceTaskExecutor();
@@ -1222,6 +1234,12 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     }
 
     @Override
+    public long getHousekeepingCheckPeriod()
+    {
+        return _housekeepingCheckPeriod;
+    }
+
+    @Override
     public void compactMemory()
     {
         compactMemoryInternal();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index f2cbc25..7e154ed 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.NotificationCheck;
 import org.apache.qpid.server.queue.QueueConsumer;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryIterator;
 import org.apache.qpid.server.queue.QueueEntryVisitor;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -471,8 +472,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
 
     void recover(ServerMessage<?> message, MessageEnqueueRecord enqueueRecord);
 
-    void setTargetSize(long targetSize);
-
     boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
 
     void checkCapacity();
@@ -480,4 +479,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
     void deleteEntry(QueueEntry entry);
 
     QueueEntry getLeastSignificantOldestEntry();
+
+    QueueEntryIterator queueEntryIterator();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6deda67..31b77e7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -142,7 +142,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         }
     };
 
-    private static final long INITIAL_TARGET_QUEUE_SIZE = 102400l;
     private static final String UTF8 = StandardCharsets.UTF_8.name();
     private static final Operation PUBLISH_ACTION = Operation.ACTION("publish");
 
@@ -156,8 +155,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
     private volatile QueueConsumer<?,?> _exclusiveSubscriber;
 
-    private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
-
     private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
 
     private final QueueStatistics _queueStatistics = new QueueStatistics();
@@ -210,7 +207,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     @ManagedAttributeField
     private boolean _noLocal;
 
-    private final FlowToDiskChecker _flowToDiskChecker = new FlowToDiskChecker();
     private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<>();
     private Map<String, Object> _arguments;
 
@@ -1101,9 +1097,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             doEnqueue(message, action, enqueueRecord);
         }
 
-        long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
-        _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
-                                                          _targetQueueSize.get());
+        final StoredMessage storedMessage = message.getStoredMessage();
+        if ((_virtualHost.isOverTargetSize()
+             || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
+            && storedMessage.isInMemory())
+        {
+            storedMessage.flowToDisk();
+        }
     }
 
     public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
@@ -1229,15 +1229,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         // Simple Queues don't :-)
     }
 
-    @Override
-    public void setTargetSize(final long targetSize)
-    {
-        if (_targetQueueSize.compareAndSet(_targetQueueSize.get(), targetSize))
-        {
-            _logger.debug("Queue '{}' target size : {}", getName(), targetSize);
-        }
-    }
-
     public long getTotalDequeuedMessages()
     {
         return _queueStatistics.getDequeueCount();
@@ -1460,6 +1451,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
     }
 
+    @Override
+    public QueueEntryIterator queueEntryIterator()
+    {
+        return getEntries().iterator();
+    }
+
     public int compareTo(final X o)
     {
         return getName().compareTo(o.getName());
@@ -2032,9 +2029,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     {
         QueueEntryIterator queueListIterator = getEntries().iterator();
 
-        final long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader();
-        _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
-
         final Set<NotificationCheck> perMessageChecks = new HashSet<>();
         final Set<NotificationCheck> queueLevelChecks = new HashSet<>();
 
@@ -2053,7 +2047,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         final long currentTime = System.currentTimeMillis();
         final long thresholdTime = currentTime - getAlertRepeatGap();
 
-        long cumulativeQueueSize = 0;
         while (!_stopped.get() && queueListIterator.advance())
         {
             final QueueEntry node = queueListIterator.getNode();
@@ -2075,14 +2068,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                     ServerMessage msg = node.getMessage();
                     if (msg != null)
                     {
-                        MessageReference messageReference = null;
-                        try
+                        try (MessageReference messageReference = msg.newReference())
                         {
-                            messageReference = msg.newReference();
-                            cumulativeQueueSize += msg.getSizeIncludingHeader();
-                            _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize,
-                                                                     _targetQueueSize.get());
-
                             for(NotificationCheck check : perMessageChecks)
                             {
                                 checkForNotification(msg, listener, currentTime, thresholdTime, check);
@@ -2092,13 +2079,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                         {
                             // Ignore
                         }
-                        finally
-                        {
-                            if (messageReference != null)
-                            {
-                                messageReference.release();
-                            }
-                        }
                     }
                 }
             }
@@ -3329,71 +3309,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         }
     }
 
-    private class FlowToDiskChecker
-    {
-        final AtomicBoolean _lastReportedFlowToDiskStatus = new AtomicBoolean(false);
-
-        void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize, final long targetQueueSize)
-        {
-            if ((estimatedQueueSize > targetQueueSize
-                 || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
-                && storedMessage.isInMemory())
-            {
-                storedMessage.flowToDisk();
-            }
-        }
-
-        void flowToDiskAndReportIfNecessary(StoredMessage<?> storedMessage,
-                                            final long estimatedQueueSize,
-                                            final long targetQueueSize)
-        {
-            flowToDiskIfNecessary(storedMessage, estimatedQueueSize, targetQueueSize);
-            reportFlowToDiskStatusIfNecessary(estimatedQueueSize, targetQueueSize);
-        }
-
-        void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize)
-        {
-            final int allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
-            if (estimatedQueueSize > targetQueueSize
-                || allocatedDirectMemorySize > _flowToDiskThreshold)
-            {
-                reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
-            }
-            else
-            {
-                reportFlowToDiskInactiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
-            }
-        }
-
-        private void reportFlowToDiskActiveIfNecessary(long estimatedQueueSize,
-                                                       long targetQueueSize,
-                                                       long allocatedDirectMemorySize,
-                                                       long flowToDiskThreshold)
-        {
-            if (!_lastReportedFlowToDiskStatus.getAndSet(true))
-            {
-                getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(estimatedQueueSize / 1024.0,
-                                                                                        targetQueueSize / 1024.0,
-                                                                                        allocatedDirectMemorySize / 1024.0 / 1024.0,
-                                                                                        flowToDiskThreshold / 1024.0 / 1024.0));
-            }
-        }
-
-        private void reportFlowToDiskInactiveIfNecessary(long estimatedQueueSize,
-                                                         long targetQueueSize,
-                                                         long allocatedDirectMemorySize,
-                                                         long flowToDiskThreshold)
-        {
-            if (_lastReportedFlowToDiskStatus.getAndSet(false))
-            {
-                getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(estimatedQueueSize / 1024.0,
-                                                                                          targetQueueSize / 1024.0,
-                                                                                          allocatedDirectMemorySize / 1024.0 / 1024.0,
-                                                                                          flowToDiskThreshold / 1024.0 / 1024.0));
-            }
-        }
-    }
-
     private class AdvanceConsumersTask extends HouseKeepingTask
     {
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index b2fa18a..22020c0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -52,6 +52,7 @@ public class MemoryMessageStore implements MessageStore
     private final Object _transactionLock = new Object();
     private final Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
     private final Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
+    private final AtomicLong _inMemorySize = new AtomicLong();
 
 
     private final class MemoryMessageStoreTransaction implements Transaction
@@ -289,13 +290,23 @@ public class MemoryMessageStore implements MessageStore
         {
 
             @Override
+            public synchronized StoredMessage<T> allContentAdded()
+            {
+                final StoredMessage<T> storedMessage = super.allContentAdded();
+                _inMemorySize.addAndGet(getContentSize());
+                return storedMessage;
+            }
+
+            @Override
             public void remove()
             {
                 _messages.remove(getMessageNumber());
+                int bytesCleared = metaData.getStorableSize() + metaData.getContentSize();
                 super.remove();
+                _inMemorySize.addAndGet(-bytesCleared);
             }
-
         };
+        _inMemorySize.addAndGet(metaData.getStorableSize());
 
         return storedMemoryMessage;
 
@@ -314,6 +325,18 @@ public class MemoryMessageStore implements MessageStore
     }
 
     @Override
+    public long getInMemorySize()
+    {
+        return _inMemorySize.get();
+    }
+
+    @Override
+    public long getBytesEvacuatedFromMemory()
+    {
+        return 0L;
+    }
+
+    @Override
     public Transaction newTransaction()
     {
         return new MemoryMessageStoreTransaction();
@@ -323,6 +346,7 @@ public class MemoryMessageStore implements MessageStore
     public void closeMessageStore()
     {
         _messages.clear();
+        _inMemorySize.set(0);
         synchronized (_transactionLock)
         {
             _messageInstances.clear();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index ab5764e..b50dbb1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -61,6 +61,10 @@ public interface MessageStore
 
     <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData);
 
+    long getInMemorySize();
+
+    long getBytesEvacuatedFromMemory();
+
     /**
      * Is this store capable of persisting the data
      *

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index e543084..37c79e5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -97,6 +97,18 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
     }
 
     @Override
+    public long getInMemorySize()
+    {
+        return 0;
+    }
+
+    @Override
+    public long getBytesEvacuatedFromMemory()
+    {
+        return 0L;
+    }
+
+    @Override
     public Transaction newTransaction()
     {
         return null;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 7c3ac37..67bee82 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import static com.google.common.collect.Iterators.cycle;
 import static java.util.Collections.newSetFromMap;
 
 import java.io.BufferedInputStream;
@@ -88,8 +89,10 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDeletedException;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageNode;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
@@ -105,6 +108,7 @@ import org.apache.qpid.server.plugin.SystemNodeCreator;
 import org.apache.qpid.server.pool.SuppressingInheritedAccessControlContextThreadFactory;
 import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryIterator;
 import org.apache.qpid.server.security.AccessControl;
 import org.apache.qpid.server.security.CompoundAccessControl;
 import org.apache.qpid.server.security.Result;
@@ -308,8 +312,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         _fileSystemSpaceCheckerJobContext = getSystemTaskControllerContext("FileSystemSpaceChecker["+getName()+"]", _principal);
 
         _fileSystemSpaceChecker = new FileSystemSpaceChecker();
-
-        addChangeListener(new TargetSizeAssigningListener());
     }
 
     private void updateAccessControl()
@@ -1165,6 +1167,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         if (period > 0L)
         {
             scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask());
+            scheduleHouseKeepingTask(period, new FlowToDiskCheckingTask());
         }
     }
 
@@ -1444,6 +1447,18 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     }
 
     @Override
+    public long getInMemoryMessageSize()
+    {
+        return _messageStore == null ? -1 : _messageStore.getInMemorySize();
+    }
+
+    @Override
+    public long getBytesEvacuatedFromMemory()
+    {
+        return _messageStore == null ? -1 : _messageStore.getBytesEvacuatedFromMemory();
+    }
+
+    @Override
     public <T extends ConfiguredObject<?>> T getAttainedChildFromAddress(final Class<T> childClass,
                                                                          final String address)
     {
@@ -1738,6 +1753,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         return null;
     }
 
+    @Override
+    public boolean isOverTargetSize()
+    {
+        return getInMemoryMessageSize() > _targetSize.get();
+    }
+
     private static class MessageHeaderImpl implements AMQMessageHeader
     {
         private final String _userName;
@@ -1856,46 +1877,84 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         }
     }
 
-    private class TargetSizeAssigningListener extends AbstractConfigurationChangeListener
+    private class VirtualHostHouseKeepingTask extends HouseKeepingTask
     {
-        @Override
-        public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
+        public VirtualHostHouseKeepingTask()
         {
-            if (child instanceof Queue)
-            {
-                allocateTargetSizeToQueues();
-            }
+            super("Housekeeping["+AbstractVirtualHost.this.getName()+"]",AbstractVirtualHost.this,_housekeepingJobContext);
         }
 
-        @Override
-        public void childRemoved(final ConfiguredObject<?> object,
-                                 final ConfiguredObject<?> child)
+        public void execute()
         {
-            if (child instanceof Queue)
+            for (Queue<?> q : getChildren(Queue.class))
             {
-                allocateTargetSizeToQueues();
+                if (q.getState() == State.ACTIVE)
+                {
+                    _logger.debug("Checking message status for queue: {}", q.getName());
+                    q.checkMessageStatus();
+                }
             }
         }
     }
 
-    private class VirtualHostHouseKeepingTask extends HouseKeepingTask
+    private class FlowToDiskCheckingTask extends HouseKeepingTask
     {
-        public VirtualHostHouseKeepingTask()
+        public FlowToDiskCheckingTask()
         {
-            super("Housekeeping["+AbstractVirtualHost.this.getName()+"]",AbstractVirtualHost.this,_housekeepingJobContext);
+            super("FlowToDiskChecking["+AbstractVirtualHost.this.getName()+"]", AbstractVirtualHost.this, _housekeepingJobContext);
         }
 
+        @Override
         public void execute()
         {
-            Broker<?> broker = getAncestor(Broker.class);
-            broker.assignTargetSizes();
-
-            for (Queue<?> q : getChildren(Queue.class))
+            if (isOverTargetSize())
             {
-                if (q.getState() == State.ACTIVE)
+                long currentTargetSize = _targetSize.get();
+                List<QueueEntryIterator> queueIterators = new ArrayList<>();
+                for (Queue<?> q : getChildren(Queue.class))
                 {
-                    _logger.debug("Checking message status for queue: {}", q.getName());
-                    q.checkMessageStatus();
+                    queueIterators.add(q.queueEntryIterator());
+                }
+                Collections.shuffle(queueIterators);
+
+                long cumulativeSize = 0;
+                final Iterator<QueueEntryIterator> cyclicIterators = cycle(queueIterators);
+                while (cyclicIterators.hasNext())
+                {
+                    final QueueEntryIterator queueIterator = cyclicIterators.next();
+                    if (queueIterator.advance())
+                    {
+                        QueueEntry node = queueIterator.getNode();
+                        if (node != null && !node.isDeleted())
+                        {
+                            try (MessageReference messageReference = node.getMessage().newReference())
+                            {
+                                final StoredMessage storedMessage = messageReference.getMessage().getStoredMessage();
+                                if (storedMessage.isInMemory())
+                                {
+                                    if (cumulativeSize <= currentTargetSize)
+                                    {
+                                        cumulativeSize += storedMessage.getContentSize();
+                                        cumulativeSize += storedMessage.getMetaData() == null
+                                                ? 0
+                                                : storedMessage.getMetaData().getStorableSize();
+                                    }
+                                    else
+                                    {
+                                        storedMessage.flowToDisk();
+                                    }
+                                }
+                            }
+                            catch (MessageDeletedException e)
+                            {
+                                // pass
+                            }
+                        }
+                    }
+                    else
+                    {
+                        cyclicIterators.remove();
+                    }
                 }
             }
         }
@@ -2396,7 +2455,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     public void setTargetSize(final long targetSize)
     {
         _targetSize.set(targetSize);
-        allocateTargetSizeToQueues();
     }
 
     public long getTargetSize()
@@ -2404,38 +2462,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         return _targetSize.get();
     }
 
-    private void allocateTargetSizeToQueues()
-    {
-        long targetSize = _targetSize.get();
-        Collection<Queue> queues = getChildren(Queue.class);
-        long totalSize = calculateTotalEnqueuedSize(queues);
-        _logger.debug("Allocating target size to queues, total target: {} ; total enqueued size {}", targetSize, totalSize);
-        if (targetSize > 0l)
-        {
-            for (Queue<?> q : queues)
-            {
-                long size;
-                if (totalSize == 0)
-                {
-                    size = targetSize / queues.size();
-                }
-                else
-                {
-                    size = (long) ((q.getQueueDepthBytesIncludingHeader() / (double) totalSize) * targetSize);
-                }
-
-                q.setTargetSize(size);
-            }
-        }
-    }
-
     @Override
     public long getTotalQueueDepthBytes()
     {
         return calculateTotalEnqueuedSize(getChildren(Queue.class));
     }
 
-
     @Override
     public Principal getPrincipal()
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index a47ab37..77289d0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -206,6 +206,12 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Total Depth of Queues Including Header")
     long getTotalDepthOfQueuesBytesIncludingHeader();
 
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Total Memory Occupied by Message Headers and Content")
+    long getInMemoryMessageSize();
+
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Total Number of Bytes Evacuated from Memory Due to Flow to Disk")
+    long getBytesEvacuatedFromMemory();
+
     @Override
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
     Collection<? extends Connection<?>> getConnections();
@@ -285,6 +291,8 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
 
     ListenableFuture<Void> reallocateMessages();
 
+    boolean isOverTargetSize();
+
     interface Transaction
     {
         void dequeue(QueueEntry entry);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 3b6a509..f384a13 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -77,6 +77,12 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
         {
 
             @Override
+            public void close()
+            {
+                release();
+            }
+
+            @Override
             public ServerMessage getMessage()
             {
                 return TestServerMessage.this;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 918e193..277810d 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -84,6 +84,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     protected final EventManager _eventManager = new EventManager();
     private ConfiguredObject<?> _parent;
     private String _tablePrefix = "";
+    private final AtomicLong _inMemorySize = new AtomicLong();
+    private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
 
     protected abstract boolean isMessageStoreOpen();
 
@@ -245,6 +247,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        _inMemorySize.set(0);
+        _bytesEvacuatedFromMemory.set(0);
         if(_executor != null)
         {
             _executor.shutdown();
@@ -1124,6 +1128,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         return true;
     }
 
+    @Override
+    public long getInMemorySize()
+    {
+        return _inMemorySize.get();
+    }
+
+    @Override
+    public long getBytesEvacuatedFromMemory()
+    {
+        return _bytesEvacuatedFromMemory.get();
+    }
 
     protected class JDBCTransaction implements Transaction
     {
@@ -1408,10 +1423,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             _data = data;
         }
 
-        public void clear()
+        public long clear()
         {
+            long bytesCleared = 0;
             if(_metaData != null)
             {
+                bytesCleared += _metaData.getStorableSize();
                 _metaData.clearEncodedForm();
                 _metaData = null;
             }
@@ -1419,10 +1436,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             {
                 for(QpidByteBuffer buf : _data)
                 {
+                    bytesCleared += buf.remaining();
                     buf.dispose();
                 }
+                _data = null;
             }
-            _data = null;
+            return bytesCleared;
         }
 
         @Override
@@ -1471,7 +1490,9 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             {
                 _messageDataRef = new MessageDataSoftRef<>(metaData, null);
             }
+
             _contentSize = metaData.getContentSize();
+            _inMemorySize.addAndGet(metaData.getStorableSize());
         }
 
 
@@ -1531,6 +1552,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         @Override
         public StoredMessage<T> allContentAdded()
         {
+            _inMemorySize.addAndGet(getContentSize());
             return this;
         }
 
@@ -1668,14 +1690,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             }
 
             final T metaData;
+            long bytesCleared = 0;
             if ((metaData = _messageDataRef.getMetaData()) != null)
             {
+                bytesCleared += metaData.getStorableSize();
                 metaData.dispose();
             }
 
             Collection<QpidByteBuffer> data = _messageDataRef.getData();
             if(data != null)
             {
+                bytesCleared += getContentSize();
                 _messageDataRef.setData(null);
                 for(QpidByteBuffer buf : data)
                 {
@@ -1683,6 +1708,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                 }
             }
             _messageDataRef = null;
+            _inMemorySize.addAndGet(-bytesCleared);
         }
 
         @Override
@@ -1703,7 +1729,9 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                ((MessageDataSoftRef)_messageDataRef).clear();
+                final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+                _inMemorySize.addAndGet(-bytesCleared);
+                _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
             return true;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org