You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/06/15 10:21:52 UTC

[qpid-broker-j] branch master updated (dcffe40 -> 972fe15)

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.


    from dcffe40  QPID-8318 : Fix system test
     new f41bfe6  QPID-8316: [Broker-J] Flow to disk messages with metadata loaded into direct memory
     new ee225cd  QPID-8316: [Broker-J] Message validation should not store decoded headers in heap
     new e9f02b3  QPID-8322: [Broker-J] Fix credit restoration in window credit manager when infinite credit limit is used
     new 972fe15  NO-JIRA: Fix NPE in conversion tests

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../store/berkeleydb/AbstractBDBMessageStore.java  |  27 ++++-
 .../store/berkeleydb/BDBMessageStoreTest.java      |   6 ++
 .../server/message/AbstractServerMessageImpl.java  |   2 +-
 .../server/message/internal/InternalMessage.java   |   8 +-
 .../apache/qpid/server/queue/AbstractQueue.java    |  24 ++---
 .../qpid/server/store/StoredMemoryMessage.java     |   8 +-
 .../apache/qpid/server/store/StoredMessage.java    |   4 +-
 .../server/virtualhost/AbstractVirtualHost.java    |  16 ++-
 .../qpid/server/queue/AbstractQueueTestBase.java   |  58 ++++++++++
 .../queue/FlowToDiskOverflowPolicyHandlerTest.java |   3 +-
 .../qpid/server/store/MemoryMessageStoreTest.java  |   6 ++
 .../qpid/server/store/MessageStoreTestCase.java    | 112 +++++++++++++++++++-
 .../virtualhost/FlowToDiskCheckingTaskTest.java    | 117 +++++++++++++++++++++
 .../server/protocol/v0_10/ConsumerTarget_0_10.java |  10 +-
 .../v0_10/MessageConverter_Internal_to_v0_10.java  |   8 +-
 .../protocol/v0_10/MessageConverter_v0_10.java     |   8 +-
 .../server/protocol/v0_10/WindowCreditManager.java |  22 ++--
 .../protocol/v0_10/WindowCreditManagerTest.java    |  15 +++
 .../qpid/server/protocol/v0_8/FieldTable.java      |  28 +++--
 .../v0_8/MessageConverter_Internal_to_v0_8.java    |   8 +-
 .../protocol/v1_0/MessageConverter_to_1_0.java     |   8 +-
 .../v0_10_v1_0/MessageConverter_1_0_to_v0_10.java  |   8 +-
 .../v0_8_v0_10/MessageConverter_0_10_to_0_8.java   |   8 +-
 .../v0_8_v0_10/MessageConverter_0_8_to_0_10.java   |   8 +-
 .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java    |   8 +-
 .../server/store/derby/DerbyMessageStoreTest.java  |   6 ++
 .../store/jdbc/AbstractJDBCMessageStore.java       |  27 ++++-
 .../server/store/jdbc/JDBCMessageStoreTest.java    |   6 ++
 .../dependency_resolution/ClasspathQuery.java      |   7 +-
 29 files changed, 507 insertions(+), 69 deletions(-)
 create mode 100644 broker-core/src/test/java/org/apache/qpid/server/virtualhost/FlowToDiskCheckingTaskTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 04/04: NO-JIRA: Fix NPE in conversion tests

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 972fe154991ad4f054aee983470da74e6ca9ab7c
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Sat Jun 15 02:10:15 2019 +0100

    NO-JIRA: Fix NPE in conversion tests
---
 .../dependency_resolution/ClasspathQuery.java                      | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/dependency_resolution/ClasspathQuery.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/dependency_resolution/ClasspathQuery.java
index 921d214..8b62a99 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/dependency_resolution/ClasspathQuery.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/dependency_resolution/ClasspathQuery.java
@@ -153,7 +153,12 @@ public class ClasspathQuery
 
     private String buildClassPath(final Class<?> clientClazz, final Collection<String> gavs)
     {
-        List<File> classpathElements = new ArrayList<>(_classpathCache.getUnchecked(gavs));
+        final List<File> classpathElements = new ArrayList<>();
+        final List<File> cached = _classpathCache.getUnchecked(gavs);
+        if (cached != null)
+        {
+            classpathElements.addAll(cached);
+        }
         classpathElements.add(getLocalClasspathElement(clientClazz));
 
         final String collect = classpathElements.stream()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 03/04: QPID-8322: [Broker-J] Fix credit restoration in window credit manager when infinite credit limit is used

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit e9f02b361a0edbd27cdf37162ed304df281d9da7
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Sat Jun 15 00:58:45 2019 +0100

    QPID-8322: [Broker-J] Fix credit restoration in window credit manager when infinite credit limit is used
---
 .../server/protocol/v0_10/ConsumerTarget_0_10.java | 10 ++++------
 .../server/protocol/v0_10/WindowCreditManager.java | 22 ++++++++++++++--------
 .../protocol/v0_10/WindowCreditManagerTest.java    | 15 +++++++++++++++
 3 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index c6c312c..d1004ce 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.v0_10;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,16 +68,15 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
     private final String _targetAddress;
 
 
-    private FlowCreditManager_0_10 _creditManager;
+    private volatile FlowCreditManager_0_10 _creditManager;
 
     private final MessageAcceptMode _acceptMode;
     private final MessageAcquireMode _acquireMode;
-    private MessageFlowMode _flowMode;
+    private volatile MessageFlowMode _flowMode;
     private final ServerSession _session;
-    private final AtomicBoolean _stopped = new AtomicBoolean(true);
 
-    private int _deferredMessageCredit;
-    private long _deferredSizeCredit;
+    private volatile int _deferredMessageCredit;
+    private volatile long _deferredSizeCredit;
 
     private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
     {
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
index 3f6f086..818617b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
@@ -63,18 +63,24 @@ public class WindowCreditManager implements FlowCreditManager_0_10
     @Override
     public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
     {
-        _messageUsed -= messageCredit;
-        if (_messageUsed < 0L)
+        if (_messageCreditLimit >= 0L)
         {
-            LOGGER.error("Message credit used value was negative: " + _messageUsed);
-            _messageUsed = 0;
+            _messageUsed -= messageCredit;
+            if (_messageUsed < 0L)
+            {
+                LOGGER.warn("Message credit used value was negative: " + _messageUsed);
+                _messageUsed = 0;
+            }
         }
 
-        _bytesUsed -= bytesCredit;
-        if (_bytesUsed < 0L)
+        if (_bytesCreditLimit >= 0L)
         {
-            LOGGER.error("Bytes credit used value was negative: " + _bytesUsed);
-            _bytesUsed = 0;
+            _bytesUsed -= bytesCredit;
+            if (_bytesUsed < 0L)
+            {
+                LOGGER.warn("Bytes credit used value was negative: " + _bytesUsed);
+                _bytesUsed = 0;
+            }
         }
     }
 
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
index 3f1a403..d9ac43d 100644
--- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
@@ -96,4 +96,19 @@ public class WindowCreditManagerTest extends UnitTestBase
         assertEquals("unexpected credit value", (long) 1, _creditManager.getMessageCredit());
         assertTrue("Manager should 'haveCredit'", _creditManager.hasCredit());
     }
+
+    @Test
+    public void testRestoreCreditWhenInfiniteBytesCredit()
+    {
+        _creditManager.addCredit(1, WindowCreditManager.INFINITE_CREDIT);
+
+        _creditManager.useCreditForMessage(10);
+        assertEquals(0, _creditManager.getMessageCredit());
+        assertEquals(Long.MAX_VALUE, _creditManager.getBytesCredit());
+
+        _creditManager.restoreCredit(1, 10);
+
+        assertEquals(1, _creditManager.getMessageCredit());
+        assertEquals(Long.MAX_VALUE, _creditManager.getBytesCredit());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 01/04: QPID-8316: [Broker-J] Flow to disk messages with metadata loaded into direct memory

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit f41bfe651908e01bbca3406ea8a39b24571540b1
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Jun 13 22:00:29 2019 +0100

    QPID-8316: [Broker-J] Flow to disk messages with metadata loaded into direct memory
---
 .../store/berkeleydb/AbstractBDBMessageStore.java  |  27 ++++-
 .../store/berkeleydb/BDBMessageStoreTest.java      |   6 ++
 .../server/message/AbstractServerMessageImpl.java  |   2 +-
 .../server/message/internal/InternalMessage.java   |   8 +-
 .../apache/qpid/server/queue/AbstractQueue.java    |  24 ++---
 .../qpid/server/store/StoredMemoryMessage.java     |   8 +-
 .../apache/qpid/server/store/StoredMessage.java    |   4 +-
 .../server/virtualhost/AbstractVirtualHost.java    |  16 ++-
 .../qpid/server/queue/AbstractQueueTestBase.java   |  58 ++++++++++
 .../queue/FlowToDiskOverflowPolicyHandlerTest.java |   3 +-
 .../qpid/server/store/MemoryMessageStoreTest.java  |   6 ++
 .../qpid/server/store/MessageStoreTestCase.java    | 112 +++++++++++++++++++-
 .../virtualhost/FlowToDiskCheckingTaskTest.java    | 117 +++++++++++++++++++++
 .../v0_10/MessageConverter_Internal_to_v0_10.java  |   8 +-
 .../protocol/v0_10/MessageConverter_v0_10.java     |   8 +-
 .../v0_8/MessageConverter_Internal_to_v0_8.java    |   8 +-
 .../protocol/v1_0/MessageConverter_to_1_0.java     |   8 +-
 .../v0_10_v1_0/MessageConverter_1_0_to_v0_10.java  |   8 +-
 .../v0_8_v0_10/MessageConverter_0_10_to_0_8.java   |   8 +-
 .../v0_8_v0_10/MessageConverter_0_8_to_0_10.java   |   8 +-
 .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java    |   8 +-
 .../server/store/derby/DerbyMessageStoreTest.java  |   6 ++
 .../store/jdbc/AbstractJDBCMessageStore.java       |  27 ++++-
 .../server/store/jdbc/JDBCMessageStoreTest.java    |   6 ++
 24 files changed, 451 insertions(+), 43 deletions(-)

diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 6a57739..b48cd54 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -1221,11 +1221,36 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         }
 
         @Override
-        public synchronized boolean isInMemory()
+        public synchronized boolean isInContentInMemory()
         {
             return _messageDataRef != null && (_messageDataRef.isHardRef() || _messageDataRef.getData() != null);
         }
 
+        @Override
+        public synchronized long getInMemorySize()
+        {
+            long size = 0;
+            if (_messageDataRef != null)
+            {
+                if (_messageDataRef.isHardRef())
+                {
+                    size += getMetadataSize() + getContentSize();
+                }
+                else
+                {
+                    if (_messageDataRef.getMetaData() != null)
+                    {
+                        size += getMetadataSize();
+                    }
+                    if (_messageDataRef.getData() != null)
+                    {
+                        size += getContentSize();
+                    }
+                }
+            }
+            return size;
+        }
+
         private boolean stored()
         {
             return _messageDataRef != null && !_messageDataRef.isHardRef();
diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index b45f6c2..d3fea6e 100644
--- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -212,4 +212,10 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
         return new BDBMessageStore();
     }
 
+    @Override
+    protected boolean flowToDiskSupported()
+    {
+        return true;
+    }
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 2f72098..ea1e8d7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -194,7 +194,7 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
     public QpidByteBuffer getContent(int offset, int length)
     {
         StoredMessage<T> storedMessage = getStoredMessage();
-        boolean wasInMemory = storedMessage.isInMemory();
+        boolean wasInMemory = storedMessage.isInContentInMemory();
         try
         {
             return storedMessage.getContent(offset, length);
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 75f56de..f1decc8 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -281,12 +281,18 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
                     }
 
                     @Override
-                    public boolean isInMemory()
+                    public boolean isInContentInMemory()
                     {
                         return true;
                     }
 
                     @Override
+                    public long getInMemorySize()
+                    {
+                        return getContentSize() + getMetadataSize();
+                    }
+
+                    @Override
                     public boolean flowToDisk()
                     {
                         return false;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 375732b..22e42d4 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1226,7 +1226,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         final StoredMessage storedMessage = message.getStoredMessage();
         if ((_virtualHost.isOverTargetSize()
              || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
-            && storedMessage.isInMemory())
+            && storedMessage.getInMemorySize() > 0)
         {
             if (message.checkValid())
             {
@@ -2383,26 +2383,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     public boolean checkValid(final QueueEntry queueEntry)
     {
         final ServerMessage message = queueEntry.getMessage();
-        final ServerMessage.ValidationStatus validationStatus = message.getValidationStatus();
-        boolean isValid = false;
-        if (validationStatus == ServerMessage.ValidationStatus.UNKNOWN)
+        boolean isValid = true;
+        try (MessageReference ref = message.newReference())
         {
-            try (MessageReference ref = message.newReference())
-            {
-                isValid = message.checkValid();
-            }
-            catch (MessageDeletedException e)
-            {
-                // noop
-            }
-        }
-        else
-        {
-            isValid = validationStatus == ServerMessage.ValidationStatus.VALID;
+            isValid = message.checkValid();
         }
-        if (!isValid)
+        catch (MessageDeletedException e)
         {
-            malformedEntry(queueEntry);
+            // noop
         }
         return isValid;
     }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index 1132e82..118ec44 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -117,12 +117,18 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
     }
 
     @Override
-    public boolean isInMemory()
+    public boolean isInContentInMemory()
     {
         return true;
     }
 
     @Override
+    public long getInMemorySize()
+    {
+        return getContentSize() + getMetadataSize();
+    }
+
+    @Override
     public boolean flowToDisk()
     {
         return false;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index a1be172..c276e7b 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -41,7 +41,9 @@ public interface StoredMessage<M extends StorableMessageMetaData>
 
     void remove();
 
-    boolean isInMemory();
+    boolean isInContentInMemory();
+
+    long getInMemorySize();
 
     boolean flowToDisk();
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index c3b9ab3..7642ad0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2100,7 +2100,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         }
     }
 
-    private class FlowToDiskCheckingTask extends HouseKeepingTask
+    class FlowToDiskCheckingTask extends HouseKeepingTask
     {
         public FlowToDiskCheckingTask()
         {
@@ -2133,19 +2133,17 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
                             try (MessageReference messageReference = node.getMessage().newReference())
                             {
                                 final StoredMessage storedMessage = messageReference.getMessage().getStoredMessage();
-                                if (storedMessage.isInMemory())
+                                final long inMemorySize = storedMessage.getInMemorySize();
+                                if (inMemorySize > 0)
                                 {
                                     if (cumulativeSize <= currentTargetSize)
                                     {
-                                        cumulativeSize += storedMessage.getContentSize();
-                                        cumulativeSize += storedMessage.getMetadataSize();
+                                        cumulativeSize += inMemorySize;
                                     }
-                                    else
+
+                                    if (cumulativeSize > currentTargetSize && node.getQueue().checkValid(node))
                                     {
-                                        if (node.getQueue().checkValid(node))
-                                        {
-                                            storedMessage.flowToDisk();
-                                        }
+                                        storedMessage.flowToDisk();
                                     }
                                 }
                             }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 82b8dc8..4fcea19 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -32,6 +32,7 @@ import static org.mockito.ArgumentMatchers.contains;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
@@ -69,6 +70,7 @@ import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.AlternateBinding;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.BrokerTestHelper;
@@ -1214,6 +1216,62 @@ abstract class AbstractQueueTestBase extends UnitTestBase
                             (long) target.getQueueDepthMessages());
     }
 
+    @Test
+    public void testEnqueuedMessageFlowedToDisk() throws Exception
+    {
+        makeVirtualHostTargetSizeExceeded();
+
+        final ServerMessage message2 = createMessage(1L, 2, 3);
+        final long sizeIncludingHeader = message2.getSizeIncludingHeader();
+        final StoredMessage storedMessage = message2.getStoredMessage();
+        when(storedMessage.getInMemorySize()).thenReturn(sizeIncludingHeader);
+
+        _queue.enqueue(message2, null, null);
+
+        verify(storedMessage).getInMemorySize();
+        verify(storedMessage).flowToDisk();
+
+        assertEquals("Unexpected number of messages on the queue",
+                     2,
+                     _queue.getQueueDepthMessages());
+    }
+
+    @Test
+    public void testEnqueuedMalformedMessageDeleted() throws Exception
+    {
+        makeVirtualHostTargetSizeExceeded();
+
+        final ServerMessage message2 = createMessage(1L, 2, 3);
+        final long sizeIncludingHeader = message2.getSizeIncludingHeader();
+        final StoredMessage storedMessage = message2.getStoredMessage();
+        when(storedMessage.getInMemorySize()).thenReturn(sizeIncludingHeader);
+        when(message2.checkValid()).thenReturn(false);
+
+        _queue.enqueue(message2, null, null);
+
+        verify(storedMessage).getInMemorySize();
+        verify(storedMessage, never()).flowToDisk();
+
+        assertEquals("Unexpected number of messages on the queue",
+                     1,
+                     _queue.getQueueDepthMessages());
+    }
+
+    private void makeVirtualHostTargetSizeExceeded()
+    {
+        final InternalMessage message = InternalMessage.createMessage(_virtualHost.getMessageStore(),
+                                                                       mock(AMQMessageHeader.class),
+                                                                       "test",
+                                                                       true,
+                                                                       _qname);
+        _queue.enqueue(message, null, null);
+        assertEquals("Unexpected number of messages on the queue",
+                     1,
+                     _queue.getQueueDepthMessages());
+        _virtualHost.setTargetSize(1L);
+        assertTrue(_virtualHost.isOverTargetSize());
+    }
+
     private long getExpirationOnQueue(final Queue<?> queue, long arrivalTime, long expiration)
     {
         final List<QueueEntry> entries = new ArrayList<>();
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
index 8351e53..9d8f997 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
@@ -134,7 +134,8 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
 
         StoredMessage storedMessage = mock(StoredMessage.class);
         when(message.getStoredMessage()).thenReturn(storedMessage);
-        when(storedMessage.isInMemory()).thenReturn(true);
+        when(storedMessage.isInContentInMemory()).thenReturn(true);
+        when(storedMessage.getInMemorySize()).thenReturn(size);
 
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java b/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
index f05da24..39eb426 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
@@ -42,6 +42,12 @@ public class MemoryMessageStoreTest extends MessageStoreTestCase
     }
 
     @Override
+    protected boolean flowToDiskSupported()
+    {
+        return false;
+    }
+
+    @Override
     protected void reopenStore() throws Exception
     {
         // cannot re-open memory message store as it is not persistent
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 0c8a4ca..f74f60e 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -21,10 +21,14 @@
 package org.apache.qpid.server.store;
 
 import static junit.framework.TestCase.assertNull;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
@@ -34,19 +38,22 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.hamcrest.Description;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.store.Transaction.EnqueueRecord;
@@ -94,6 +101,8 @@ public abstract class MessageStoreTestCase extends UnitTestBase
 
     protected abstract MessageStore createMessageStore();
 
+    protected abstract boolean flowToDiskSupported();
+
     protected MessageStore getStore()
     {
         return _store;
@@ -439,6 +448,107 @@ public abstract class MessageStoreTestCase extends UnitTestBase
         verify(listener, times(1)).messageDeleted(message);
     }
 
+    @Test
+    public void testFlowToDisk() throws Exception
+    {
+        assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+        final StoredMessage<?> storedMessage = createStoredMessage();
+
+        assertEquals(storedMessage.getContentSize() + storedMessage.getMetadataSize(), storedMessage.getInMemorySize());
+        assertTrue(storedMessage.flowToDisk());
+        assertEquals(0, storedMessage.getInMemorySize());
+    }
+
+    @Test
+    public void testFlowToDiskAfterMetadataReload()
+    {
+        assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+        final StoredMessage<?> storedMessage = createStoredMessage();
+
+        assertTrue(storedMessage.flowToDisk());
+        assertNotNull(storedMessage.getMetaData());
+        assertEquals(storedMessage.getMetadataSize(), storedMessage.getInMemorySize());
+
+        assertTrue(storedMessage.flowToDisk());
+        assertEquals(0, storedMessage.getInMemorySize());
+    }
+
+    @Test
+    public void testFlowToDiskAfterContentReload()
+    {
+        assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+        final StoredMessage<?> storedMessage = createStoredMessage();
+
+        assertTrue(storedMessage.flowToDisk());
+        assertNotNull(storedMessage.getContent(0, storedMessage.getContentSize()));
+        assertEquals(storedMessage.getContentSize(), storedMessage.getInMemorySize());
+
+        assertTrue(storedMessage.flowToDisk());
+        assertEquals(0, storedMessage.getInMemorySize());
+    }
+
+
+    @Test
+    public void testIsInContentInMemoryBeforeFlowControl()
+    {
+        assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+        final StoredMessage<?> storedMessage = createStoredMessage();
+
+        assertTrue(storedMessage.isInContentInMemory());
+    }
+
+    @Test
+    public void testIsInContentInMemoryAfterFlowControl()
+    {
+        assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+        final StoredMessage<?> storedMessage = createStoredMessage();
+        assertTrue(storedMessage.flowToDisk());
+        assertFalse(storedMessage.isInContentInMemory());
+    }
+
+    @Test
+    public void testIsInContentInMemoryAfterReload()
+    {
+        assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+        final StoredMessage<?> storedMessage = createStoredMessage();
+        assertTrue(storedMessage.flowToDisk());
+        assertFalse(storedMessage.isInContentInMemory());
+        assertNotNull(storedMessage.getContent(0, storedMessage.getContentSize()));
+        assertTrue(storedMessage.isInContentInMemory());
+    }
+
+    private StoredMessage<?> createStoredMessage()
+    {
+        return createStoredMessage(Collections.singletonMap("test", "testValue"), "testContent", "testQueue");
+    }
+
+    private StoredMessage<?> createStoredMessage(final Map<String, String> headers,
+                                                 final String content,
+                                                 final String queueName)
+    {
+        return createInternalTestMessage(headers, content, queueName).getStoredMessage();
+    }
+
+    private InternalMessage createInternalTestMessage(final Map<String, String> headers,
+                                                      final String content,
+                                                      final String queueName)
+    {
+        final AMQMessageHeader messageHeader = mock(AMQMessageHeader.class);
+        if (headers != null)
+        {
+            headers.forEach((k,v) -> when(messageHeader.getHeader(k)).thenReturn(v));
+            when(messageHeader.getHeaderNames()).thenReturn(headers.keySet());
+        }
+
+        return InternalMessage.createMessage(_store, messageHeader, content, true, queueName);
+    }
+
     private TransactionLogResource createTransactionLogResource(UUID queueId)
     {
         TransactionLogResource queue = mock(TransactionLogResource.class);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/FlowToDiskCheckingTaskTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/FlowToDiskCheckingTaskTest.java
new file mode 100644
index 0000000..9e68f62
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/FlowToDiskCheckingTaskTest.java
@@ -0,0 +1,117 @@
+package org.apache.qpid.server.virtualhost;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class FlowToDiskCheckingTaskTest extends UnitTestBase
+{
+    private static final int FLOW_TO_DISK_CHECK_PERIOD = 0;
+    private AbstractVirtualHost<?> _virtualHost;
+    private Queue _queue;
+    private AbstractVirtualHost.FlowToDiskCheckingTask _task;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        final Map<String, Object> attributes = new HashMap<>();
+        attributes.put(VirtualHost.NAME, getTestName());
+        attributes.put(VirtualHost.TYPE,  TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+        attributes.put(VirtualHost.CONTEXT, Collections.singletonMap(QueueManagingVirtualHost.FLOW_TO_DISK_CHECK_PERIOD,
+                                                                     FLOW_TO_DISK_CHECK_PERIOD));
+        _virtualHost = (AbstractVirtualHost)BrokerTestHelper.createVirtualHost(attributes, this);
+        _task = _virtualHost. new FlowToDiskCheckingTask();
+        _queue = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName()));
+        _queue.enqueue(InternalMessage.createMessage(_virtualHost.getMessageStore(),
+                                                     mock(AMQMessageHeader.class),
+                                                     "test",
+                                                     true,
+                                                     _queue.getName()), null, null);
+    }
+
+    @After
+    public void tearDown() throws Exception
+    {
+        if (_queue != null)
+        {
+            _queue.close();
+        }
+        if (_virtualHost !=  null)
+        {
+            _virtualHost.close();
+        }
+    }
+
+    @Test
+    public void testFlowToDiskInMemoryMessage()
+    {
+        final ServerMessage message = createMessage(10, 20);
+        _queue.enqueue(message, null, null);
+
+        makeVirtualHostTargetSizeExceeded();
+
+        _task.execute();
+        verify(message.getStoredMessage()).flowToDisk();
+    }
+
+    private void makeVirtualHostTargetSizeExceeded()
+    {
+        if (_virtualHost.getInMemoryMessageSize() == 0)
+        {
+            _queue.enqueue(InternalMessage.createMessage(_virtualHost.getMessageStore(),
+                                                         mock(AMQMessageHeader.class),
+                                                         "test",
+                                                         true,
+                                                         _queue.getName()), null, null);
+        }
+        _virtualHost.setTargetSize(1L);
+        assertTrue(_virtualHost.isOverTargetSize());
+    }
+
+    private ServerMessage createMessage(final int headerSize, final int payloadSize)
+    {
+        long totalSize = headerSize + payloadSize;
+        final long id = System.currentTimeMillis();
+        final ServerMessage message = mock(ServerMessage.class);
+        when(message.getMessageNumber()).thenReturn(id);
+        when(message.getMessageHeader()).thenReturn(mock(AMQMessageHeader.class));
+        when(message.checkValid()).thenReturn(true);
+        when(message.getSizeIncludingHeader()).thenReturn(totalSize);
+        when(message.getValidationStatus()).thenReturn(ServerMessage.ValidationStatus.UNKNOWN);
+
+        final StoredMessage storedMessage = mock(StoredMessage.class);
+        when(storedMessage.getContentSize()).thenReturn(payloadSize);
+        when(storedMessage.getMetadataSize()).thenReturn(headerSize);
+        when(storedMessage.getInMemorySize()).thenReturn(totalSize);
+        when(message.getStoredMessage()).thenReturn(storedMessage);
+
+        final MessageReference ref = mock(MessageReference.class);
+        when(ref.getMessage()).thenReturn(message);
+
+        when(message.newReference()).thenReturn(ref);
+        when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
+
+        return message;
+    }
+}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index 114b0f2..5a27e89 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -129,12 +129,18 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
                     }
 
                     @Override
-                    public boolean isInMemory()
+                    public boolean isInContentInMemory()
                     {
                         return true;
                     }
 
                     @Override
+                    public long getInMemorySize()
+                    {
+                        return getContentSize() + getMetadataSize();
+                    }
+
+                    @Override
                     public boolean flowToDisk()
                     {
                         return false;
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index 8c9db87..10e76bd 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -105,12 +105,18 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
                     }
 
                     @Override
-                    public boolean isInMemory()
+                    public boolean isInContentInMemory()
                     {
                         return true;
                     }
 
                     @Override
+                    public long getInMemorySize()
+                    {
+                        return getContentSize() + getMetadataSize();
+                    }
+
+                    @Override
                     public boolean flowToDisk()
                     {
                         return false;
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index b362b2a..1196d38 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -132,12 +132,18 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
             }
 
             @Override
-            public boolean isInMemory()
+            public boolean isInContentInMemory()
             {
                 return true;
             }
 
             @Override
+            public long getInMemorySize()
+            {
+                return getContentSize() + getMetadataSize();
+            }
+
+            @Override
             public boolean flowToDisk()
             {
                 return false;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 71c3b06..a9914c4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -443,12 +443,18 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
         }
 
         @Override
-        public boolean isInMemory()
+        public boolean isInContentInMemory()
         {
             return true;
         }
 
         @Override
+        public long getInMemorySize()
+        {
+            return getContentSize() + getMetadataSize();
+        }
+
+        @Override
         public boolean flowToDisk()
         {
             return false;
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
index 07aeb95..83bc923 100644
--- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -149,12 +149,18 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
             }
 
             @Override
-            public boolean isInMemory()
+            public boolean isInContentInMemory()
             {
                 return true;
             }
 
             @Override
+            public long getInMemorySize()
+            {
+                return getContentSize() + getMetadataSize();
+            }
+
+            @Override
             public boolean flowToDisk()
             {
                 return false;
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 59ab6b0..ab9afa6 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -274,12 +274,18 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
             }
 
             @Override
-            public boolean isInMemory()
+            public boolean isInContentInMemory()
             {
                 return true;
             }
 
             @Override
+            public long getInMemorySize()
+            {
+                return getMetadataSize() + getContentSize();
+            }
+
+            @Override
             public boolean flowToDisk()
             {
                 return false;
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index 6d9c190..68c96c9 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -115,12 +115,18 @@ public class MessageConverter_0_8_to_0_10  implements MessageConverter<AMQMessag
             }
 
             @Override
-            public boolean isInMemory()
+            public boolean isInContentInMemory()
             {
                 return true;
             }
 
             @Override
+            public long getInMemorySize()
+            {
+                return getContentSize() + getMetadataSize();
+            }
+
+            @Override
             public boolean flowToDisk()
             {
                 return false;
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index 3c6058f..738bd9a 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -140,12 +140,18 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
             }
 
             @Override
-            public boolean isInMemory()
+            public boolean isInContentInMemory()
             {
                 return true;
             }
 
             @Override
+            public long getInMemorySize()
+            {
+                return getContentSize() + getMetadataSize();
+            }
+
+            @Override
             public boolean flowToDisk()
             {
                 return false;
diff --git a/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
index 336da71..da203bb 100644
--- a/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
+++ b/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
@@ -109,4 +109,10 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase
         return new DerbyMessageStore();
     }
 
+    @Override
+    protected boolean flowToDiskSupported()
+    {
+        return true;
+    }
+
 }
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index b2359b7..df02e55 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -1637,11 +1637,36 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         }
 
         @Override
-        public synchronized boolean isInMemory()
+        public synchronized boolean isInContentInMemory()
         {
             return _messageDataRef != null && (_messageDataRef.isHardRef() || _messageDataRef.getData() != null);
         }
 
+        @Override
+        public synchronized long getInMemorySize()
+        {
+            long size = 0;
+            if (_messageDataRef != null)
+            {
+                if (_messageDataRef.isHardRef())
+                {
+                    size += getMetadataSize() + getContentSize();
+                }
+                else
+                {
+                    if (_messageDataRef.getMetaData() != null)
+                    {
+                        size += getMetadataSize();
+                    }
+                    if (_messageDataRef.getData() != null)
+                    {
+                        size += getContentSize();
+                    }
+                }
+            }
+            return size;
+        }
+
         private boolean stored()
         {
             return _messageDataRef != null && !_messageDataRef.isHardRef();
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index a152337..9f12062 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -323,6 +323,12 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
         return new GenericJDBCMessageStore();
     }
 
+    @Override
+    protected boolean flowToDiskSupported()
+    {
+        return true;
+    }
+
     private Connection openConnection() throws SQLException
     {
         return TestJdbcUtils.openConnection(_connectionURL);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 02/04: QPID-8316: [Broker-J] Message validation should not store decoded headers in heap

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit ee225cdada7be63ea5ae7bfdaecc635bbabc3d5a
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Fri Jun 14 12:43:05 2019 +0100

    QPID-8316: [Broker-J] Message validation should not store decoded headers in heap
---
 .../qpid/server/protocol/v0_8/FieldTable.java      | 28 +++++++++++++---------
 1 file changed, 17 insertions(+), 11 deletions(-)

diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
index 7ea7958..3c14ebb 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
@@ -109,14 +109,11 @@ public class FieldTable
         return value;
     }
 
-    private void decode()
+    private Map<String, AMQTypedValue> decode()
     {
+        final Map<String, AMQTypedValue> properties = new HashMap<>();
         if (_encodedSize > 0 && _encodedForm != null)
         {
-            if (!_properties.isEmpty())
-            {
-                _properties.clear();
-            }
             _encodedForm.mark();
             try
             {
@@ -126,7 +123,7 @@ public class FieldTable
 
                     checkPropertyName(key);
                     AMQTypedValue value = AMQTypedValue.readFromBuffer(_encodedForm);
-                    _properties.put(key, value);
+                    properties.put(key, value);
                 }
                 while (_encodedForm.hasRemaining());
             }
@@ -135,7 +132,7 @@ public class FieldTable
                 _encodedForm.reset();
             }
 
-            final long recalculateEncodedSize = recalculateEncodedSize();
+            final long recalculateEncodedSize = recalculateEncodedSize(properties);
             if (_encodedSize != recalculateEncodedSize)
             {
                 throw new IllegalStateException(String.format(
@@ -144,6 +141,7 @@ public class FieldTable
                         recalculateEncodedSize));
             }
         }
+        return properties;
     }
 
     private void decodeIfNecessary()
@@ -152,7 +150,12 @@ public class FieldTable
         {
             try
             {
-                decode();
+                final Map<String, AMQTypedValue> properties = decode();
+                if (!_properties.isEmpty())
+                {
+                    _properties.clear();
+                }
+                _properties.putAll(properties);
             }
             finally
             {
@@ -344,10 +347,10 @@ public class FieldTable
         return _encodedSize;
     }
 
-    private synchronized long recalculateEncodedSize()
+    private synchronized long recalculateEncodedSize(final Map<String, AMQTypedValue> properties)
     {
         long size = 0L;
-        for (Map.Entry<String, AMQTypedValue> e : _properties.entrySet())
+        for (Map.Entry<String, AMQTypedValue> e : properties.entrySet())
         {
             String key = e.getKey();
             AMQTypedValue value = e.getValue();
@@ -533,6 +536,9 @@ public class FieldTable
 
     public synchronized void validate()
     {
-        decodeIfNecessary();
+        if (!_decoded)
+        {
+            decode();
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org