You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by va...@apache.org on 2022/06/09 07:20:58 UTC
[qpid-broker-j] branch main updated: QPID-8590: [Broker-J] Purge on flow to disk queue (#128)
This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new 1a4ac67109 QPID-8590: [Broker-J] Purge on flow to disk queue (#128)
1a4ac67109 is described below
commit 1a4ac67109b53ef397024f49ac8594b3f1a17e59
Author: Daniil Kirilyuk <da...@gmail.com>
AuthorDate: Thu Jun 9 09:20:54 2022 +0200
QPID-8590: [Broker-J] Purge on flow to disk queue (#128)
---
.../queue/FlowToDiskOverflowPolicyHandler.java | 138 ++++++-
.../queue/FlowToDiskOverflowPolicyHandlerTest.java | 412 ++++++++++++++++++++-
2 files changed, 516 insertions(+), 34 deletions(-)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
index 36fcf9e245..06e3d43122 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
@@ -18,54 +18,96 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.StoredMessage;
+/**
+ * If the queue breaches a limit, newly arriving messages are written to disk and the in-memory representation of
+ * the message is minimised. The Broker will transparently retrieve messages from disk as they are required by a
+ * consumer or management. The flow to disk policy does not actually restrict the overall size of the queue,
+ * merely the space occupied in memory.
+ */
public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
{
+ /**
+ * Delegate handler
+ */
private final Handler _handler;
+ /**
+ * Constructor injects queue
+ *
+ * @param queue Queue
+ */
FlowToDiskOverflowPolicyHandler(final Queue<?> queue)
{
_handler = new Handler(queue);
queue.addChangeListener(_handler);
}
+ /**
+ * Checks queue overflow (is called when adding or deleting queue entry)
+ *
+ * @param newlyEnqueued QueueEntry enqueued (is null on deletion)
+ */
@Override
public void checkOverflow(final QueueEntry newlyEnqueued)
{
- _handler.checkOverflow(newlyEnqueued);
-
+ _handler.checkOverflow(newlyEnqueued, true);
}
+ /**
+ * Delegate handler
+ */
private static class Handler extends OverflowPolicyMaximumQueueDepthChangeListener
{
+ /**
+ * Queue instance
+ */
private final Queue<?> _queue;
+ /**
+ * Constructor injects queue
+ *
+ * @param queue Queue
+ */
private Handler(final Queue<?> queue)
{
super(OverflowPolicy.FLOW_TO_DISK);
_queue = queue;
}
+ /**
+ * Is called when max queue depth is changed
+ *
+ * @param queue Queue
+ */
@Override
void onMaximumQueueDepthChange(final Queue<?> queue)
{
- checkOverflow(null);
+ checkOverflow(null, false);
}
- private void checkOverflow(final QueueEntry newlyEnqueued)
+ /**
+ * Either flows messages to the disk or restores them into the memory
+ *
+ * @param newlyEnqueued QueueEntry (could be null in case of deletion or limit change)
+ * @param stopOnFirstMatch Whether flowing to disk / restoring to memory should be stopped after fist match
+ */
+ private void checkOverflow(final QueueEntry newlyEnqueued, final boolean stopOnFirstMatch)
{
- long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
- long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
+ final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
+ final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
if (maximumQueueDepthBytes >= 0L || maximumQueueDepthMessages >= 0L)
{
if (newlyEnqueued == null)
{
- flowTailToDiskIfNecessary(maximumQueueDepthBytes, maximumQueueDepthMessages);
+ balanceTailIfNecessary(maximumQueueDepthBytes, maximumQueueDepthMessages, stopOnFirstMatch);
}
else
{
@@ -74,45 +116,76 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
}
}
- private void flowTailToDiskIfNecessary(final long maximumQueueDepthBytes, final long maximumQueueDepthMessages)
+ /**
+ * Either flows tail messages to disk or restores them into the memory depending on the overflow limit.
+ * Boolean flag stopOnFirstMatch is true when enqueueing or deleting messages, is false when overflow limit
+ * was changed
+ *
+ * @param maximumQueueDepthBytes Max queue depth in bytes
+ * @param maximumQueueDepthMessages Max queue depth in messages
+ * @param stopOnFirstMatch Whether flowing to disk / restoring to memory should be stopped after fist match
+ */
+ private void balanceTailIfNecessary(
+ final long maximumQueueDepthBytes,
+ final long maximumQueueDepthMessages,
+ final boolean stopOnFirstMatch)
{
final long queueDepthBytes = _queue.getQueueDepthBytes();
final long queueDepthMessages = _queue.getQueueDepthMessages();
+ final boolean isMaximumQueueDepthBytesUnlimited = maximumQueueDepthBytes >= 0L;
- if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) ||
- (maximumQueueDepthMessages >= 0L && queueDepthMessages > maximumQueueDepthMessages))
+ if ((isMaximumQueueDepthBytesUnlimited && queueDepthBytes > maximumQueueDepthBytes) ||
+ (maximumQueueDepthMessages >= 0L && queueDepthMessages >= maximumQueueDepthMessages))
{
long cumulativeDepthBytes = 0;
long cumulativeDepthMessages = 0;
- QueueEntryIterator queueEntryIterator = _queue.queueEntryIterator();
+ final QueueEntryIterator queueEntryIterator = _queue.queueEntryIterator();
while (queueEntryIterator.advance())
{
- QueueEntry node = queueEntryIterator.getNode();
+ final QueueEntry node = queueEntryIterator.getNode();
if (node != null && !node.isDeleted())
{
- ServerMessage message = node.getMessage();
+ final ServerMessage<?> message = node.getMessage();
if (message != null)
{
cumulativeDepthMessages++;
cumulativeDepthBytes += message.getSizeIncludingHeader();
- if (cumulativeDepthBytes > maximumQueueDepthBytes
+ final boolean isInMemory = message.getStoredMessage().isInContentInMemory();
+
+ if ((isMaximumQueueDepthBytesUnlimited && cumulativeDepthBytes > maximumQueueDepthBytes)
|| cumulativeDepthMessages > maximumQueueDepthMessages)
{
+ if (stopOnFirstMatch || !isInMemory)
+ {
+ break;
+ }
flowToDisk(node);
}
+ else if (!isInMemory)
+ {
+ restoreInMemory(node);
+ }
}
}
}
}
}
- private void flowNewEntryToDiskIfNecessary(final QueueEntry newlyEnqueued,
- final long maximumQueueDepthBytes,
- final long maximumQueueDepthMessages)
+ /**
+ * Flows queue entry to the disk (when overflow limit is exceeded)
+ *
+ * @param newlyEnqueued QueueEntry
+ * @param maximumQueueDepthBytes Max queue depth in bytes
+ * @param maximumQueueDepthMessages Max queue depth in messages
+ */
+ private void flowNewEntryToDiskIfNecessary(
+ final QueueEntry newlyEnqueued,
+ final long maximumQueueDepthBytes,
+ final long maximumQueueDepthMessages)
{
final long queueDepthBytes = _queue.getQueueDepthBytes();
final long queueDepthMessages = _queue.getQueueDepthMessages();
@@ -124,9 +197,14 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
}
}
+ /**
+ * Flows queue entry to the disk
+ *
+ * @param node QueueEntry
+ */
private void flowToDisk(final QueueEntry node)
{
- try (MessageReference messageReference = node.getMessage().newReference())
+ try (final MessageReference<?> messageReference = node.getMessage().newReference())
{
if (node.getQueue().checkValid(node))
{
@@ -138,5 +216,29 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
// pass
}
}
+
+ /**
+ * Restores queue entry in memory
+ *
+ * @param node QueueEntry
+ */
+ private void restoreInMemory(final QueueEntry node)
+ {
+ try (final MessageReference<?> messageReference = node.getMessage().newReference())
+ {
+ if (node.getQueue().checkValid(node))
+ {
+ final StoredMessage<?> storedMessage = messageReference.getMessage().getStoredMessage();
+ try (final QpidByteBuffer qpidByteBuffer = storedMessage.getContent(0, storedMessage.getContentSize()))
+ {
+ qpidByteBuffer.dispose();
+ }
+ }
+ }
+ catch (MessageDeletedException mde)
+ {
+ // pass
+ }
+ }
}
}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
index 9d8f997055..739968e0a6 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
@@ -20,19 +20,24 @@
package org.apache.qpid.server.queue;
+import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.BrokerTestHelper;
@@ -43,10 +48,18 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.test.utils.UnitTestBase;
+/**
+ * Tests for FlowToDiskOverflowPolicyHandler.
+ *
+ * Here mockito verify() for flowToDisk() checks whether message was flowed to disk or not,
+ * and mockito verify() for getContent() checks whether message was restored into memory or not
+ */
public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
{
private Queue<?> _queue;
+ private Map<StoredMessage<?>, Boolean> _state;
+
@Before
public void setUp() throws Exception
{
@@ -59,28 +72,112 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.FLOW_TO_DISK);
_queue = (AbstractQueue<?>) virtualHost.createChild(Queue.class, attributes);
+ _state = new HashMap<>();
}
+ /**
+ * Lowers the overflow limit, forcing messages to be flowed to the disk
+ */
@Test
- public void testOverflowAfterLoweringLimit() throws Exception
+ public void overflowAfterLoweringLimit()
{
- ServerMessage<?> message = createMessage(10L);
- _queue.enqueue(message, null, null);
- StoredMessage<?> storedMessage = message.getStoredMessage();
- verify(storedMessage, never()).flowToDisk();
+ _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 10));
- ServerMessage<?> message2 = createMessage(10L);
- _queue.enqueue(message2, null, null);
- StoredMessage<?> storedMessage2 = message2.getStoredMessage();
- verify(storedMessage2, never()).flowToDisk();
+ List<ServerMessage<?>> messages = new ArrayList<>();
- _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10));
+ for (int i = 0; i < 15; i ++)
+ {
+ messages.add(createMessage(10L));
+ _queue.enqueue(messages.get(i), null, null);
+ }
- verify(storedMessage2).flowToDisk();
+ for (int i = 0; i < 10; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ for (int i = 10; i < 15; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5));
+
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ for (int i = 5; i < 10; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ for (int i = 11; i < 15; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+ }
+
+ /**
+ * Rises the overflow limit, forcing messages to be restored in the memory from the flowed to disk state
+ */
+ @Test
+ public void overflowAfterRisingLimit()
+ {
+ _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5));
+
+ List<ServerMessage<?>> messages = new ArrayList<>();
+
+ for (int i = 0; i < 15; i ++)
+ {
+ messages.add(createMessage(10L));
+ _queue.enqueue(messages.get(i), null, null);
+ }
+
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ for (int i = 5; i < 15; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 10));
+
+ // first five messages should be neither be flowed to the disk nor restored to memory (nothing changed to them)
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ // middle five messages should be restored to memory
+ for (int i = 5; i < 10; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage()).getContent(anyInt(), anyInt());
+ }
+
+ // last five messages should remain flowed to the disk
+ for (int i = 11; i < 15; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
}
@Test
- public void testOverflowOnSecondMessage() throws Exception
+ public void overflowOnSecondMessage()
{
_queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10));
ServerMessage<?> message = createMessage(10L);
@@ -95,7 +192,7 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
}
@Test
- public void testBytesOverflow() throws Exception
+ public void bytesOverflow()
{
_queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 0));
ServerMessage<?> message = createMessage(1L);
@@ -105,7 +202,7 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
}
@Test
- public void testMessagesOverflow() throws Exception
+ public void messagesOverflow()
{
_queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 0));
ServerMessage<?> message = createMessage(1L);
@@ -115,7 +212,7 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
}
@Test
- public void testNoOverflow() throws Exception
+ public void noOverflow()
{
_queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 10));
_queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10));
@@ -125,6 +222,276 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
verify(storedMessage, never()).flowToDisk();
}
+ @Test
+ public void oneByOneDeletion()
+ {
+ _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5));
+
+ List<ServerMessage<?>> messages = new ArrayList<>();
+
+ for (int i = 0; i < 10; i ++)
+ {
+ messages.add(createMessage(10L));
+ _queue.enqueue(messages.get(i), null, null);
+ }
+
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ }
+
+ for (int i = 5; i < 10; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ }
+
+ QueueEntryIterator it = _queue.queueEntryIterator();
+ it.advance();
+ _queue.deleteEntry(it.getNode());
+
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ }
+ verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(6).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ verify(messages.get(7).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ verify(messages.get(8).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ verify(messages.get(9).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+
+ it.advance();
+ _queue.deleteEntry(it.getNode());
+
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ }
+ verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(6).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(7).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ verify(messages.get(8).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ verify(messages.get(9).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+
+ it.advance();
+ _queue.deleteEntry(it.getNode());
+
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ }
+ verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(6).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(7).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(8).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ verify(messages.get(9).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+
+ it.advance();
+ _queue.deleteEntry(it.getNode());
+
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ }
+ verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(6).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(7).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(8).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(9).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+
+ it.advance();
+ _queue.deleteEntry(it.getNode());
+
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ }
+ verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(6).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(7).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(8).getStoredMessage()).getContent(anyInt(), anyInt());
+ verify(messages.get(9).getStoredMessage()).getContent(anyInt(), anyInt());
+ }
+
+ @Test
+ public void clearQueue()
+ {
+ _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5));
+
+ List<ServerMessage<?>> messages = new ArrayList<>();
+
+ for (int i = 0; i < 15; i ++)
+ {
+ messages.add(createMessage(10L));
+ _queue.enqueue(messages.get(i), null, null);
+ }
+ assertEquals(15, _queue.getQueueDepthMessages());
+
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ for (int i = 5; i < 15; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ long deleted = _queue.clearQueue();
+
+ assertEquals(15, deleted);
+ assertEquals(0, _queue.getQueueDepthMessages());
+
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ for (int i = 5; i < 15; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage()).getContent(anyInt(), anyInt());
+ }
+ }
+
+ /**
+ * Deletes messages 5-10 of 15 in the queue with the limit 5:
+ *
+ * o o o o o | _ _ _ _ _ | _ _ _ _ _
+ * =>
+ * o o o o o | x x x x x | _ _ _ _ _
+ * =>
+ * o o o o o | _ _ _ _ _
+ */
+ @Test
+ public void deleteMessagesAfterLimit()
+ {
+ _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5));
+
+ List<ServerMessage<?>> messages = new ArrayList<>();
+ List<QueueEntry> entries = new ArrayList<>();
+
+ for (int i = 0; i < 15; i ++)
+ {
+ messages.add(createMessage(10L));
+ _queue.enqueue(messages.get(i), null, null);
+ }
+ assertEquals(15, _queue.getQueueDepthMessages());
+
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ for (int i = 5; i < 15; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ QueueEntryIterator it = _queue.queueEntryIterator();
+ while (it.advance())
+ {
+ entries.add(it.getNode());
+ }
+
+ for (int i = 5; i < 10; i ++)
+ {
+ _queue.deleteEntry(entries.get(i));
+ }
+
+ assertEquals(10, _queue.getQueueDepthMessages());
+
+ // first 5 messages shouldn't be either flowed to disk or restored in memory, they remain without changes
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ // last 5 messages should be first flowed to disk but never restored in memory
+ for (int i = 5; i < 10; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+ }
+
+ /**
+ * Deletes messages 3-7 of 15 in the queue with the limit 5:
+ *
+ * o o o o o | _ _ _ _ _ | _ _ _ _ _
+ * =>
+ * o o o x x | x x x _ _ | _ _ _ _ _
+ * =>
+ * o o o o o | _ _ _ _ _
+ */
+ @Test
+ public void deleteMessagesAroundLimit()
+ {
+ _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5));
+
+ List<ServerMessage<?>> messages = new ArrayList<>();
+ List<QueueEntry> entries = new ArrayList<>();
+
+ for (int i = 0; i < 15; i ++)
+ {
+ messages.add(createMessage(10L));
+ _queue.enqueue(messages.get(i), null, null);
+ }
+ assertEquals(15, _queue.getQueueDepthMessages());
+
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ for (int i = 5; i < 15; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ QueueEntryIterator it = _queue.queueEntryIterator();
+ while (it.advance())
+ {
+ entries.add(it.getNode());
+ }
+
+ for (int i = 3; i < 8; i ++)
+ {
+ _queue.deleteEntry(entries.get(i));
+ }
+
+ assertEquals(10, _queue.getQueueDepthMessages());
+
+ // first 5 messages shouldn't be either flowed to disk or restored in memory
+ for (int i = 0; i < 5; i ++)
+ {
+ verify(messages.get(i).getStoredMessage(), never()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+
+ // messages 5-10 should be first flowed to disk and restored in memory
+ for (int i = 5; i < 10; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage()).getContent(anyInt(), anyInt());
+ }
+
+ // messages 5-10 should be flowed to disk and never restored in memory
+ for (int i = 11; i < 15; i ++)
+ {
+ verify(messages.get(i).getStoredMessage()).flowToDisk();
+ verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt());
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
private ServerMessage createMessage(long size)
{
ServerMessage message = mock(ServerMessage.class);
@@ -133,9 +500,22 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
when(message.getValidationStatus()).thenReturn(ServerMessage.ValidationStatus.VALID);
StoredMessage storedMessage = mock(StoredMessage.class);
+ _state.put(storedMessage, true);
when(message.getStoredMessage()).thenReturn(storedMessage);
- when(storedMessage.isInContentInMemory()).thenReturn(true);
+ when(storedMessage.isInContentInMemory()).thenAnswer(invocation -> _state.get(message.getStoredMessage()));
when(storedMessage.getInMemorySize()).thenReturn(size);
+ when(storedMessage.flowToDisk()).thenAnswer(invocation ->
+ {
+ StoredMessage sm = (StoredMessage) invocation.getMock();
+ _state.put(sm, false);
+ return true;
+ });
+ when(storedMessage.getContent(anyInt(), anyInt())).thenAnswer(invocation ->
+ {
+ StoredMessage sm = (StoredMessage) invocation.getMock();
+ _state.put(sm, true);
+ return QpidByteBuffer.allocate((int)size);
+ });
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org