You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by va...@apache.org on 2021/10/08 06:07:40 UTC

[qpid-broker-j] branch main updated: QPID-8563: [Broker-J] Purge all queues (#109)

This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f553ff  QPID-8563: [Broker-J] Purge all queues (#109)
3f553ff is described below

commit 3f553ffba38ec70793506e7826b3bdad5342c05e
Author: Marek Laca <mk...@users.noreply.github.com>
AuthorDate: Fri Oct 8 08:07:34 2021 +0200

    QPID-8563: [Broker-J] Purge all queues (#109)
---
 .../server/virtualhost/AbstractVirtualHost.java    | 108 +++++++++++++++-
 .../virtualhost/QueueManagingVirtualHost.java      |   6 +
 .../virtualhost/AbstractVirtualHostTest.java       | 137 ++++++++++++++++++++-
 .../resources/js/qpid/management/VirtualHost.js    |  35 ++++++
 .../src/main/java/resources/showVirtualHost.html   |   1 +
 5 files changed, 285 insertions(+), 2 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 0b1d1de..09c4fc1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -100,7 +100,35 @@ import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
-import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfigurationExtractor;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Content;
+import org.apache.qpid.server.model.CustomRestHeaders;
+import org.apache.qpid.server.model.DoOnConfigThread;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.ManageableMessage;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.NoFactoryForTypeException;
+import org.apache.qpid.server.model.NotFoundException;
+import org.apache.qpid.server.model.Param;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.RestContentHeader;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.model.SystemConfig;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostAccessControlProvider;
+import org.apache.qpid.server.model.VirtualHostLogger;
+import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.model.preferences.Preference;
 import org.apache.qpid.server.model.preferences.UserPreferences;
@@ -3482,4 +3510,82 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
                     inMemoryMessageSize);
         }
     }
+
+    @Override
+    public long clearMatchingQueues(String queueNamePattern)
+    {
+        LOGGER.debug("Clearing the queues with name that matches the pattern: '{}'", queueNamePattern);
+        try
+        {
+            final Pattern pattern = Pattern.compile(queueNamePattern);
+
+            long count = 0;
+            for (final Queue<?> queue : getChildren(Queue.class))
+            {
+                if (pattern.matcher(queue.getName()).matches())
+                {
+                    LOGGER.debug("Clearing the queue with name '{}' and ID '{}'", queue.getName(), queue.getId());
+                    count += queue.clearQueue();
+                }
+            }
+            return count;
+        }
+        catch (PatternSyntaxException e)
+        {
+            final String message = String.format("Failed to compile queue name pattern: '%s'", queueNamePattern);
+            LOGGER.debug(message, e);
+            throw new IllegalArgumentException(message, e);
+        }
+    }
+
+    @Override
+    public long clearQueues(Collection<String> queues)
+    {
+        final Map<UUID, String> uuid = new HashMap<>();
+        final Set<String> names = new HashSet<>();
+        for (final String id : queues)
+        {
+            try
+            {
+                uuid.put(UUID.fromString(id), id);
+            }
+            catch (IllegalArgumentException e)
+            {
+                LOGGER.trace(String.format("'%s' is not a valid queue ID", id), e);
+                names.add(id);
+            }
+        }
+
+        final Collection<Queue> queueList = getChildren(Queue.class);
+        long count = 0;
+
+        if (!uuid.isEmpty())
+        {
+            LOGGER.debug("Clearing the queues with IDs: {}", uuid.values());
+            for (final Queue<?> queue : queueList)
+            {
+                if (uuid.remove(queue.getId()) != null)
+                {
+                    LOGGER.debug("Clearing the queue with ID '{}'", queue.getId());
+                    count += queue.clearQueue();
+                }
+            }
+            names.addAll(uuid.values());
+        }
+
+        if (!names.isEmpty())
+        {
+            LOGGER.debug("Clearing the queues with names: {}", names);
+            for (final Queue<?> queue : queueList)
+            {
+                if (names.contains(queue.getName()))
+                {
+                    LOGGER.debug("Clearing the queue with name '{}'", queue.getName());
+                    count += queue.clearQueue();
+                }
+            }
+        }
+
+        return count;
+    }
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 690c076..003b4d1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -343,6 +343,12 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
                            @Param(name = "role", description = "whether to remove only sending links (\"SENDER\"), receiving links (\"RECEIVER\") or both (\"BOTH\")", validValues = {"SENDER", "RECEIVER", "BOTH"}, defaultValue = "BOTH") String role,
                            @Param(name = "linkNamePattern", description = "Regular Expression to match the link names to be removed.", defaultValue = ".*") String linkNamePattern);
 
+    @ManagedOperation(description = "Purge every queue with the name that matches the regular expression.", changesConfiguredObjectState = false)
+    long clearMatchingQueues(@Param(name = "queueNamePattern", description = "Regular Expression to match the queue name.", defaultValue = ".*") String queueNamePattern);
+
+    @ManagedOperation(description = "Purge queues in provided list.", changesConfiguredObjectState = false)
+    long clearQueues(@Param(name = "queues", description = "Collection of queue IDs or names.") Collection<String> queues);
+
     Queue<?> getSubscriptionQueue(final String exchangeName,
                                   final Map<String, Object> attributes,
                                   final Map<String, Map<String, Object>> bindings);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
index 00f1785..b22555d 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.virtualhost;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.any;
@@ -33,8 +34,13 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -46,6 +52,7 @@ import ch.qos.logback.core.spi.FilterReply;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,13 +64,13 @@ import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.SystemConfig;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.security.AccessControl;
 import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.Event;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.preferences.PreferenceStore;
 import org.apache.qpid.server.util.FileUtils;
@@ -350,4 +357,132 @@ public class AbstractVirtualHostTest extends UnitTestBase
         action.run();
         assertTrue("Did not receive expected log message", logMessageReceivedLatch.await(2, TimeUnit.SECONDS));
     }
+
+    @Test
+    public void testClearMatchingQueues()
+    {
+        final List<Queue> queues = new ArrayList<>();
+        final Queue<?> queueA = newQueue("queueA");
+        queues.add(queueA);
+        final Queue<?> topic = newQueue("queue-topic");
+        queues.add(topic);
+        final Queue<?> queueB = newQueue("queueB");
+        queues.add(queueB);
+
+        newVirtualHost(queues).clearMatchingQueues("queue.?");
+        Mockito.verify(queueA).clearQueue();
+        Mockito.verify(queueB).clearQueue();
+        Mockito.verify(topic, Mockito.never()).clearQueue();
+    }
+
+    @Test
+    public void testClearMatchingQueues_any()
+    {
+        final List<Queue> queues = new ArrayList<>();
+        queues.add(newQueue("queueA"));
+        queues.add(newQueue("queue-topic"));
+        queues.add(newQueue("queueB"));
+
+        newVirtualHost(queues).clearMatchingQueues(".*");
+        for (final Queue<?> queue : queues)
+        {
+            Mockito.verify(queue).clearQueue();
+        }
+    }
+
+    @Test
+    public void testClearMatchingQueues_exception()
+    {
+        final List<Queue> queues = new ArrayList<>();
+        queues.add(newQueue("queueA"));
+        queues.add(newQueue("queue-topic"));
+        queues.add(newQueue("queueB"));
+
+        final AbstractVirtualHost host = newVirtualHost(queues);
+        try
+        {
+            host.clearMatchingQueues(".*[");
+            fail("An exception is expected!");
+        }
+        catch (RuntimeException e)
+        {
+            assertNotNull(e.getMessage());
+        }
+        for (final Queue<?> queue : queues)
+        {
+            Mockito.verify(queue, Mockito.never()).clearQueue();
+        }
+    }
+
+    @Test
+    public void testClearQueues()
+    {
+        final List<Queue> queues = new ArrayList<>();
+        final Queue<?> queueA = newQueue("queueA");
+        queues.add(queueA);
+        final Queue<?> topic = newQueue("queue-topic");
+        queues.add(topic);
+        final Queue<?> queueB = newQueue("queueB");
+        queues.add(queueB);
+
+        newVirtualHost(queues).clearQueues(Arrays.asList("queue-topic", queueB.getId().toString()));
+        Mockito.verify(queueA, Mockito.never()).clearQueue();
+        Mockito.verify(queueB).clearQueue();
+        Mockito.verify(topic).clearQueue();
+    }
+
+    @Test
+    public void testClearQueues_none()
+    {
+        final List<Queue> queues = new ArrayList<>();
+        queues.add(newQueue("queueA"));
+        queues.add(newQueue("queue-topic"));
+        queues.add(newQueue("queueB"));
+
+        newVirtualHost(queues).clearQueues(Collections.emptySet());
+        for (final Queue<?> queue : queues)
+        {
+            Mockito.verify(queue, Mockito.never()).clearQueue();
+        }
+    }
+
+    private AbstractVirtualHost newVirtualHost(List<Queue> queues)
+    {
+        final Map<String, Object> attributes = Collections.singletonMap(AbstractVirtualHost.NAME, getTestName());
+        final MessageStore store = mock(MessageStore.class);
+        return new AbstractVirtualHost(attributes, _node)
+        {
+            @Override
+            protected MessageStore createMessageStore()
+            {
+                return store;
+            }
+
+            @Override
+            public Collection getChildren(Class clazz)
+            {
+                if (clazz == Queue.class)
+                {
+                    return queues;
+                }
+                else
+                {
+                    return super.getChildren(clazz);
+                }
+
+            }
+        };
+    }
+
+    private Queue<?> newQueue(final String name)
+    {
+        final Queue<?> queue = Mockito.mock(Queue.class);
+        Mockito.doReturn(name).when(queue).getName();
+
+        final UUID uuid = UUID.randomUUID();
+        Mockito.doReturn(uuid).when(queue).getId();
+
+        Mockito.doReturn(Queue.class).when(queue).getCategoryClass();
+        return queue;
+    }
 }
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js
index c19a9f8..8d7ef9a 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js
@@ -114,6 +114,12 @@ define(["dojo/parser",
                             "delete", "queue");
                     });
 
+                    const clearQueueButton = query(".clearQueueButton", containerNode)[0];
+                    registry.byNode(clearQueueButton).on("click", function (evt)
+                    {
+                        that._clearQueues(that.vhostUpdater.queuesGrid);
+                    });
+
                     var addExchangeButton = query(".addExchangeButton", containerNode)[0];
                     registry.byNode(addExchangeButton).on("click", function (evt)
                     {
@@ -286,6 +292,35 @@ define(["dojo/parser",
             return confirmed;
         };
 
+        VirtualHost.prototype._clearQueues = function (dgrid)
+        {
+            let selected = [];
+            for (let item in dgrid.selection)
+            {
+                if (dgrid.selection.hasOwnProperty(item) && dgrid.selection[item])
+                {
+                    selected.push(item);
+                }
+            }
+            if (selected.length > 0)
+            {
+                const plural = selected.length === 1 ? "" : "s";
+                if (confirm(lang.replace("Are you sure you want to purge {0} queue{1}?", [selected.length, plural])))
+                {
+                    const modelObj = {
+                        type: "virtualhost",
+                        name: "clearQueues",
+                        parent: this.modelObj
+                    };
+                    this.management.update(modelObj, {"queues" : selected}).then(lang.hitch(this, function ()
+                    {
+                        dgrid.clearSelection();
+                        this.vhostUpdater.update();
+                    }));
+                }
+            }
+        };
+
         VirtualHost.prototype.close = function ()
         {
             updater.remove(this.vhostUpdater);
diff --git a/broker-plugins/management-http/src/main/java/resources/showVirtualHost.html b/broker-plugins/management-http/src/main/java/resources/showVirtualHost.html
index 217d43f..300f862 100644
--- a/broker-plugins/management-http/src/main/java/resources/showVirtualHost.html
+++ b/broker-plugins/management-http/src/main/java/resources/showVirtualHost.html
@@ -127,6 +127,7 @@
             <div class="queues"></div>
             <button data-dojo-type="dijit.form.Button" class="addQueueButton">Add Queue</button>
             <button data-dojo-type="dijit.form.Button" class="deleteQueueButton">Delete Queue</button>
+            <button data-dojo-type="dijit.form.Button" class="clearQueueButton">Clear Queue</button>
         </div>
         <br/>
         <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Connections'" class="virtualHostConnections">

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org