You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/06/18 09:16:56 UTC

[qpid-broker-j] 03/07: QPID-8316: [Broker-J] Flow to disk messages with metadata loaded into direct memory

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 712942a7aeec9a46b06417c43e985179756b7183
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Jun 13 22:00:29 2019 +0100

    QPID-8316: [Broker-J] Flow to disk messages with metadata loaded into direct memory
---
 .../store/berkeleydb/AbstractBDBMessageStore.java  |  27 ++++-
 .../store/berkeleydb/BDBMessageStoreTest.java      |   6 ++
 .../server/message/AbstractServerMessageImpl.java  |   2 +-
 .../server/message/internal/InternalMessage.java   |   8 +-
 .../apache/qpid/server/queue/AbstractQueue.java    |  24 ++---
 .../qpid/server/store/StoredMemoryMessage.java     |   8 +-
 .../apache/qpid/server/store/StoredMessage.java    |   4 +-
 .../server/virtualhost/AbstractVirtualHost.java    |  16 ++-
 .../qpid/server/queue/AbstractQueueTestBase.java   |  58 ++++++++++
 .../queue/FlowToDiskOverflowPolicyHandlerTest.java |   3 +-
 .../qpid/server/store/MemoryMessageStoreTest.java  |   6 ++
 .../qpid/server/store/MessageStoreTestCase.java    | 112 +++++++++++++++++++-
 .../virtualhost/FlowToDiskCheckingTaskTest.java    | 117 +++++++++++++++++++++
 .../v0_10/MessageConverter_Internal_to_v0_10.java  |   8 +-
 .../protocol/v0_10/MessageConverter_v0_10.java     |   8 +-
 .../v0_8/MessageConverter_Internal_to_v0_8.java    |   8 +-
 .../protocol/v1_0/MessageConverter_to_1_0.java     |   8 +-
 .../v0_10_v1_0/MessageConverter_1_0_to_v0_10.java  |   8 +-
 .../v0_8_v0_10/MessageConverter_0_10_to_0_8.java   |   8 +-
 .../v0_8_v0_10/MessageConverter_0_8_to_0_10.java   |   8 +-
 .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java    |   8 +-
 .../server/store/derby/DerbyMessageStoreTest.java  |   6 ++
 .../store/jdbc/AbstractJDBCMessageStore.java       |  27 ++++-
 .../server/store/jdbc/JDBCMessageStoreTest.java    |   6 ++
 24 files changed, 451 insertions(+), 43 deletions(-)

diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 6a57739..b48cd54 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -1221,11 +1221,36 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         }
 
         @Override
-        public synchronized boolean isInMemory()
+        public synchronized boolean isInContentInMemory()
         {
             return _messageDataRef != null && (_messageDataRef.isHardRef() || _messageDataRef.getData() != null);
         }
 
+        @Override
+        public synchronized long getInMemorySize()
+        {
+            long size = 0;
+            if (_messageDataRef != null)
+            {
+                if (_messageDataRef.isHardRef())
+                {
+                    size += getMetadataSize() + getContentSize();
+                }
+                else
+                {
+                    if (_messageDataRef.getMetaData() != null)
+                    {
+                        size += getMetadataSize();
+                    }
+                    if (_messageDataRef.getData() != null)
+                    {
+                        size += getContentSize();
+                    }
+                }
+            }
+            return size;
+        }
+
         private boolean stored()
         {
             return _messageDataRef != null && !_messageDataRef.isHardRef();
diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index b45f6c2..d3fea6e 100644
--- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -212,4 +212,10 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
         return new BDBMessageStore();
     }
 
+    @Override
+    protected boolean flowToDiskSupported()
+    {
+        return true;
+    }
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 2f72098..ea1e8d7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -194,7 +194,7 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
     public QpidByteBuffer getContent(int offset, int length)
     {
         StoredMessage<T> storedMessage = getStoredMessage();
-        boolean wasInMemory = storedMessage.isInMemory();
+        boolean wasInMemory = storedMessage.isInContentInMemory();
         try
         {
             return storedMessage.getContent(offset, length);
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 75f56de..f1decc8 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -281,12 +281,18 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
                     }
 
                     @Override
-                    public boolean isInMemory()
+                    public boolean isInContentInMemory()
                     {
                         return true;
                     }
 
                     @Override
+                    public long getInMemorySize()
+                    {
+                        return getContentSize() + getMetadataSize();
+                    }
+
+                    @Override
                     public boolean flowToDisk()
                     {
                         return false;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 375732b..22e42d4 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1226,7 +1226,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         final StoredMessage storedMessage = message.getStoredMessage();
         if ((_virtualHost.isOverTargetSize()
              || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
-            && storedMessage.isInMemory())
+            && storedMessage.getInMemorySize() > 0)
         {
             if (message.checkValid())
             {
@@ -2383,26 +2383,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     public boolean checkValid(final QueueEntry queueEntry)
     {
         final ServerMessage message = queueEntry.getMessage();
-        final ServerMessage.ValidationStatus validationStatus = message.getValidationStatus();
-        boolean isValid = false;
-        if (validationStatus == ServerMessage.ValidationStatus.UNKNOWN)
+        boolean isValid = true;
+        try (MessageReference ref = message.newReference())
         {
-            try (MessageReference ref = message.newReference())
-            {
-                isValid = message.checkValid();
-            }
-            catch (MessageDeletedException e)
-            {
-                // noop
-            }
-        }
-        else
-        {
-            isValid = validationStatus == ServerMessage.ValidationStatus.VALID;
+            isValid = message.checkValid();
         }
-        if (!isValid)
+        catch (MessageDeletedException e)
         {
-            malformedEntry(queueEntry);
+            // noop
         }
         return isValid;
     }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index 1132e82..118ec44 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -117,12 +117,18 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
     }
 
     @Override
-    public boolean isInMemory()
+    public boolean isInContentInMemory()
     {
         return true;
     }
 
     @Override
+    public long getInMemorySize()
+    {
+        return getContentSize() + getMetadataSize();
+    }
+
+    @Override
     public boolean flowToDisk()
     {
         return false;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index a1be172..c276e7b 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -41,7 +41,9 @@ public interface StoredMessage<M extends StorableMessageMetaData>
 
     void remove();
 
-    boolean isInMemory();
+    boolean isInContentInMemory();
+
+    long getInMemorySize();
 
     boolean flowToDisk();
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index c3b9ab3..7642ad0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2100,7 +2100,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         }
     }
 
-    private class FlowToDiskCheckingTask extends HouseKeepingTask
+    class FlowToDiskCheckingTask extends HouseKeepingTask
     {
         public FlowToDiskCheckingTask()
         {
@@ -2133,19 +2133,17 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
                             try (MessageReference messageReference = node.getMessage().newReference())
                             {
                                 final StoredMessage storedMessage = messageReference.getMessage().getStoredMessage();
-                                if (storedMessage.isInMemory())
+                                final long inMemorySize = storedMessage.getInMemorySize();
+                                if (inMemorySize > 0)
                                 {
                                     if (cumulativeSize <= currentTargetSize)
                                     {
-                                        cumulativeSize += storedMessage.getContentSize();
-                                        cumulativeSize += storedMessage.getMetadataSize();
+                                        cumulativeSize += inMemorySize;
                                     }
-                                    else
+
+                                    if (cumulativeSize > currentTargetSize && node.getQueue().checkValid(node))
                                     {
-                                        if (node.getQueue().checkValid(node))
-                                        {
-                                            storedMessage.flowToDisk();
-                                        }
+                                        storedMessage.flowToDisk();
                                     }
                                 }
                             }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 82b8dc8..4fcea19 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -32,6 +32,7 @@ import static org.mockito.ArgumentMatchers.contains;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
@@ -69,6 +70,7 @@ import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.AlternateBinding;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.BrokerTestHelper;
@@ -1214,6 +1216,62 @@ abstract class AbstractQueueTestBase extends UnitTestBase
                             (long) target.getQueueDepthMessages());
     }
 
+    @Test
+    public void testEnqueuedMessageFlowedToDisk() throws Exception
+    {
+        makeVirtualHostTargetSizeExceeded();
+
+        final ServerMessage message2 = createMessage(1L, 2, 3);
+        final long sizeIncludingHeader = message2.getSizeIncludingHeader();
+        final StoredMessage storedMessage = message2.getStoredMessage();
+        when(storedMessage.getInMemorySize()).thenReturn(sizeIncludingHeader);
+
+        _queue.enqueue(message2, null, null);
+
+        verify(storedMessage).getInMemorySize();
+        verify(storedMessage).flowToDisk();
+
+        assertEquals("Unexpected number of messages on the queue",
+                     2,
+                     _queue.getQueueDepthMessages());
+    }
+
+    @Test
+    public void testEnqueuedMalformedMessageDeleted() throws Exception
+    {
+        makeVirtualHostTargetSizeExceeded();
+
+        final ServerMessage message2 = createMessage(1L, 2, 3);
+        final long sizeIncludingHeader = message2.getSizeIncludingHeader();
+        final StoredMessage storedMessage = message2.getStoredMessage();
+        when(storedMessage.getInMemorySize()).thenReturn(sizeIncludingHeader);
+        when(message2.checkValid()).thenReturn(false);
+
+        _queue.enqueue(message2, null, null);
+
+        verify(storedMessage).getInMemorySize();
+        verify(storedMessage, never()).flowToDisk();
+
+        assertEquals("Unexpected number of messages on the queue",
+                     1,
+                     _queue.getQueueDepthMessages());
+    }
+
+    private void makeVirtualHostTargetSizeExceeded()
+    {
+        final InternalMessage message = InternalMessage.createMessage(_virtualHost.getMessageStore(),
+                                                                       mock(AMQMessageHeader.class),
+                                                                       "test",
+                                                                       true,
+                                                                       _qname);
+        _queue.enqueue(message, null, null);
+        assertEquals("Unexpected number of messages on the queue",
+                     1,
+                     _queue.getQueueDepthMessages());
+        _virtualHost.setTargetSize(1L);
+        assertTrue(_virtualHost.isOverTargetSize());
+    }
+
     private long getExpirationOnQueue(final Queue<?> queue, long arrivalTime, long expiration)
     {
         final List<QueueEntry> entries = new ArrayList<>();
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
index 8351e53..9d8f997 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
@@ -134,7 +134,8 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
 
         StoredMessage storedMessage = mock(StoredMessage.class);
         when(message.getStoredMessage()).thenReturn(storedMessage);
-        when(storedMessage.isInMemory()).thenReturn(true);
+        when(storedMessage.isInContentInMemory()).thenReturn(true);
+        when(storedMessage.getInMemorySize()).thenReturn(size);
 
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java b/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
index f05da24..39eb426 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
@@ -42,6 +42,12 @@ public class MemoryMessageStoreTest extends MessageStoreTestCase
     }
 
     @Override
+    protected boolean flowToDiskSupported()
+    {
+        return false;
+    }
+
+    @Override
     protected void reopenStore() throws Exception
     {
         // cannot re-open memory message store as it is not persistent
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 0c8a4ca..f74f60e 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -21,10 +21,14 @@
 package org.apache.qpid.server.store;
 
 import static junit.framework.TestCase.assertNull;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
@@ -34,19 +38,22 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.hamcrest.Description;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.store.Transaction.EnqueueRecord;
@@ -94,6 +101,8 @@ public abstract class MessageStoreTestCase extends UnitTestBase
 
     protected abstract MessageStore createMessageStore();
 
+    protected abstract boolean flowToDiskSupported();
+
     protected MessageStore getStore()
     {
         return _store;
@@ -439,6 +448,107 @@ public abstract class MessageStoreTestCase extends UnitTestBase
         verify(listener, times(1)).messageDeleted(message);
     }
 
+    @Test
+    public void testFlowToDisk() throws Exception
+    {
+        assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+        final StoredMessage<?> storedMessage = createStoredMessage();
+
+        assertEquals(storedMessage.getContentSize() + storedMessage.getMetadataSize(), storedMessage.getInMemorySize());
+        assertTrue(storedMessage.flowToDisk());
+        assertEquals(0, storedMessage.getInMemorySize());
+    }
+
+    @Test
+    public void testFlowToDiskAfterMetadataReload()
+    {
+        assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+        final StoredMessage<?> storedMessage = createStoredMessage();
+
+        assertTrue(storedMessage.flowToDisk());
+        assertNotNull(storedMessage.getMetaData());
+        assertEquals(storedMessage.getMetadataSize(), storedMessage.getInMemorySize());
+
+        assertTrue(storedMessage.flowToDisk());
+        assertEquals(0, storedMessage.getInMemorySize());
+    }
+
+    @Test
+    public void testFlowToDiskAfterContentReload()
+    {
+        assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+        final StoredMessage<?> storedMessage = createStoredMessage();
+
+        assertTrue(storedMessage.flowToDisk());
+        assertNotNull(storedMessage.getContent(0, storedMessage.getContentSize()));
+        assertEquals(storedMessage.getContentSize(), storedMessage.getInMemorySize());
+
+        assertTrue(storedMessage.flowToDisk());
+        assertEquals(0, storedMessage.getInMemorySize());
+    }
+
+
+    @Test
+    public void testIsInContentInMemoryBeforeFlowControl()
+    {
+        assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+        final StoredMessage<?> storedMessage = createStoredMessage();
+
+        assertTrue(storedMessage.isInContentInMemory());
+    }
+
+    @Test
+    public void testIsInContentInMemoryAfterFlowControl()
+    {
+        assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+        final StoredMessage<?> storedMessage = createStoredMessage();
+        assertTrue(storedMessage.flowToDisk());
+        assertFalse(storedMessage.isInContentInMemory());
+    }
+
+    @Test
+    public void testIsInContentInMemoryAfterReload()
+    {
+        assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+        final StoredMessage<?> storedMessage = createStoredMessage();
+        assertTrue(storedMessage.flowToDisk());
+        assertFalse(storedMessage.isInContentInMemory());
+        assertNotNull(storedMessage.getContent(0, storedMessage.getContentSize()));
+        assertTrue(storedMessage.isInContentInMemory());
+    }
+
+    private StoredMessage<?> createStoredMessage()
+    {
+        return createStoredMessage(Collections.singletonMap("test", "testValue"), "testContent", "testQueue");
+    }
+
+    private StoredMessage<?> createStoredMessage(final Map<String, String> headers,
+                                                 final String content,
+                                                 final String queueName)
+    {
+        return createInternalTestMessage(headers, content, queueName).getStoredMessage();
+    }
+
+    private InternalMessage createInternalTestMessage(final Map<String, String> headers,
+                                                      final String content,
+                                                      final String queueName)
+    {
+        final AMQMessageHeader messageHeader = mock(AMQMessageHeader.class);
+        if (headers != null)
+        {
+            headers.forEach((k,v) -> when(messageHeader.getHeader(k)).thenReturn(v));
+            when(messageHeader.getHeaderNames()).thenReturn(headers.keySet());
+        }
+
+        return InternalMessage.createMessage(_store, messageHeader, content, true, queueName);
+    }
+
     private TransactionLogResource createTransactionLogResource(UUID queueId)
     {
         TransactionLogResource queue = mock(TransactionLogResource.class);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/FlowToDiskCheckingTaskTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/FlowToDiskCheckingTaskTest.java
new file mode 100644
index 0000000..9e68f62
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/FlowToDiskCheckingTaskTest.java
@@ -0,0 +1,117 @@
+package org.apache.qpid.server.virtualhost;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class FlowToDiskCheckingTaskTest extends UnitTestBase
+{
+    private static final int FLOW_TO_DISK_CHECK_PERIOD = 0;
+    private AbstractVirtualHost<?> _virtualHost;
+    private Queue _queue;
+    private AbstractVirtualHost.FlowToDiskCheckingTask _task;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        final Map<String, Object> attributes = new HashMap<>();
+        attributes.put(VirtualHost.NAME, getTestName());
+        attributes.put(VirtualHost.TYPE,  TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+        attributes.put(VirtualHost.CONTEXT, Collections.singletonMap(QueueManagingVirtualHost.FLOW_TO_DISK_CHECK_PERIOD,
+                                                                     FLOW_TO_DISK_CHECK_PERIOD));
+        _virtualHost = (AbstractVirtualHost)BrokerTestHelper.createVirtualHost(attributes, this);
+        _task = _virtualHost. new FlowToDiskCheckingTask();
+        _queue = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName()));
+        _queue.enqueue(InternalMessage.createMessage(_virtualHost.getMessageStore(),
+                                                     mock(AMQMessageHeader.class),
+                                                     "test",
+                                                     true,
+                                                     _queue.getName()), null, null);
+    }
+
+    @After
+    public void tearDown() throws Exception
+    {
+        if (_queue != null)
+        {
+            _queue.close();
+        }
+        if (_virtualHost !=  null)
+        {
+            _virtualHost.close();
+        }
+    }
+
+    @Test
+    public void testFlowToDiskInMemoryMessage()
+    {
+        final ServerMessage message = createMessage(10, 20);
+        _queue.enqueue(message, null, null);
+
+        makeVirtualHostTargetSizeExceeded();
+
+        _task.execute();
+        verify(message.getStoredMessage()).flowToDisk();
+    }
+
+    private void makeVirtualHostTargetSizeExceeded()
+    {
+        if (_virtualHost.getInMemoryMessageSize() == 0)
+        {
+            _queue.enqueue(InternalMessage.createMessage(_virtualHost.getMessageStore(),
+                                                         mock(AMQMessageHeader.class),
+                                                         "test",
+                                                         true,
+                                                         _queue.getName()), null, null);
+        }
+        _virtualHost.setTargetSize(1L);
+        assertTrue(_virtualHost.isOverTargetSize());
+    }
+
+    private ServerMessage createMessage(final int headerSize, final int payloadSize)
+    {
+        long totalSize = headerSize + payloadSize;
+        final long id = System.currentTimeMillis();
+        final ServerMessage message = mock(ServerMessage.class);
+        when(message.getMessageNumber()).thenReturn(id);
+        when(message.getMessageHeader()).thenReturn(mock(AMQMessageHeader.class));
+        when(message.checkValid()).thenReturn(true);
+        when(message.getSizeIncludingHeader()).thenReturn(totalSize);
+        when(message.getValidationStatus()).thenReturn(ServerMessage.ValidationStatus.UNKNOWN);
+
+        final StoredMessage storedMessage = mock(StoredMessage.class);
+        when(storedMessage.getContentSize()).thenReturn(payloadSize);
+        when(storedMessage.getMetadataSize()).thenReturn(headerSize);
+        when(storedMessage.getInMemorySize()).thenReturn(totalSize);
+        when(message.getStoredMessage()).thenReturn(storedMessage);
+
+        final MessageReference ref = mock(MessageReference.class);
+        when(ref.getMessage()).thenReturn(message);
+
+        when(message.newReference()).thenReturn(ref);
+        when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
+
+        return message;
+    }
+}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index 114b0f2..5a27e89 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -129,12 +129,18 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
                     }
 
                     @Override
-                    public boolean isInMemory()
+                    public boolean isInContentInMemory()
                     {
                         return true;
                     }
 
                     @Override
+                    public long getInMemorySize()
+                    {
+                        return getContentSize() + getMetadataSize();
+                    }
+
+                    @Override
                     public boolean flowToDisk()
                     {
                         return false;
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index 8c9db87..10e76bd 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -105,12 +105,18 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
                     }
 
                     @Override
-                    public boolean isInMemory()
+                    public boolean isInContentInMemory()
                     {
                         return true;
                     }
 
                     @Override
+                    public long getInMemorySize()
+                    {
+                        return getContentSize() + getMetadataSize();
+                    }
+
+                    @Override
                     public boolean flowToDisk()
                     {
                         return false;
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index b362b2a..1196d38 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -132,12 +132,18 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
             }
 
             @Override
-            public boolean isInMemory()
+            public boolean isInContentInMemory()
             {
                 return true;
             }
 
             @Override
+            public long getInMemorySize()
+            {
+                return getContentSize() + getMetadataSize();
+            }
+
+            @Override
             public boolean flowToDisk()
             {
                 return false;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 71c3b06..a9914c4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -443,12 +443,18 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
         }
 
         @Override
-        public boolean isInMemory()
+        public boolean isInContentInMemory()
         {
             return true;
         }
 
         @Override
+        public long getInMemorySize()
+        {
+            return getContentSize() + getMetadataSize();
+        }
+
+        @Override
         public boolean flowToDisk()
         {
             return false;
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
index 07aeb95..83bc923 100644
--- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -149,12 +149,18 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
             }
 
             @Override
-            public boolean isInMemory()
+            public boolean isInContentInMemory()
             {
                 return true;
             }
 
             @Override
+            public long getInMemorySize()
+            {
+                return getContentSize() + getMetadataSize();
+            }
+
+            @Override
             public boolean flowToDisk()
             {
                 return false;
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 59ab6b0..ab9afa6 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -274,12 +274,18 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
             }
 
             @Override
-            public boolean isInMemory()
+            public boolean isInContentInMemory()
             {
                 return true;
             }
 
             @Override
+            public long getInMemorySize()
+            {
+                return getMetadataSize() + getContentSize();
+            }
+
+            @Override
             public boolean flowToDisk()
             {
                 return false;
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index 6d9c190..68c96c9 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -115,12 +115,18 @@ public class MessageConverter_0_8_to_0_10  implements MessageConverter<AMQMessag
             }
 
             @Override
-            public boolean isInMemory()
+            public boolean isInContentInMemory()
             {
                 return true;
             }
 
             @Override
+            public long getInMemorySize()
+            {
+                return getContentSize() + getMetadataSize();
+            }
+
+            @Override
             public boolean flowToDisk()
             {
                 return false;
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index 3c6058f..738bd9a 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -140,12 +140,18 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
             }
 
             @Override
-            public boolean isInMemory()
+            public boolean isInContentInMemory()
             {
                 return true;
             }
 
             @Override
+            public long getInMemorySize()
+            {
+                return getContentSize() + getMetadataSize();
+            }
+
+            @Override
             public boolean flowToDisk()
             {
                 return false;
diff --git a/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
index 336da71..da203bb 100644
--- a/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
+++ b/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
@@ -109,4 +109,10 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase
         return new DerbyMessageStore();
     }
 
+    @Override
+    protected boolean flowToDiskSupported()
+    {
+        return true;
+    }
+
 }
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index b2359b7..df02e55 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -1637,11 +1637,36 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         }
 
         @Override
-        public synchronized boolean isInMemory()
+        public synchronized boolean isInContentInMemory()
         {
             return _messageDataRef != null && (_messageDataRef.isHardRef() || _messageDataRef.getData() != null);
         }
 
+        @Override
+        public synchronized long getInMemorySize()
+        {
+            long size = 0;
+            if (_messageDataRef != null)
+            {
+                if (_messageDataRef.isHardRef())
+                {
+                    size += getMetadataSize() + getContentSize();
+                }
+                else
+                {
+                    if (_messageDataRef.getMetaData() != null)
+                    {
+                        size += getMetadataSize();
+                    }
+                    if (_messageDataRef.getData() != null)
+                    {
+                        size += getContentSize();
+                    }
+                }
+            }
+            return size;
+        }
+
         private boolean stored()
         {
             return _messageDataRef != null && !_messageDataRef.isHardRef();
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index a152337..9f12062 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -323,6 +323,12 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
         return new GenericJDBCMessageStore();
     }
 
+    @Override
+    protected boolean flowToDiskSupported()
+    {
+        return true;
+    }
+
     private Connection openConnection() throws SQLException
     {
         return TestJdbcUtils.openConnection(_connectionURL);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org