You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by va...@apache.org on 2022/06/09 07:20:58 UTC

[qpid-broker-j] branch main updated: QPID-8590: [Broker-J] Purge on flow to disk queue (#128)

This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a4ac67109 QPID-8590: [Broker-J] Purge on flow to disk queue (#128)
1a4ac67109 is described below

commit 1a4ac67109b53ef397024f49ac8594b3f1a17e59
Author: Daniil Kirilyuk <da...@gmail.com>
AuthorDate: Thu Jun 9 09:20:54 2022 +0200

    QPID-8590: [Broker-J] Purge on flow to disk queue (#128)
---
 .../queue/FlowToDiskOverflowPolicyHandler.java     | 138 ++++++-
 .../queue/FlowToDiskOverflowPolicyHandlerTest.java | 412 ++++++++++++++++++++-
 2 files changed, 516 insertions(+), 34 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
index 36fcf9e245..06e3d43122 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
@@ -18,54 +18,96 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.MessageDeletedException;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.StoredMessage;
 
+/**
+ * If the queue breaches a limit, newly arriving messages are written to disk and the in-memory representation of
+ * the message is minimised. The Broker will transparently retrieve messages from disk as they are required by a
+ * consumer or management. The flow to disk policy does not actually restrict the overall size of the queue,
+ * merely the space occupied in memory.
+ */
 public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
 {
+    /**
+     * Delegate handler
+     */
     private final Handler _handler;
 
+    /**
+     * Constructor injects queue
+     *
+     * @param queue Queue
+     */
     FlowToDiskOverflowPolicyHandler(final Queue<?> queue)
     {
         _handler = new Handler(queue);
         queue.addChangeListener(_handler);
     }
 
+    /**
+     * Checks queue overflow (is called when adding or deleting queue entry)
+     *
+     * @param newlyEnqueued QueueEntry enqueued (is null on deletion)
+     */
     @Override
     public void checkOverflow(final QueueEntry newlyEnqueued)
     {
-        _handler.checkOverflow(newlyEnqueued);
-
+        _handler.checkOverflow(newlyEnqueued, true);
     }
 
+    /**
+     * Delegate handler
+     */
     private static class Handler extends OverflowPolicyMaximumQueueDepthChangeListener
     {
+        /**
+         * Queue instance
+         */
         private final Queue<?> _queue;
 
+        /**
+         * Constructor injects queue
+         *
+         * @param queue Queue
+         */
         private Handler(final Queue<?> queue)
         {
             super(OverflowPolicy.FLOW_TO_DISK);
             _queue = queue;
         }
 
+        /**
+         * Is called when max queue depth is changed
+         *
+         * @param queue Queue
+         */
         @Override
         void onMaximumQueueDepthChange(final Queue<?> queue)
         {
-            checkOverflow(null);
+            checkOverflow(null, false);
         }
 
-        private void checkOverflow(final QueueEntry newlyEnqueued)
+        /**
+         * Either flows messages to the disk or restores them into the memory
+         *
+         * @param newlyEnqueued QueueEntry (could be null in case of deletion or limit change)
+         * @param stopOnFirstMatch Whether flowing to disk / restoring to memory should be stopped after fist match
+         */
+        private void checkOverflow(final QueueEntry newlyEnqueued, final boolean stopOnFirstMatch)
         {
-            long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
-            long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
+            final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
+            final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
             if (maximumQueueDepthBytes >= 0L || maximumQueueDepthMessages >= 0L)
             {
                 if (newlyEnqueued == null)
                 {
-                    flowTailToDiskIfNecessary(maximumQueueDepthBytes, maximumQueueDepthMessages);
+                    balanceTailIfNecessary(maximumQueueDepthBytes, maximumQueueDepthMessages, stopOnFirstMatch);
                 }
                 else
                 {
@@ -74,45 +116,76 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
             }
         }
 
-        private void flowTailToDiskIfNecessary(final long maximumQueueDepthBytes, final long maximumQueueDepthMessages)
+        /**
+         * Either flows tail messages to disk or restores them into the memory depending on the overflow limit.
+         * Boolean flag stopOnFirstMatch is true when enqueueing or deleting messages, is false when overflow limit
+         * was changed
+         *
+         * @param maximumQueueDepthBytes Max queue depth in bytes
+         * @param maximumQueueDepthMessages Max queue depth in messages
+         * @param stopOnFirstMatch Whether flowing to disk / restoring to memory should be stopped after fist match
+         */
+        private void balanceTailIfNecessary(
+            final long maximumQueueDepthBytes,
+            final long maximumQueueDepthMessages,
+            final boolean stopOnFirstMatch)
         {
             final long queueDepthBytes = _queue.getQueueDepthBytes();
             final long queueDepthMessages = _queue.getQueueDepthMessages();
+            final boolean isMaximumQueueDepthBytesUnlimited = maximumQueueDepthBytes >= 0L;
 
-            if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) ||
-                (maximumQueueDepthMessages >= 0L && queueDepthMessages > maximumQueueDepthMessages))
+            if ((isMaximumQueueDepthBytesUnlimited && queueDepthBytes > maximumQueueDepthBytes) ||
+                (maximumQueueDepthMessages >= 0L && queueDepthMessages >= maximumQueueDepthMessages))
             {
 
                 long cumulativeDepthBytes = 0;
                 long cumulativeDepthMessages = 0;
 
-                QueueEntryIterator queueEntryIterator = _queue.queueEntryIterator();
+                final QueueEntryIterator queueEntryIterator = _queue.queueEntryIterator();
                 while (queueEntryIterator.advance())
                 {
-                    QueueEntry node = queueEntryIterator.getNode();
+                    final QueueEntry node = queueEntryIterator.getNode();
 
                     if (node != null && !node.isDeleted())
                     {
-                        ServerMessage message = node.getMessage();
+                        final ServerMessage<?> message = node.getMessage();
                         if (message != null)
                         {
                             cumulativeDepthMessages++;
                             cumulativeDepthBytes += message.getSizeIncludingHeader();
 
-                            if (cumulativeDepthBytes > maximumQueueDepthBytes
+                            final boolean isInMemory = message.getStoredMessage().isInContentInMemory();
+
+                            if ((isMaximumQueueDepthBytesUnlimited && cumulativeDepthBytes > maximumQueueDepthBytes)
                                 || cumulativeDepthMessages > maximumQueueDepthMessages)
                             {
+                                if (stopOnFirstMatch || !isInMemory)
+                                {
+                                    break;
+                                }
                                 flowToDisk(node);
                             }
+                            else if (!isInMemory)
+                            {
+                                restoreInMemory(node);
+                            }
                         }
                     }
                 }
             }
         }
 
-        private void flowNewEntryToDiskIfNecessary(final QueueEntry newlyEnqueued,
-                                                   final long maximumQueueDepthBytes,
-                                                   final long maximumQueueDepthMessages)
+        /**
+         * Flows queue entry to the disk (when overflow limit is exceeded)
+         *
+         * @param newlyEnqueued QueueEntry
+         * @param maximumQueueDepthBytes Max queue depth in bytes
+         * @param maximumQueueDepthMessages Max queue depth in messages
+         */
+        private void flowNewEntryToDiskIfNecessary(
+            final QueueEntry newlyEnqueued,
+            final long maximumQueueDepthBytes,
+            final long maximumQueueDepthMessages)
         {
             final long queueDepthBytes = _queue.getQueueDepthBytes();
             final long queueDepthMessages = _queue.getQueueDepthMessages();
@@ -124,9 +197,14 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
             }
         }
 
+        /**
+         * Flows queue entry to the disk
+         *
+         * @param node QueueEntry
+         */
         private void flowToDisk(final QueueEntry node)
         {
-            try (MessageReference messageReference = node.getMessage().newReference())
+            try (final MessageReference<?> messageReference = node.getMessage().newReference())
             {
                 if (node.getQueue().checkValid(node))
                 {
@@ -138,5 +216,29 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
                 // pass
             }
         }
+
+        /**
+         * Restores queue entry in memory
+         *
+         * @param node QueueEntry
+         */
+        private void restoreInMemory(final QueueEntry node)
+        {
+            try (final MessageReference<?> messageReference = node.getMessage().newReference())
+            {
+                if (node.getQueue().checkValid(node))
+                {
+                    final StoredMessage<?> storedMessage = messageReference.getMessage().getStoredMessage();
+                    try (final QpidByteBuffer qpidByteBuffer = storedMessage.getContent(0, storedMessage.getContentSize()))
+                    {
+                        qpidByteBuffer.dispose();
+                    }
+                }
+            }
+            catch (MessageDeletedException mde)
+            {
+                // pass
+            }
+        }
     }
 }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
index 9d8f997055..739968e0a6 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
@@ -20,19 +20,24 @@
 
 package org.apache.qpid.server.queue;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.BrokerTestHelper;
@@ -43,10 +48,18 @@ import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.test.utils.UnitTestBase;
 
+/**
+ * Tests for FlowToDiskOverflowPolicyHandler.
+ *
+ * Here mockito verify() for flowToDisk() checks whether message was flowed to disk or not,
+ * and mockito verify() for getContent() checks whether message was restored into memory or not
+ */
 public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
 {
     private Queue<?> _queue;
 
+    private Map<StoredMessage<?>, Boolean> _state;
+
     @Before
     public void setUp() throws Exception
     {
@@ -59,28 +72,112 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
         attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.FLOW_TO_DISK);
 
         _queue = (AbstractQueue<?>) virtualHost.createChild(Queue.class, attributes);
+        _state = new HashMap<>();
     }
 
+    /**
+     * Lowers the overflow limit, forcing messages to be flowed to the disk
+     */
     @Test
-    public void testOverflowAfterLoweringLimit() throws Exception
+    public void overflowAfterLoweringLimit()
     {
-        ServerMessage<?> message = createMessage(10L);
-        _queue.enqueue(message, null, null);
-        StoredMessage<?> storedMessage = message.getStoredMessage();
-        verify(storedMessage, never()).flowToDisk();
+        _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 10));
 
-        ServerMessage<?> message2 = createMessage(10L);
-        _queue.enqueue(message2, null, null);
-        StoredMessage<?> storedMessage2 = message2.getStoredMessage();
-        verify(storedMessage2, never()).flowToDisk();
+        List<ServerMessage<?>> messages = new ArrayList<>();
 
-        _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10));
+        for (int i = 0; i < 15; i ++)
+        {
+            messages.add(createMessage(10L));
+            _queue.enqueue(messages.get(i), null, null);
+        }
 
-        verify(storedMessage2).flowToDisk();
+        for (int i = 0; i < 10; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        for (int i = 10; i < 15; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5));
+
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        for (int i = 5; i < 10; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        for (int i = 11; i < 15; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+    }
+
+    /**
+     * Rises the overflow limit, forcing messages to be restored in the memory from the flowed to disk state
+     */
+    @Test
+    public void overflowAfterRisingLimit()
+    {
+        _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5));
+
+        List<ServerMessage<?>> messages = new ArrayList<>();
+
+        for (int i = 0; i < 15; i ++)
+        {
+            messages.add(createMessage(10L));
+            _queue.enqueue(messages.get(i), null, null);
+        }
+
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        for (int i = 5; i < 15; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 10));
+
+        // first five messages should be neither be flowed to the disk nor restored to memory (nothing changed to them)
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        // middle five messages should be restored to memory
+        for (int i = 5; i < 10; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage()).getContent(anyInt(), anyInt());
+        }
+
+        // last five messages should remain flowed to the disk
+        for (int i = 11; i < 15; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
     }
 
     @Test
-    public void testOverflowOnSecondMessage() throws Exception
+    public void overflowOnSecondMessage()
     {
         _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10));
         ServerMessage<?> message = createMessage(10L);
@@ -95,7 +192,7 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
     }
 
     @Test
-    public void testBytesOverflow() throws Exception
+    public void bytesOverflow()
     {
         _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 0));
         ServerMessage<?> message = createMessage(1L);
@@ -105,7 +202,7 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
     }
 
     @Test
-    public void testMessagesOverflow() throws Exception
+    public void messagesOverflow()
     {
         _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 0));
         ServerMessage<?> message = createMessage(1L);
@@ -115,7 +212,7 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
     }
 
     @Test
-    public void testNoOverflow() throws Exception
+    public void noOverflow()
     {
         _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 10));
         _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10));
@@ -125,6 +222,276 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
         verify(storedMessage, never()).flowToDisk();
     }
 
+    @Test
+    public void oneByOneDeletion()
+    {
+        _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5));
+
+        List<ServerMessage<?>> messages = new ArrayList<>();
+
+        for (int i = 0; i < 10; i ++)
+        {
+            messages.add(createMessage(10L));
+            _queue.enqueue(messages.get(i), null, null);
+        }
+
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+        }
+
+        for (int i = 5; i < 10; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+        }
+
+        QueueEntryIterator it = _queue.queueEntryIterator();
+        it.advance();
+        _queue.deleteEntry(it.getNode());
+
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+        }
+        verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(6).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        verify(messages.get(7).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        verify(messages.get(8).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        verify(messages.get(9).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+
+        it.advance();
+        _queue.deleteEntry(it.getNode());
+
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+        }
+        verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(6).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(7).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        verify(messages.get(8).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        verify(messages.get(9).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+
+        it.advance();
+        _queue.deleteEntry(it.getNode());
+
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+        }
+        verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(6).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(7).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(8).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        verify(messages.get(9).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+
+        it.advance();
+        _queue.deleteEntry(it.getNode());
+
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+        }
+        verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(6).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(7).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(8).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(9).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+
+        it.advance();
+        _queue.deleteEntry(it.getNode());
+
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+        }
+        verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(6).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(7).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(8).getStoredMessage()).getContent(anyInt(), anyInt());
+        verify(messages.get(9).getStoredMessage()).getContent(anyInt(), anyInt());
+    }
+
+    @Test
+    public void clearQueue()
+    {
+        _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5));
+
+        List<ServerMessage<?>> messages = new ArrayList<>();
+
+        for (int i = 0; i < 15; i ++)
+        {
+            messages.add(createMessage(10L));
+            _queue.enqueue(messages.get(i), null, null);
+        }
+        assertEquals(15, _queue.getQueueDepthMessages());
+
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        for (int i = 5; i < 15; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        long deleted = _queue.clearQueue();
+
+        assertEquals(15, deleted);
+        assertEquals(0, _queue.getQueueDepthMessages());
+
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        for (int i = 5; i < 15; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage()).getContent(anyInt(), anyInt());
+        }
+    }
+
+    /**
+     * Deletes messages 5-10 of 15 in the queue with the limit 5:
+     *
+     * o o o o o | _ _ _ _ _ | _ _ _ _ _
+     * =>
+     * o o o o o | x x x x x | _ _ _ _ _
+     * =>
+     * o o o o o | _ _ _ _ _
+     */
+    @Test
+    public void deleteMessagesAfterLimit()
+    {
+        _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5));
+
+        List<ServerMessage<?>> messages = new ArrayList<>();
+        List<QueueEntry> entries = new ArrayList<>();
+
+        for (int i = 0; i < 15; i ++)
+        {
+            messages.add(createMessage(10L));
+            _queue.enqueue(messages.get(i), null, null);
+        }
+        assertEquals(15, _queue.getQueueDepthMessages());
+
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        for (int i = 5; i < 15; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        QueueEntryIterator it = _queue.queueEntryIterator();
+        while (it.advance())
+        {
+            entries.add(it.getNode());
+        }
+
+        for (int i = 5; i < 10; i ++)
+        {
+            _queue.deleteEntry(entries.get(i));
+        }
+
+        assertEquals(10, _queue.getQueueDepthMessages());
+
+        // first 5 messages shouldn't be either flowed to disk or restored in memory, they remain without changes
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        // last 5 messages should be first flowed to disk but never restored in memory
+        for (int i = 5; i < 10; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+    }
+
+    /**
+     * Deletes messages 3-7 of 15 in the queue with the limit 5:
+     *
+     * o o o o o | _ _ _ _ _ | _ _ _ _ _
+     * =>
+     * o o o x x | x x x _ _ | _ _ _ _ _
+     * =>
+     * o o o o o | _ _ _ _ _
+     */
+    @Test
+    public void deleteMessagesAroundLimit()
+    {
+        _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5));
+
+        List<ServerMessage<?>> messages = new ArrayList<>();
+        List<QueueEntry> entries = new ArrayList<>();
+
+        for (int i = 0; i < 15; i ++)
+        {
+            messages.add(createMessage(10L));
+            _queue.enqueue(messages.get(i), null, null);
+        }
+        assertEquals(15, _queue.getQueueDepthMessages());
+
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        for (int i = 5; i < 15; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        QueueEntryIterator it = _queue.queueEntryIterator();
+        while (it.advance())
+        {
+            entries.add(it.getNode());
+        }
+
+        for (int i = 3; i < 8; i ++)
+        {
+            _queue.deleteEntry(entries.get(i));
+        }
+
+        assertEquals(10, _queue.getQueueDepthMessages());
+
+        // first 5 messages shouldn't be either flowed to disk or restored in memory
+        for (int i = 0; i < 5; i ++)
+        {
+            verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+
+        // messages 5-10 should be first flowed to disk and restored in memory
+        for (int i = 5; i < 10; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage()).getContent(anyInt(), anyInt());
+        }
+
+        // messages 5-10 should be flowed to disk and never restored in memory
+        for (int i = 11; i < 15; i ++)
+        {
+            verify(messages.get(i).getStoredMessage()).flowToDisk();
+            verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
     private ServerMessage createMessage(long size)
     {
         ServerMessage message = mock(ServerMessage.class);
@@ -133,9 +500,22 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
         when(message.getValidationStatus()).thenReturn(ServerMessage.ValidationStatus.VALID);
 
         StoredMessage storedMessage = mock(StoredMessage.class);
+        _state.put(storedMessage, true);
         when(message.getStoredMessage()).thenReturn(storedMessage);
-        when(storedMessage.isInContentInMemory()).thenReturn(true);
+        when(storedMessage.isInContentInMemory()).thenAnswer(invocation -> _state.get(message.getStoredMessage()));
         when(storedMessage.getInMemorySize()).thenReturn(size);
+        when(storedMessage.flowToDisk()).thenAnswer(invocation ->
+        {
+            StoredMessage sm = (StoredMessage) invocation.getMock();
+            _state.put(sm, false);
+            return true;
+        });
+        when(storedMessage.getContent(anyInt(), anyInt())).thenAnswer(invocation ->
+        {
+            StoredMessage sm = (StoredMessage) invocation.getMock();
+            _state.put(sm, true);
+            return QpidByteBuffer.allocate((int)size);
+        });
 
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org