You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/11/24 00:20:27 UTC

[qpid-broker-j] 02/04: QPID-8378: [Broker-J] Make sure that message reference is released for deleted node

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit f1742d3e9a0c58720a333be944f91be5f68ae0ce
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Nov 14 23:13:46 2019 +0000

    QPID-8378: [Broker-J] Make sure that message reference is released for deleted node
    
    (cherry picked from commit f17f16aded492953984ece42d90afd52ab44408a)
---
 .../apache/qpid/server/queue/AbstractQueue.java    |  8 +---
 .../qpid/server/queue/AbstractQueueTestBase.java   | 53 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 22e42d4..cc6608d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1714,15 +1714,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         {
             QueueEntry node = queueListIterator.getNode();
             MessageReference reference = node.newMessageReference();
-            if(reference != null && !node.isDeleted())
+            if(reference != null)
             {
                 try
                 {
-                    if (!reference.getMessage().checkValid())
-                    {
-                        malformedEntry(node);
-                    }
-                    else if (visitor.visit(node))
+                    if (!node.isDeleted() && reference.getMessage().checkValid() && visitor.visit(node))
                     {
                         break;
                     }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 4fcea19..0a4f987 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
@@ -1257,6 +1258,58 @@ abstract class AbstractQueueTestBase extends UnitTestBase
                      _queue.getQueueDepthMessages());
     }
 
+    @Test
+    public void testVisit()
+    {
+        final ServerMessage message = createMessage(1L, 2, 3);
+        _queue.enqueue(message, null, null);
+
+        final QueueEntryVisitor visitor = mock(QueueEntryVisitor.class);
+        _queue.visit(visitor);
+
+        final ArgumentCaptor<QueueEntry> argument = ArgumentCaptor.forClass(QueueEntry.class);
+        verify(visitor).visit(argument.capture());
+
+        final QueueEntry queueEntry = argument.getValue();
+        assertEquals(message, queueEntry.getMessage());
+        verify(message.newReference()).release();
+    }
+
+    @Test
+    public void testVisitWhenNodeDeletedAfterAdvance()
+    {
+        final QueueEntryList list = mock(QueueEntryList.class);
+
+        final Map<String,Object> attributes = new HashMap<>();
+        attributes.put(Queue.NAME, _qname);
+        attributes.put(Queue.OWNER, _owner);
+
+        @SuppressWarnings("unchecked")
+        final Queue queue = new AbstractQueue(attributes, _virtualHost)
+        {
+            @Override
+            QueueEntryList getEntries()
+            {
+                return list;
+            }
+
+        };
+
+        final MessageReference reference = mock(MessageReference.class);
+        final QueueEntry entry = mock(QueueEntry.class);
+        when(entry.isDeleted()).thenReturn(true);
+        when(entry.newMessageReference()).thenReturn(reference);
+        final QueueEntryIterator iterator = mock(QueueEntryIterator.class);
+        when(iterator.advance()).thenReturn(true, false);
+        when(iterator.getNode()).thenReturn(entry);
+        when(list.iterator()).thenReturn(iterator);
+
+        final QueueEntryVisitor visitor = mock(QueueEntryVisitor.class);
+        queue.visit(visitor);
+        verifyNoMoreInteractions(visitor);
+        verify(reference).release();
+    }
+
     private void makeVirtualHostTargetSizeExceeded()
     {
         final InternalMessage message = InternalMessage.createMessage(_virtualHost.getMessageStore(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org