You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/08/18 16:33:10 UTC
[activemq-artemis] branch main updated: ARTEMIS-3329 ability to
purge all queues on address
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 82f0ece ARTEMIS-3329 ability to purge all queues on address
82f0ece is described below
commit 82f0ece67c723a5633cc76990cb342b8d44cefb4
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Fri Jun 4 08:20:31 2021 -0500
ARTEMIS-3329 ability to purge all queues on address
---
.../apache/activemq/artemis/logs/AuditLogger.java | 28 ++++++++++++++++++
.../api/core/management/AddressControl.java | 7 +++++
.../core/management/impl/AddressControlImpl.java | 33 ++++++++++++++++++++--
.../integration/management/AddressControlTest.java | 25 ++++++++++++++++
.../management/AddressControlUsingCoreTest.java | 5 ++++
5 files changed, 96 insertions(+), 2 deletions(-)
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index e9a6342..f249c75 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2859,4 +2859,32 @@ public interface AuditLogger extends BasicLogger {
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601749, value = "User {0} is getting activation sequence on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getActivationSequence(String user, Object source, Object... args);
+
+ static void purge(Object source) { // static audit entry point: logs that the current caller is purging `source`
+ RESOURCE_LOGGER.purge(getCaller(), source); // getCaller() supplies the `user` argument of the logged message
+ }
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 601750, value = "User {0} is purging target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
+ void purge(String user, Object source, Object... args); // INFO-level audit record (id 601750); user, source and args feed the message placeholders in that order
+
+
+ static void purgeAddressSuccess(String addressName) { // static audit entry point: logs that the current caller purged the named address
+ RESOURCE_LOGGER.purgeAddressSuccess(getCaller(), addressName); // getCaller() supplies the `user` argument of the logged message
+ }
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 601751, value = "User {0} has purged address {1}", format = Message.Format.MESSAGE_FORMAT)
+ void purgeAddressSuccess(String user, String addressName); // INFO-level audit record (id 601751); {0} is the user, {1} the name of the purged address
+
+
+ static void purgeAddressFailure(String addressName) { // static audit entry point: logs that the current caller failed to purge the named address
+ RESOURCE_LOGGER.purgeAddressFailure(getCaller(), addressName); // getCaller() supplies the `user` argument of the logged message
+ }
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 601752, value = "User {0} failed to purge address {1}", format = Message.Format.MESSAGE_FORMAT)
+ void purgeAddressFailure(String user, String addressName); // INFO-level audit record (id 601752); {0} is the user, {1} the name of the address that could not be purged
+
+
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
index eab512e..361134c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
@@ -203,4 +203,11 @@ public interface AddressControl {
@Attribute(desc = "whether this address is temporary")
boolean isTemporary();
+ /**
+ * Purges all the queues bound to this address. Returns the total number of messages purged.
+ * @throws java.lang.Exception if the purge fails
+ */
+ @Operation(desc = "Purges the queues bound to this address. Returns the total number of messages purged.", impact = MBeanOperationInfo.ACTION)
+ long purge() throws Exception;
+
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index 02601a9..4f65e69 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -44,7 +44,9 @@ import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.logs.AuditLogger;
@@ -530,9 +532,36 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
return addressInfo.isTemporary();
}
- // Package protected ---------------------------------------------
+ @Override
+ public long purge() throws Exception { // deletes the messages of every queue bound to this address and returns how many were deleted in total
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.purge(this.addressInfo);
+ }
+ clearIO();
+ long totalMsgs = 0;
+ try {
+ Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
+ if (bindings != null) { // nothing is purged (result 0) when the lookup yields no bindings
+ for (Binding binding : bindings.getBindings()) {
+ if (binding instanceof QueueBinding) { // only queue bindings are purged; any other binding type is skipped
+ totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED); // second argument is null, presumably meaning "no filter" so every message matches (TODO confirm)
+ }
+ }
+ }
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.purgeAddressSuccess(addressInfo.getName().toString());
+ }
+ } catch (Throwable t) {
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.purgeAddressFailure(addressInfo.getName().toString());
+ }
+ throw new IllegalStateException(t.getMessage() != null ? t.getMessage() : t.toString()); // fall back to toString() so a failure without a message still names its type; NOTE(review): the cause is not chained, presumably so remote management clients can deserialize the exception (confirm)
+ } finally {
+ blockOnIO(); // runs on both the success and the failure path
+ }
- // Protected -----------------------------------------------------
+ return totalMsgs;
+ }
// Private -------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
index 55a9bad..026cd9f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
@@ -526,6 +526,31 @@ public class AddressControlTest extends ManagementTestBase {
}
}
+ @Test
+ public void testPurge() throws Exception { // verifies that purge() on an address removes the messages of all its queues and reports the total
+ SimpleString address = RandomUtil.randomSimpleString();
+ session.createAddress(address, RoutingType.ANYCAST, false);
+
+ AddressControl addressControl = createManagementControl(address);
+ assertEquals(0, addressControl.getMessageCount());
+
+ ClientProducer producer = session.createProducer(address.toString());
+ producer.send(session.createMessage(false)); // sent before any queue exists on the address
+ assertEquals(0, addressControl.getMessageCount()); // so the address still reports no messages
+
+ session.createQueue(new QueueConfiguration(address).setRoutingType(RoutingType.ANYCAST)); // first queue, named after the address
+ producer.send(session.createMessage(false));
+ assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 1, 2000, 100));
+
+ session.createQueue(new QueueConfiguration(address.concat('2')).setAddress(address).setRoutingType(RoutingType.ANYCAST)); // second queue on the same address
+ producer.send(session.createMessage(false));
+ assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100));
+
+ assertEquals(2L, addressControl.purge()); // purge must report both messages
+
+ Wait.assertEquals(0L, () -> addressControl.getMessageCount(), 2000, 100); // and the address must end up empty
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
index 3b9d802..5d2f5f5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
@@ -174,6 +174,11 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
}
@Override
+ public long purge() throws Exception { // forwards the call to the "purge" management operation through the proxy
+ return (long) proxy.invokeOperation("purge"); // the returned value is cast to long
+ }
+
+ @Override
public String sendMessage(Map<String, String> headers,
int type,
String body,