You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by va...@apache.org on 2021/10/08 06:07:40 UTC
[qpid-broker-j] branch main updated: QPID-8563: [Broker-J] Purge all queues (#109)
This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new 3f553ff QPID-8563: [Broker-J] Purge all queues (#109)
3f553ff is described below
commit 3f553ffba38ec70793506e7826b3bdad5342c05e
Author: Marek Laca <mk...@users.noreply.github.com>
AuthorDate: Fri Oct 8 08:07:34 2021 +0200
QPID-8563: [Broker-J] Purge all queues (#109)
---
.../server/virtualhost/AbstractVirtualHost.java | 108 +++++++++++++++-
.../virtualhost/QueueManagingVirtualHost.java | 6 +
.../virtualhost/AbstractVirtualHostTest.java | 137 ++++++++++++++++++++-
.../resources/js/qpid/management/VirtualHost.js | 35 ++++++
.../src/main/java/resources/showVirtualHost.html | 1 +
5 files changed, 285 insertions(+), 2 deletions(-)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 0b1d1de..09c4fc1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -100,7 +100,35 @@ import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
-import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfigurationExtractor;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Content;
+import org.apache.qpid.server.model.CustomRestHeaders;
+import org.apache.qpid.server.model.DoOnConfigThread;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.ManageableMessage;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.NoFactoryForTypeException;
+import org.apache.qpid.server.model.NotFoundException;
+import org.apache.qpid.server.model.Param;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.RestContentHeader;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.model.SystemConfig;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostAccessControlProvider;
+import org.apache.qpid.server.model.VirtualHostLogger;
+import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.model.preferences.Preference;
import org.apache.qpid.server.model.preferences.UserPreferences;
@@ -3482,4 +3510,82 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
inMemoryMessageSize);
}
}
+
+    /**
+     * Purges every queue of this virtual host whose name fully matches the supplied regular expression.
+     *
+     * @param queueNamePattern regular expression tested against each queue name with {@code Matcher.matches()}
+     * @return sum of the values returned by {@code clearQueue()} for the matched queues
+     * @throws IllegalArgumentException when the expression cannot be compiled
+     */
+    @Override
+    public long clearMatchingQueues(String queueNamePattern)
+    {
+        LOGGER.debug("Clearing the queues with name that matches the pattern: '{}'", queueNamePattern);
+
+        // Compile up front so that an invalid expression is rejected before any queue is touched.
+        final Pattern compiled;
+        try
+        {
+            compiled = Pattern.compile(queueNamePattern);
+        }
+        catch (PatternSyntaxException syntaxError)
+        {
+            final String failure = String.format("Failed to compile queue name pattern: '%s'", queueNamePattern);
+            LOGGER.debug(failure, syntaxError);
+            throw new IllegalArgumentException(failure, syntaxError);
+        }
+
+        long purged = 0;
+        for (final Queue<?> candidate : getChildren(Queue.class))
+        {
+            if (!compiled.matcher(candidate.getName()).matches())
+            {
+                continue;
+            }
+            LOGGER.debug("Clearing the queue with name '{}' and ID '{}'", candidate.getName(), candidate.getId());
+            purged += candidate.clearQueue();
+        }
+        return purged;
+    }
+
+    /**
+     * Purges the queues identified by the supplied IDs or names.
+     * <p>
+     * An entry that parses as a UUID is first matched against queue IDs; an entry that is not a UUID, or a UUID
+     * that matches no queue ID, is matched against queue names. A queue referenced more than once (for example
+     * by both its ID and its name) is purged only once.
+     *
+     * @param queues queue IDs or names; a {@code null} collection and {@code null} entries are ignored
+     * @return sum of the values returned by {@code clearQueue()} for every purged queue
+     */
+    @Override
+    public long clearQueues(Collection<String> queues)
+    {
+        if (queues == null || queues.isEmpty())
+        {
+            return 0;
+        }
+
+        final Map<UUID, String> uuid = new HashMap<>();
+        final Set<String> names = new HashSet<>();
+        for (final String id : queues)
+        {
+            if (id == null)
+            {
+                // UUID.fromString(null) throws NullPointerException, which the catch below would not handle
+                continue;
+            }
+            try
+            {
+                uuid.put(UUID.fromString(id), id);
+            }
+            catch (IllegalArgumentException e)
+            {
+                // parameterized message: nothing is formatted unless trace logging is enabled
+                LOGGER.trace("'{}' is not a valid queue ID", id, e);
+                names.add(id);
+            }
+        }
+
+        final Collection<Queue> queueList = getChildren(Queue.class);
+        // IDs of the queues purged so far, so that the name pass does not purge a queue a second time
+        final Set<UUID> cleared = new HashSet<>();
+        long count = 0;
+
+        if (!uuid.isEmpty())
+        {
+            LOGGER.debug("Clearing the queues with IDs: {}", uuid.values());
+            for (final Queue<?> queue : queueList)
+            {
+                if (uuid.remove(queue.getId()) != null)
+                {
+                    LOGGER.debug("Clearing the queue with ID '{}'", queue.getId());
+                    count += queue.clearQueue();
+                    cleared.add(queue.getId());
+                }
+            }
+            // UUID-shaped entries that matched no queue ID may still be queue names
+            names.addAll(uuid.values());
+        }
+
+        if (!names.isEmpty())
+        {
+            LOGGER.debug("Clearing the queues with names: {}", names);
+            for (final Queue<?> queue : queueList)
+            {
+                if (names.contains(queue.getName()) && cleared.add(queue.getId()))
+                {
+                    LOGGER.debug("Clearing the queue with name '{}'", queue.getName());
+                    count += queue.clearQueue();
+                }
+            }
+        }
+
+        return count;
+    }
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 690c076..003b4d1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -343,6 +343,12 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
@Param(name = "role", description = "whether to remove only sending links (\"SENDER\"), receiving links (\"RECEIVER\") or both (\"BOTH\")", validValues = {"SENDER", "RECEIVER", "BOTH"}, defaultValue = "BOTH") String role,
@Param(name = "linkNamePattern", description = "Regular Expression to match the link names to be removed.", defaultValue = ".*") String linkNamePattern);
+ /**
+ * Purges every queue whose name matches the given regular expression.
+ *
+ * @param queueNamePattern regular expression matched against the queue names; the operation default is ".*"
+ * @return a total accumulated over the purged queues (presumably the number of removed messages; confirm
+ *         against the implementation)
+ */
+ @ManagedOperation(description = "Purge every queue with the name that matches the regular expression.", changesConfiguredObjectState = false)
+ long clearMatchingQueues(@Param(name = "queueNamePattern", description = "Regular Expression to match the queue name.", defaultValue = ".*") String queueNamePattern);
+
+ /**
+ * Purges the queues named in the provided collection.
+ *
+ * @param queues collection of queue IDs or queue names
+ * @return a total accumulated over the purged queues (presumably the number of removed messages; confirm
+ *         against the implementation)
+ */
+ @ManagedOperation(description = "Purge queues in provided list.", changesConfiguredObjectState = false)
+ long clearQueues(@Param(name = "queues", description = "Collection of queue IDs or names.") Collection<String> queues);
+
Queue<?> getSubscriptionQueue(final String exchangeName,
final Map<String, Object> attributes,
final Map<String, Map<String, Object>> bindings);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
index 00f1785..b22555d 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.virtualhost;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
@@ -33,8 +34,13 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -46,6 +52,7 @@ import ch.qos.logback.core.spi.FilterReply;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,13 +64,13 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.security.AccessControl;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.preferences.PreferenceStore;
import org.apache.qpid.server.util.FileUtils;
@@ -350,4 +357,132 @@ public class AbstractVirtualHostTest extends UnitTestBase
action.run();
assertTrue("Did not receive expected log message", logMessageReceivedLatch.await(2, TimeUnit.SECONDS));
}
+
+    /** Only the queues whose whole name matches "queue.?" are purged; "queue-topic" is left alone. */
+    @Test
+    public void testClearMatchingQueues()
+    {
+        final Queue<?> first = newQueue("queueA");
+        final Queue<?> nonMatching = newQueue("queue-topic");
+        final Queue<?> second = newQueue("queueB");
+        final List<Queue> all = Arrays.asList(first, nonMatching, second);
+
+        newVirtualHost(all).clearMatchingQueues("queue.?");
+
+        Mockito.verify(first).clearQueue();
+        Mockito.verify(second).clearQueue();
+        Mockito.verify(nonMatching, Mockito.never()).clearQueue();
+    }
+
+    /** The catch-all pattern ".*" purges every queue of the virtual host. */
+    @Test
+    public void testClearMatchingQueues_any()
+    {
+        final List<Queue> all = Arrays.asList(newQueue("queueA"), newQueue("queue-topic"), newQueue("queueB"));
+
+        newVirtualHost(all).clearMatchingQueues(".*");
+
+        for (final Queue<?> purged : all)
+        {
+            Mockito.verify(purged).clearQueue();
+        }
+    }
+
+    /** An expression that cannot be compiled is rejected with an exception and no queue is purged. */
+    @Test
+    public void testClearMatchingQueues_exception()
+    {
+        final List<Queue> all = Arrays.asList(newQueue("queueA"), newQueue("queue-topic"), newQueue("queueB"));
+        final AbstractVirtualHost virtualHost = newVirtualHost(all);
+
+        try
+        {
+            virtualHost.clearMatchingQueues(".*[");
+            fail("An exception is expected!");
+        }
+        catch (RuntimeException expected)
+        {
+            assertNotNull(expected.getMessage());
+        }
+
+        for (final Queue<?> untouched : all)
+        {
+            Mockito.verify(untouched, Mockito.never()).clearQueue();
+        }
+    }
+
+    /** Queues may be addressed either by name or by ID; queues that are not listed stay untouched. */
+    @Test
+    public void testClearQueues()
+    {
+        final Queue<?> unlisted = newQueue("queueA");
+        final Queue<?> listedByName = newQueue("queue-topic");
+        final Queue<?> listedById = newQueue("queueB");
+        final List<Queue> all = Arrays.asList(unlisted, listedByName, listedById);
+
+        final List<String> identifiers = Arrays.asList("queue-topic", listedById.getId().toString());
+        newVirtualHost(all).clearQueues(identifiers);
+
+        Mockito.verify(unlisted, Mockito.never()).clearQueue();
+        Mockito.verify(listedById).clearQueue();
+        Mockito.verify(listedByName).clearQueue();
+    }
+
+    /** An empty collection of identifiers purges nothing. */
+    @Test
+    public void testClearQueues_none()
+    {
+        final List<Queue> all = Arrays.asList(newQueue("queueA"), newQueue("queue-topic"), newQueue("queueB"));
+
+        newVirtualHost(all).clearQueues(Collections.emptySet());
+
+        for (final Queue<?> untouched : all)
+        {
+            Mockito.verify(untouched, Mockito.never()).clearQueue();
+        }
+    }
+
+ private AbstractVirtualHost newVirtualHost(List<Queue> queues)
+ {
+ final Map<String, Object> attributes = Collections.singletonMap(AbstractVirtualHost.NAME, getTestName());
+ final MessageStore store = mock(MessageStore.class);
+ return new AbstractVirtualHost(attributes, _node)
+ {
+ @Override
+ protected MessageStore createMessageStore()
+ {
+ return store;
+ }
+
+ @Override
+ public Collection getChildren(Class clazz)
+ {
+ if (clazz == Queue.class)
+ {
+ return queues;
+ }
+ else
+ {
+ return super.getChildren(clazz);
+ }
+
+ }
+ };
+ }
+
+    /** Creates a mocked queue that reports the given name, a random ID and {@code Queue} as its category. */
+    private Queue<?> newQueue(final String name)
+    {
+        final UUID queueId = UUID.randomUUID();
+        final Queue<?> mockedQueue = Mockito.mock(Queue.class);
+
+        Mockito.doReturn(Queue.class).when(mockedQueue).getCategoryClass();
+        Mockito.doReturn(queueId).when(mockedQueue).getId();
+        Mockito.doReturn(name).when(mockedQueue).getName();
+
+        return mockedQueue;
+    }
}
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js
index c19a9f8..8d7ef9a 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js
@@ -114,6 +114,12 @@ define(["dojo/parser",
"delete", "queue");
});
+ const clearQueueButton = query(".clearQueueButton", containerNode)[0];
+ registry.byNode(clearQueueButton).on("click", function (evt)
+ {
+ that._clearQueues(that.vhostUpdater.queuesGrid);
+ });
+
var addExchangeButton = query(".addExchangeButton", containerNode)[0];
registry.byNode(addExchangeButton).on("click", function (evt)
{
@@ -286,6 +292,35 @@ define(["dojo/parser",
return confirmed;
};
+ VirtualHost.prototype._clearQueues = function (dgrid)
+ {
+ let selected = [];
+ for (let item in dgrid.selection)
+ {
+ if (dgrid.selection.hasOwnProperty(item) && dgrid.selection[item])
+ {
+ selected.push(item);
+ }
+ }
+ if (selected.length > 0)
+ {
+ const plural = selected.length === 1 ? "" : "s";
+ if (confirm(lang.replace("Are you sure you want to purge {0} queue{1}?", [selected.length, plural])))
+ {
+ const modelObj = {
+ type: "virtualhost",
+ name: "clearQueues",
+ parent: this.modelObj
+ };
+ this.management.update(modelObj, {"queues" : selected}).then(lang.hitch(this, function ()
+ {
+ dgrid.clearSelection();
+ this.vhostUpdater.update();
+ }));
+ }
+ }
+ };
+
VirtualHost.prototype.close = function ()
{
updater.remove(this.vhostUpdater);
diff --git a/broker-plugins/management-http/src/main/java/resources/showVirtualHost.html b/broker-plugins/management-http/src/main/java/resources/showVirtualHost.html
index 217d43f..300f862 100644
--- a/broker-plugins/management-http/src/main/java/resources/showVirtualHost.html
+++ b/broker-plugins/management-http/src/main/java/resources/showVirtualHost.html
@@ -127,6 +127,7 @@
<div class="queues"></div>
<button data-dojo-type="dijit.form.Button" class="addQueueButton">Add Queue</button>
<button data-dojo-type="dijit.form.Button" class="deleteQueueButton">Delete Queue</button>
+ <button data-dojo-type="dijit.form.Button" class="clearQueueButton">Clear Queue</button>
</div>
<br/>
<div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Connections'" class="virtualHostConnections">
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org