You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/08/18 16:33:10 UTC

[activemq-artemis] branch main updated: ARTEMIS-3329 ability to purge all queues on address

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 82f0ece  ARTEMIS-3329 ability to purge all queues on address
82f0ece is described below

commit 82f0ece67c723a5633cc76990cb342b8d44cefb4
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Fri Jun 4 08:20:31 2021 -0500

    ARTEMIS-3329 ability to purge all queues on address
---
 .../apache/activemq/artemis/logs/AuditLogger.java  | 28 ++++++++++++++++++
 .../api/core/management/AddressControl.java        |  7 +++++
 .../core/management/impl/AddressControlImpl.java   | 33 ++++++++++++++++++++--
 .../integration/management/AddressControlTest.java | 25 ++++++++++++++++
 .../management/AddressControlUsingCoreTest.java    |  5 ++++
 5 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index e9a6342..f249c75 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2859,4 +2859,32 @@ public interface AuditLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.INFO)
    @Message(id = 601749, value = "User {0} is getting activation sequence on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
    void getActivationSequence(String user, Object source, Object... args);
+
+   /** Audits a purge request on {@code source}. NOTE(review): this writes to RESOURCE_LOGGER, yet AddressControlImpl.purge() guards the call with isBaseLoggingEnabled() - confirm which logger is intended. */
+   static void purge(Object source) {
+      RESOURCE_LOGGER.purge(getCaller(), source);
+   }
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 601750, value = "User {0} is purging target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
+   void purge(String user, Object source, Object... args);
+
+   /** Audits a successful purge; {@code addressName} is the address whose queues were purged (message 601751). */
+   static void purgeAddressSuccess(String addressName) {
+      RESOURCE_LOGGER.purgeAddressSuccess(getCaller(), addressName);
+   }
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 601751, value = "User {0} has purged address {1}", format = Message.Format.MESSAGE_FORMAT)
+   void purgeAddressSuccess(String user, String addressName);
+
+   /** Audits a failed purge; {@code addressName} is the address that could not be purged (message 601752). */
+   static void purgeAddressFailure(String addressName) {
+      RESOURCE_LOGGER.purgeAddressFailure(getCaller(), addressName);
+   }
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 601752, value = "User {0} failed to purge address {1}", format = Message.Format.MESSAGE_FORMAT)
+   void purgeAddressFailure(String user, String addressName);
+
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
index eab512e..361134c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
@@ -203,4 +203,11 @@ public interface AddressControl {
    @Attribute(desc = "whether this address is temporary")
    boolean isTemporary();
 
+   /**
+    * Purges all the queues bound to this address. Returns the total number of messages purged.
+    * @throws java.lang.Exception if the purge fails
+    */
+   @Operation(desc = "Purges the queues bound to this address. Returns the total number of messages purged.", impact = MBeanOperationInfo.ACTION)
+   long purge() throws Exception;
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index 02601a9..4f65e69 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -44,7 +44,9 @@ import org.apache.activemq.artemis.core.security.SecurityStore;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.logs.AuditLogger;
@@ -530,9 +532,36 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
       return addressInfo.isTemporary();
    }
 
-   // Package protected ---------------------------------------------
+   /** Purges every queue bound to this address and returns the total number of messages removed; any failure is audited and rethrown as an IllegalStateException. */
+   @Override
+   public long purge() throws Exception {
+      if (AuditLogger.isBaseLoggingEnabled()) {
+         AuditLogger.purge(this.addressInfo);
+      }
+      clearIO();
+      long totalMsgs = 0;
+      try {
+         Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
+         if (bindings != null) { // a null lookup result is treated as "nothing to purge" and yields 0
+            for (Binding binding : bindings.getBindings()) {
+               if (binding instanceof QueueBinding) { // bindings that are not QueueBindings are skipped
+                  totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
+               }
+            }
+         }
+         if (AuditLogger.isResourceLoggingEnabled()) {
+            AuditLogger.purgeAddressSuccess(addressInfo.getName().toString());
+         }
+      } catch (Throwable t) {
+         if (AuditLogger.isResourceLoggingEnabled()) {
+            AuditLogger.purgeAddressFailure(addressInfo.getName().toString());
+         }
+         throw new IllegalStateException(t.getMessage() != null ? t.getMessage() : t.toString()); // message-only rethrow; fall back to t.toString() so a null message still names the failure
+      } finally {
+         blockOnIO();
+      }
-   // Protected -----------------------------------------------------
+      return totalMsgs;
+   }
 
    // Private -------------------------------------------------------
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
index 55a9bad..026cd9f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
@@ -526,6 +526,31 @@ public class AddressControlTest extends ManagementTestBase {
       }
    }
 
+   @Test
+   public void testPurge() throws Exception {
+      // Start from a fresh ANYCAST address whose control reports no messages.
+      final SimpleString addressName = RandomUtil.randomSimpleString();
+      session.createAddress(addressName, RoutingType.ANYCAST, false);
+      final AddressControl control = createManagementControl(addressName);
+      assertEquals(0, control.getMessageCount());
+      // A message sent before any queue is created leaves the count at zero.
+      final ClientProducer sender = session.createProducer(addressName.toString());
+      sender.send(session.createMessage(false));
+      assertEquals(0, control.getMessageCount());
+      // After the first queue is created, the next message shows up in the count.
+      session.createQueue(new QueueConfiguration(addressName).setRoutingType(RoutingType.ANYCAST));
+      sender.send(session.createMessage(false));
+      assertTrue(Wait.waitFor(() -> control.getMessageCount() == 1, 2000, 100));
+      // Add a second queue on the same address and wait for the count to reach two.
+      final SimpleString secondQueueName = addressName.concat('2');
+      session.createQueue(new QueueConfiguration(secondQueueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
+      sender.send(session.createMessage(false));
+      assertTrue(Wait.waitFor(() -> control.getMessageCount() == 2, 2000, 100));
+      // purge() must report both messages and bring the address count back to zero.
+      assertEquals(2L, control.purge());
+      Wait.assertEquals(0L, () -> control.getMessageCount(), 2000, 100);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
index 3b9d802..5d2f5f5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
@@ -174,6 +174,11 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
          }
 
          @Override
+         public long purge() throws Exception {
+            return (long) proxy.invokeOperation("purge"); // runs the "purge" operation through the proxy; NOTE(review): the cast assumes the result comes back as a Long - confirm
+         }
+
+         @Override
          public String sendMessage(Map<String, String> headers,
                                    int type,
                                    String body,