You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2023/01/13 14:23:59 UTC

[activemq-artemis] 02/03: ARTEMIS-4116 Management lock to avoid multiple long running tasks running from user requests

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 4550fcf47c8c87d4e7cc0c06207a09c7de41dc65
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Dec 15 13:55:03 2022 -0500

    ARTEMIS-4116 Management lock to avoid multiple long running tasks running from user requests
---
 .../api/core/management/ActiveMQServerControl.java |   4 +-
 .../management/impl/ActiveMQServerControlImpl.java | 450 +++++++-------
 .../core/management/impl/AddressControlImpl.java   | 128 ++--
 .../core/management/impl/QueueControlImpl.java     | 671 +++++++++++----------
 .../core/server/impl/ActiveMQServerImpl.java       |  11 +-
 .../artemis/core/server/replay/ReplayManager.java  |  18 +-
 6 files changed, 679 insertions(+), 603 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 248f44d4a6..2229abfee5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -526,7 +526,7 @@ public interface ActiveMQServerControl {
 
    // Operations ----------------------------------------------------
    @Operation(desc = "Isolate the broker", impact = MBeanOperationInfo.ACTION)
-   boolean freezeReplication();
+   boolean freezeReplication() throws Exception;
 
    @Operation(desc = "Create an address", impact = MBeanOperationInfo.ACTION)
    String createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@@ -1905,7 +1905,7 @@ public interface ActiveMQServerControl {
     * Returns the names of the queues created on this server with the given routing-type.
     */
    @Operation(desc = "Names of the queues created on this server with the given routing-type (i.e. ANYCAST or MULTICAST)", impact = MBeanOperationInfo.INFO)
-   String[] getQueueNames(@Parameter(name = "routingType", desc = "The routing type, MULTICAST or ANYCAST") String routingType);
+   String[] getQueueNames(@Parameter(name = "routingType", desc = "The routing type, MULTICAST or ANYCAST") String routingType) throws Exception;
 
    /**
     * Returns the names of the cluster-connections deployed on this server.
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 8e5ce81e08..25deb59e09 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -826,17 +826,19 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
    }
 
    @Override
-   public boolean freezeReplication() {
+   public boolean freezeReplication() throws Exception {
       if (AuditLogger.isBaseLoggingEnabled()) {
          AuditLogger.freezeReplication(this.server);
       }
-      Activation activation = server.getActivation();
-      if (activation instanceof SharedNothingLiveActivation) {
-         SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation;
-         liveActivation.freezeReplication();
-         return true;
+      try (AutoCloseable lock = server.managementLock()) {
+         Activation activation = server.getActivation();
+         if (activation instanceof SharedNothingLiveActivation) {
+            SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation;
+            liveActivation.freezeReplication();
+            return true;
+         }
+         return false;
       }
-      return false;
    }
 
    private enum AddressInfoTextFormatter {
@@ -966,21 +968,25 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
    @Override
    public void deleteAddress(String name, boolean force) throws Exception {
-      checkStarted();
 
-      clearIO();
-      try {
-         server.removeAddressInfo(new SimpleString(name), null, force);
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.deleteAddressSuccess(name);
-         }
-      } catch (ActiveMQException e) {
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.deleteAddressFailure(name);
+      // delete might be a long running task, we ensure only one large task running
+      try (AutoCloseable lock = server.managementLock()) {
+         checkStarted();
+
+         clearIO();
+         try {
+            server.removeAddressInfo(new SimpleString(name), null, force);
+            if (AuditLogger.isResourceLoggingEnabled()) {
+               AuditLogger.deleteAddressSuccess(name);
+            }
+         } catch (ActiveMQException e) {
+            if (AuditLogger.isResourceLoggingEnabled()) {
+               AuditLogger.deleteAddressFailure(name);
+            }
+            throw new IllegalStateException(e.getMessage());
+         } finally {
+            blockOnIO();
          }
-         throw new IllegalStateException(e.getMessage());
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1003,10 +1009,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
       clearIO();
       try {
-         server.createQueue(new QueueConfiguration(name)
-                            .setAddress(address)
-                            .setFilterString(filterStr)
-                            .setDurable(durable));
+         server.createQueue(new QueueConfiguration(name).setAddress(address).setFilterString(filterStr).setDurable(durable));
       } finally {
          blockOnIO();
       }
@@ -1232,10 +1235,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              boolean autoCreateAddress,
                              long ringSize) throws Exception {
       if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.createQueue(this.server, null, null, address, routingType, name, filterStr, durable,
-                  maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey,
-                  lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch,
-                  autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize);
+         AuditLogger.createQueue(this.server, null, null, address, routingType, name, filterStr, durable, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize);
       }
       checkStarted();
 
@@ -1247,34 +1247,14 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
             filter = new SimpleString(filterStr);
          }
 
-         final Queue queue = server.createQueue(new QueueConfiguration(name)
-                                                   .setAddress(address)
-                                                   .setRoutingType(RoutingType.valueOf(routingType.toUpperCase()))
-                                                   .setFilterString(filter)
-                                                   .setDurable(durable)
-                                                   .setMaxConsumers(maxConsumers)
-                                                   .setPurgeOnNoConsumers(purgeOnNoConsumers)
-                                                   .setExclusive(exclusive)
-                                                   .setGroupRebalance(groupRebalance)
-                                                   .setGroupBuckets(groupBuckets)
-                                                   .setGroupFirstKey(groupFirstKey)
-                                                   .setLastValue(lastValue)
-                                                   .setLastValueKey(lastValueKey)
-                                                   .setNonDestructive(nonDestructive)
-                                                   .setConsumersBeforeDispatch(consumersBeforeDispatch)
-                                                   .setDelayBeforeDispatch(delayBeforeDispatch)
-                                                   .setAutoDelete(autoDelete)
-                                                   .setAutoDeleteDelay(autoDeleteDelay)
-                                                   .setAutoDeleteMessageCount(autoDeleteMessageCount)
-                                                   .setAutoCreateAddress(autoCreateAddress)
-                                                   .setRingSize(ringSize));
+         final Queue queue = server.createQueue(new QueueConfiguration(name).setAddress(address).setRoutingType(RoutingType.valueOf(routingType.toUpperCase())).setFilterString(filter).setDurable(durable).setMaxConsumers(maxConsumers).setPurgeOnNoConsumers(purgeOnNoConsumers).setExclusive(exclusive).setGroupRebalance(groupRebalance).setGroupBuckets(groupBuckets).setGroupFirstKey(groupFirstKey).setLastValue(lastValue).setLastValueKey(lastValueKey).setNonDestructive(nonDestructive).setConsu [...]
          if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.createQueueSuccess( name, address, routingType);
+            AuditLogger.createQueueSuccess(name, address, routingType);
          }
          return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
       } catch (ActiveMQException e) {
          if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.createQueueFailure( name, address, routingType);
+            AuditLogger.createQueueFailure(name, address, routingType);
          }
          throw new IllegalStateException(e.getMessage());
       } finally {
@@ -1313,7 +1293,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
    @Override
    public String updateQueue(String queueConfigurationAsJson) throws Exception {
-
       if (AuditLogger.isBaseLoggingEnabled()) {
          AuditLogger.updateQueue(this.server, queueConfigurationAsJson);
       }
@@ -1414,8 +1393,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              String user,
                              Long ringSize) throws Exception {
       if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.updateQueue(this.server, name, routingType, filter, maxConsumers, purgeOnNoConsumers,
-                  exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, ringSize);
+         AuditLogger.updateQueue(this.server, name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, ringSize);
       }
       checkStarted();
 
@@ -1592,27 +1570,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
    @Override
    public void destroyQueue(final String name, final boolean removeConsumers, final boolean forceAutoDeleteAddress) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.destroyQueue(this.server, null, null, name, removeConsumers, forceAutoDeleteAddress);
-      }
-      checkStarted();
+      // destroy might be a long running task, we prevent multiple running tasks in this case
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.destroyQueue(this.server, null, null, name, removeConsumers, forceAutoDeleteAddress);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         SimpleString queueName = new SimpleString(name);
+         clearIO();
          try {
-            server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, forceAutoDeleteAddress);
-         } catch (Exception e) {
+            SimpleString queueName = new SimpleString(name);
+            try {
+               server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, forceAutoDeleteAddress);
+            } catch (Exception e) {
+               if (AuditLogger.isResourceLoggingEnabled()) {
+                  AuditLogger.destroyQueueFailure(name);
+               }
+               throw e;
+            }
             if (AuditLogger.isResourceLoggingEnabled()) {
-               AuditLogger.destroyQueueFailure(name);
+               AuditLogger.destroyQueueSuccess(name);
             }
-            throw e;
-         }
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.destroyQueueSuccess(name);
+         } finally {
+            blockOnIO();
          }
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -2149,56 +2130,64 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
    @Override
    public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.commitPreparedTransaction(this.server, transactionAsBase64);
-      }
-      checkStarted();
-
-      clearIO();
-      try {
-         List<Xid> xids = resourceManager.getPreparedTransactions();
+      // commit might be a long running task if dealing with a large transaction
+      // ensuring a single one just in case
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.commitPreparedTransaction(this.server, transactionAsBase64);
+         }
+         checkStarted();
 
-         for (Xid xid : xids) {
-            if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
-               Transaction transaction = resourceManager.removeTransaction(xid, null);
-               transaction.commit(false);
-               long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
-               storageManager.waitOnOperations();
-               resourceManager.putHeuristicCompletion(recordID, xid, true);
-               return true;
+         clearIO();
+         try {
+            List<Xid> xids = resourceManager.getPreparedTransactions();
+
+            for (Xid xid : xids) {
+               if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
+                  Transaction transaction = resourceManager.removeTransaction(xid, null);
+                  transaction.commit(false);
+                  long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
+                  storageManager.waitOnOperations();
+                  resourceManager.putHeuristicCompletion(recordID, xid, true);
+                  return true;
+               }
             }
+            return false;
+         } finally {
+            blockOnIO();
          }
-         return false;
-      } finally {
-         blockOnIO();
       }
    }
 
    @Override
    public synchronized boolean rollbackPreparedTransaction(final String transactionAsBase64) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.rollbackPreparedTransaction(this.server, transactionAsBase64);
-      }
-      checkStarted();
+      // rollback might be a long running task if dealing with a large transaction
+      // ensuring a single task just in case
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.rollbackPreparedTransaction(this.server, transactionAsBase64);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
+         clearIO();
+         try {
 
-         List<Xid> xids = resourceManager.getPreparedTransactions();
+            List<Xid> xids = resourceManager.getPreparedTransactions();
 
-         for (Xid xid : xids) {
-            if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
-               Transaction transaction = resourceManager.removeTransaction(xid, null);
-               transaction.rollback();
-               long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
-               server.getStorageManager().waitOnOperations();
-               resourceManager.putHeuristicCompletion(recordID, xid, false);
-               return true;
+            for (Xid xid : xids) {
+               if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
+                  Transaction transaction = resourceManager.removeTransaction(xid, null);
+                  transaction.rollback();
+                  long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
+                  server.getStorageManager().waitOnOperations();
+                  resourceManager.putHeuristicCompletion(recordID, xid, false);
+                  return true;
+               }
             }
+            return false;
+         } finally {
+            blockOnIO();
          }
-         return false;
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -2278,149 +2267,170 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
    @Override
    public boolean closeConsumerConnectionsForAddress(final String address) {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.closeConsumerConnectionsForAddress(this.server, address);
-      }
-      boolean closed = false;
-      checkStarted();
+      // this could be a long running task, ensuring a single task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.closeConsumerConnectionsForAddress(this.server, address);
+         }
+         boolean closed = false;
+         checkStarted();
 
-      clearIO();
-      try {
-         for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) {
-            if (binding instanceof LocalQueueBinding) {
-               Queue queue = ((LocalQueueBinding) binding).getQueue();
-               for (Consumer consumer : queue.getConsumers()) {
-                  if (consumer instanceof ServerConsumer) {
-                     ServerConsumer serverConsumer = (ServerConsumer) consumer;
-                     RemotingConnection connection = null;
-
-                     for (RemotingConnection potentialConnection : remotingService.getConnections()) {
-                        if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
-                           connection = potentialConnection;
+         clearIO();
+         try {
+            for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) {
+               if (binding instanceof LocalQueueBinding) {
+                  Queue queue = ((LocalQueueBinding) binding).getQueue();
+                  for (Consumer consumer : queue.getConsumers()) {
+                     if (consumer instanceof ServerConsumer) {
+                        ServerConsumer serverConsumer = (ServerConsumer) consumer;
+                        RemotingConnection connection = null;
+
+                        for (RemotingConnection potentialConnection : remotingService.getConnections()) {
+                           if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
+                              connection = potentialConnection;
+                           }
                         }
-                     }
 
-                     if (connection != null) {
-                        remotingService.removeConnection(connection.getID());
-                        connection.fail(ActiveMQMessageBundle.BUNDLE.consumerConnectionsClosedByManagement(address));
-                        closed = true;
+                        if (connection != null) {
+                           remotingService.removeConnection(connection.getID());
+                           connection.fail(ActiveMQMessageBundle.BUNDLE.consumerConnectionsClosedByManagement(address));
+                           closed = true;
+                        }
                      }
                   }
                }
             }
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.failedToCloseConsumerConnectionsForAddress(address, e);
+         } finally {
+            blockOnIO();
          }
-      } catch (Exception e) {
-         ActiveMQServerLogger.LOGGER.failedToCloseConsumerConnectionsForAddress(address, e);
-      } finally {
-         blockOnIO();
+         return closed;
+      } catch (Throwable e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
-      return closed;
    }
 
    @Override
    public boolean closeConnectionsForUser(final String userName) {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.closeConnectionsForUser(this.server, userName);
-      }
-      boolean closed = false;
-      checkStarted();
+      // possibly a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.closeConnectionsForUser(this.server, userName);
+         }
+         boolean closed = false;
+         checkStarted();
 
-      clearIO();
-      try {
-         for (ServerSession serverSession : server.getSessions()) {
-            if (serverSession.getUsername() != null && serverSession.getUsername().equals(userName)) {
-               RemotingConnection connection = null;
+         clearIO();
+         try {
+            for (ServerSession serverSession : server.getSessions()) {
+               if (serverSession.getUsername() != null && serverSession.getUsername().equals(userName)) {
+                  RemotingConnection connection = null;
 
-               for (RemotingConnection potentialConnection : remotingService.getConnections()) {
-                  if (potentialConnection.getID().toString().equals(serverSession.getConnectionID().toString())) {
-                     connection = potentialConnection;
+                  for (RemotingConnection potentialConnection : remotingService.getConnections()) {
+                     if (potentialConnection.getID().toString().equals(serverSession.getConnectionID().toString())) {
+                        connection = potentialConnection;
+                     }
                   }
-               }
 
-               if (connection != null) {
-                  remotingService.removeConnection(connection.getID());
-                  connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsForUserClosedByManagement(userName));
-                  closed = true;
+                  if (connection != null) {
+                     remotingService.removeConnection(connection.getID());
+                     connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsForUserClosedByManagement(userName));
+                     closed = true;
+                  }
                }
             }
+         } finally {
+            blockOnIO();
          }
-      } finally {
-         blockOnIO();
+         return closed;
+      } catch (Throwable e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
-      return closed;
    }
 
    @Override
    public boolean closeConnectionWithID(final String ID) {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.closeConnectionWithID(this.server, ID);
-      }
-      checkStarted();
+      // possibly a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.closeConnectionWithID(this.server, ID);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         for (RemotingConnection connection : remotingService.getConnections()) {
-            if (connection.getID().toString().equals(ID)) {
-               remotingService.removeConnection(connection.getID());
-               connection.fail(ActiveMQMessageBundle.BUNDLE.connectionWithIDClosedByManagement(ID));
-               return true;
+         clearIO();
+         try {
+            for (RemotingConnection connection : remotingService.getConnections()) {
+               if (connection.getID().toString().equals(ID)) {
+                  remotingService.removeConnection(connection.getID());
+                  connection.fail(ActiveMQMessageBundle.BUNDLE.connectionWithIDClosedByManagement(ID));
+                  return true;
+               }
             }
+         } finally {
+            blockOnIO();
          }
-      } finally {
-         blockOnIO();
+         return false;
+      } catch (Throwable e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
-      return false;
    }
 
    @Override
    public boolean closeSessionWithID(final String connectionID, final String ID) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.closeSessionWithID(this.server, connectionID, ID);
-      }
-      checkStarted();
+      // possibly a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.closeSessionWithID(this.server, connectionID, ID);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         List<ServerSession> sessions = server.getSessions(connectionID);
-         for (ServerSession session : sessions) {
-            if (session.getName().equals(ID.toString())) {
-               session.close(true);
-               return true;
+         clearIO();
+         try {
+            List<ServerSession> sessions = server.getSessions(connectionID);
+            for (ServerSession session : sessions) {
+               if (session.getName().equals(ID.toString())) {
+                  session.close(true);
+                  return true;
+               }
             }
-         }
 
-      } finally {
-         blockOnIO();
+         } finally {
+            blockOnIO();
+         }
+         return false;
       }
-      return false;
    }
 
    @Override
    public boolean closeConsumerWithID(final String sessionID, final String ID) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.closeConsumerWithID(this.server, sessionID, ID);
-      }
-      checkStarted();
+      // possibly a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.closeConsumerWithID(this.server, sessionID, ID);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         Set<ServerSession> sessions = server.getSessions();
-         for (ServerSession session : sessions) {
-            if (session.getName().equals(sessionID.toString())) {
-               Set<ServerConsumer> serverConsumers = session.getServerConsumers();
-               for (ServerConsumer serverConsumer : serverConsumers) {
-                  if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
-                     serverConsumer.disconnect();
-                     return true;
+         clearIO();
+         try {
+            Set<ServerSession> sessions = server.getSessions();
+            for (ServerSession session : sessions) {
+               if (session.getName().equals(sessionID.toString())) {
+                  Set<ServerConsumer> serverConsumers = session.getServerConsumers();
+                  for (ServerConsumer serverConsumer : serverConsumers) {
+                     if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
+                        serverConsumer.disconnect();
+                        return true;
+                     }
                   }
                }
             }
-         }
 
-      } finally {
-         blockOnIO();
+         } finally {
+            blockOnIO();
+         }
+         return false;
       }
-      return false;
    }
 
    @Override
@@ -4116,27 +4126,29 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
    @Override
    public void scaleDown(String connector) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.scaleDown(this.server, connector);
-      }
-      checkStarted();
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.scaleDown(this.server, connector);
+         }
+         checkStarted();
 
-      clearIO();
-      HAPolicy haPolicy = server.getHAPolicy();
-      if (haPolicy instanceof LiveOnlyPolicy) {
-         LiveOnlyPolicy liveOnlyPolicy = (LiveOnlyPolicy) haPolicy;
+         clearIO();
+         HAPolicy haPolicy = server.getHAPolicy();
+         if (haPolicy instanceof LiveOnlyPolicy) {
+            LiveOnlyPolicy liveOnlyPolicy = (LiveOnlyPolicy) haPolicy;
 
-         if (liveOnlyPolicy.getScaleDownPolicy() == null) {
-            liveOnlyPolicy.setScaleDownPolicy(new ScaleDownPolicy());
-         }
+            if (liveOnlyPolicy.getScaleDownPolicy() == null) {
+               liveOnlyPolicy.setScaleDownPolicy(new ScaleDownPolicy());
+            }
 
-         liveOnlyPolicy.getScaleDownPolicy().setEnabled(true);
+            liveOnlyPolicy.getScaleDownPolicy().setEnabled(true);
 
-         if (connector != null) {
-            liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, connector);
-         }
+            if (connector != null) {
+               liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, connector);
+            }
 
-         server.fail(true);
+            server.fail(true);
+         }
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index d88051e392..9044936d63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -431,10 +431,15 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
 
    @Override
    public long getMessageCount() {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.getMessageCount(this.addressInfo);
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.getMessageCount(this.addressInfo);
+         }
+         return getMessageCount(DurabilityType.ALL);
+      } catch (Exception e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
-      return getMessageCount(DurabilityType.ALL);
    }
 
    @Override
@@ -472,14 +477,17 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
                              final String user,
                              final String password,
                              boolean createMessageId) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.sendMessageThroughManagement(this, headers, type, body, durable, user, "****");
-      }
-      try {
-         return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password, createMessageId);
-      } catch (Exception e) {
-         e.printStackTrace();
-         throw new IllegalStateException(e.getMessage());
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.sendMessageThroughManagement(this, headers, type, body, durable, user, "****");
+         }
+         try {
+            return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password, createMessageId);
+         } catch (Exception e) {
+            e.printStackTrace();
+            throw new IllegalStateException(e.getMessage());
+         }
       }
    }
 
@@ -585,20 +593,25 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
 
    @Override
    public boolean clearDuplicateIdCache() {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.clearDuplicateIdCache(this.addressInfo);
-      }
-      DuplicateIDCache cache = ((PostOfficeImpl)server.getPostOffice()).getDuplicateIDCaches().get(addressInfo.getName());
-      try {
-         if (cache != null) {
-            cache.clear();
-            return true;
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.clearDuplicateIdCache(this.addressInfo);
+         }
+         DuplicateIDCache cache = ((PostOfficeImpl) server.getPostOffice()).getDuplicateIDCaches().get(addressInfo.getName());
+         try {
+            if (cache != null) {
+               cache.clear();
+               return true;
+            }
+         } catch (Exception e) {
+            logger.debug("Failed to clear duplicate ID cache", e);
          }
+
+         return false;
       } catch (Exception e) {
-         logger.debug("Failed to clear duplicate ID cache", e);
+         throw new RuntimeException(e.getMessage(), e);
       }
-
-      return false;
    }
 
    @Override
@@ -627,37 +640,41 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
 
    @Override
    public long purge() throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.purge(this.addressInfo);
-      }
-      clearIO();
-      long totalMsgs = 0;
-      try {
-         Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
-         if (bindings != null) {
-            for (Binding binding : bindings.getBindings()) {
-               if (binding instanceof QueueBinding) {
-                  totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.purge(this.addressInfo);
+         }
+         clearIO();
+         long totalMsgs = 0;
+         try {
+            Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
+            if (bindings != null) {
+               for (Binding binding : bindings.getBindings()) {
+                  if (binding instanceof QueueBinding) {
+                     totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
+                  }
                }
             }
+            if (AuditLogger.isResourceLoggingEnabled()) {
+               AuditLogger.purgeAddressSuccess(addressInfo.getName().toString());
+            }
+         } catch (Throwable t) {
+            if (AuditLogger.isResourceLoggingEnabled()) {
+               AuditLogger.purgeAddressFailure(addressInfo.getName().toString());
+            }
+            throw new IllegalStateException(t.getMessage());
+         } finally {
+            blockOnIO();
          }
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.purgeAddressSuccess(addressInfo.getName().toString());
-         }
-      } catch (Throwable t) {
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.purgeAddressFailure(addressInfo.getName().toString());
-         }
-         throw new IllegalStateException(t.getMessage());
-      } finally {
-         blockOnIO();
-      }
 
-      return totalMsgs;
+         return totalMsgs;
+      }
    }
 
    @Override
    public void replay(String target, String filter) throws Exception {
+      // server.replay is already calling the managementLock, no need to do it here
       server.replay(null, null, this.getAddress(), target, filter);
    }
 
@@ -669,22 +686,25 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
       Date startScanDate = format.parse(startScan);
       Date endScanDate = format.parse(endScan);
 
+      // server.replay is already calling the managementLock, no need to do it here
       server.replay(startScanDate, endScanDate, this.getAddress(), target, filter);
    }
 
 
    private long getMessageCount(final DurabilityType durability) {
-      long count = 0;
-      for (String queueName : getQueueNames()) {
-         Queue queue = server.locateQueue(queueName);
-         if (queue != null &&
-            (durability == DurabilityType.ALL ||
-            (durability == DurabilityType.DURABLE && queue.isDurable()) ||
-            (durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) {
-            count += queue.getMessageCount();
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         long count = 0;
+         for (String queueName : getQueueNames()) {
+            Queue queue = server.locateQueue(queueName);
+            if (queue != null && (durability == DurabilityType.ALL || (durability == DurabilityType.DURABLE && queue.isDurable()) || (durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) {
+               count += queue.getMessageCount();
+            }
          }
+         return count;
+      } catch (Exception e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
-      return count;
    }
 
    private void checkStarted() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 1fbefc3847..9bced82671 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -740,32 +740,38 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public Map<String, Object>[] listScheduledMessages() throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.listScheduledMessages(queue);
-      }
-      checkStarted();
+      // it could be a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.listScheduledMessages(queue);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         List<MessageReference> refs = queue.getScheduledMessages();
-         return convertMessagesToMaps(refs);
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            List<MessageReference> refs = queue.getScheduledMessages();
+            return convertMessagesToMaps(refs);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
    @Override
    public String listScheduledMessagesAsJSON() throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.listScheduledMessagesAsJSON(queue);
-      }
-      checkStarted();
+      // it could be a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.listScheduledMessagesAsJSON(queue);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         return QueueControlImpl.toJSON(listScheduledMessages());
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            return QueueControlImpl.toJSON(listScheduledMessages());
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
@@ -969,33 +975,36 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    private Map<String, Long> internalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
-      checkStarted();
+      //  long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         checkStarted();
 
-      clearIO();
+         clearIO();
 
-      Map<String, Long> result = new HashMap<>();
-      try {
-         Filter filter = FilterImpl.createFilter(filterStr);
-         SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
-         if (filter == null && groupByProperty == null) {
-            result.put(null, getMessageCount());
-         } else {
-            final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize();
-            int count = 0;
-            try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
-               try {
-                  while (iterator.hasNext() && count++ < limit) {
-                     Message message = iterator.next().getMessage();
-                     internalComputeMessage(result, filter, groupByProperty, message);
+         Map<String, Long> result = new HashMap<>();
+         try {
+            Filter filter = FilterImpl.createFilter(filterStr);
+            SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
+            if (filter == null && groupByProperty == null) {
+               result.put(null, getMessageCount());
+            } else {
+               final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize();
+               int count = 0;
+               try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
+                  try {
+                     while (iterator.hasNext() && count++ < limit) {
+                        Message message = iterator.next().getMessage();
+                        internalComputeMessage(result, filter, groupByProperty, message);
+                     }
+                  } catch (NoSuchElementException ignored) {
+                     // this could happen through paging browsing
                   }
-               } catch (NoSuchElementException ignored) {
-                  // this could happen through paging browsing
                }
             }
+            return result;
+         } finally {
+            blockOnIO();
          }
-         return result;
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1019,26 +1028,26 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    private Map<String, Long> internalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
-      checkStarted();
+      // it could be a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         checkStarted();
 
-      clearIO();
+         clearIO();
 
-      Map<String, Long> result = new HashMap<>();
-      try {
-         Filter filter = FilterImpl.createFilter(filterStr);
-         SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
-         if (filter == null && groupByProperty == null) {
-            result.put(null, Long.valueOf(getDeliveringCount()));
-         } else {
-            Map<String, List<MessageReference>> deliveringMessages = queue.getDeliveringMessages();
-            deliveringMessages.forEach((s, messageReferenceList) ->
-                            messageReferenceList.forEach(messageReference ->
-                                    internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage())
-                            ));
+         Map<String, Long> result = new HashMap<>();
+         try {
+            Filter filter = FilterImpl.createFilter(filterStr);
+            SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
+            if (filter == null && groupByProperty == null) {
+               result.put(null, Long.valueOf(getDeliveringCount()));
+            } else {
+               Map<String, List<MessageReference>> deliveringMessages = queue.getDeliveringMessages();
+               deliveringMessages.forEach((s, messageReferenceList) -> messageReferenceList.forEach(messageReference -> internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage())));
+            }
+            return result;
+         } finally {
+            blockOnIO();
          }
-         return result;
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1057,18 +1066,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public boolean removeMessage(final long messageID) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.removeMessage(queue, messageID);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.removeMessage(queue, messageID);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         return queue.deleteReference(messageID);
-      } catch (ActiveMQException e) {
-         throw new IllegalStateException(e.getMessage());
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            return queue.deleteReference(messageID);
+         } catch (ActiveMQException e) {
+            throw new IllegalStateException(e.getMessage());
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
@@ -1079,30 +1091,33 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public int removeMessages(final int flushLimit, final String filterStr) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.removeMessages(queue, flushLimit, filterStr);
-      }
-      checkStarted();
-
-      clearIO();
-      try {
-         Filter filter = FilterImpl.createFilter(filterStr);
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.removeMessages(queue, flushLimit, filterStr);
+         }
+         checkStarted();
 
-         int removed = 0;
+         clearIO();
          try {
-            removed = queue.deleteMatchingReferences(flushLimit, filter);
-            if (AuditLogger.isResourceLoggingEnabled()) {
-               AuditLogger.removeMessagesSuccess(removed, queue.getName().toString());
-            }
-         } catch (Exception e) {
-            if (AuditLogger.isResourceLoggingEnabled()) {
-               AuditLogger.removeMessagesFailure(queue.getName().toString());
+            Filter filter = FilterImpl.createFilter(filterStr);
+
+            int removed = 0;
+            try {
+               removed = queue.deleteMatchingReferences(flushLimit, filter);
+               if (AuditLogger.isResourceLoggingEnabled()) {
+                  AuditLogger.removeMessagesSuccess(removed, queue.getName().toString());
+               }
+            } catch (Exception e) {
+               if (AuditLogger.isResourceLoggingEnabled()) {
+                  AuditLogger.removeMessagesFailure(queue.getName().toString());
+               }
+               throw e;
             }
-            throw e;
+            return removed;
+         } finally {
+            blockOnIO();
          }
-         return removed;
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1113,87 +1128,99 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public boolean expireMessage(final long messageID) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.expireMessage(queue, messageID);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.expireMessage(queue, messageID);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         return queue.expireReference(messageID);
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            return queue.expireReference(messageID);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
    @Override
    public int expireMessages(final String filterStr) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.expireMessages(queue, filterStr);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.expireMessages(queue, filterStr);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         Filter filter = FilterImpl.createFilter(filterStr);
-         return queue.expireReferences(filter);
-      } catch (ActiveMQException e) {
-         throw new IllegalStateException(e.getMessage());
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            Filter filter = FilterImpl.createFilter(filterStr);
+            return queue.expireReferences(filter);
+         } catch (ActiveMQException e) {
+            throw new IllegalStateException(e.getMessage());
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
    @Override
    public boolean retryMessage(final long messageID) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.retryMessage(queue, messageID);
-      }
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.retryMessage(queue, messageID);
+         }
 
-      checkStarted();
-      clearIO();
+         checkStarted();
+         clearIO();
 
-      try {
-         Filter singleMessageFilter = new Filter() {
-            @Override
-            public boolean match(Message message) {
-               return message.getMessageID() == messageID;
-            }
+         try {
+            Filter singleMessageFilter = new Filter() {
+               @Override
+               public boolean match(Message message) {
+                  return message.getMessageID() == messageID;
+               }
 
-            @Override
-            public boolean match(Map<String, String> map) {
-               return false;
-            }
+               @Override
+               public boolean match(Map<String, String> map) {
+                  return false;
+               }
 
-            @Override
-            public boolean match(Filterable filterable) {
-               return false;
-            }
+               @Override
+               public boolean match(Filterable filterable) {
+                  return false;
+               }
 
-            @Override
-            public SimpleString getFilterString() {
-               return new SimpleString("custom filter for MESSAGEID= messageID");
-            }
-         };
+               @Override
+               public SimpleString getFilterString() {
+                  return new SimpleString("custom filter for MESSAGEID= messageID");
+               }
+            };
 
-         return queue.retryMessages(singleMessageFilter) > 0;
-      } finally {
-         blockOnIO();
+            return queue.retryMessages(singleMessageFilter) > 0;
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
    @Override
    public int retryMessages() throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.retryMessages(queue);
-      }
-      checkStarted();
-      clearIO();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.retryMessages(queue);
+         }
+         checkStarted();
+         clearIO();
 
-      try {
-         return queue.retryMessages(null);
-      } finally {
-         blockOnIO();
+         try {
+            return queue.retryMessages(null);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
@@ -1206,22 +1233,25 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    public boolean moveMessage(final long messageID,
                               final String otherQueueName,
                               final boolean rejectDuplicates) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.moveMessage(queue, messageID, otherQueueName, rejectDuplicates);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.moveMessage(queue, messageID, otherQueueName, rejectDuplicates);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
+         clearIO();
+         try {
+            Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
 
-         if (binding == null) {
-            throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
-         }
+            if (binding == null) {
+               throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
+            }
 
-         return queue.moveReference(messageID, binding.getAddress(), binding, rejectDuplicates);
-      } finally {
-         blockOnIO();
+            return queue.moveReference(messageID, binding.getAddress(), binding, rejectDuplicates);
+         } finally {
+            blockOnIO();
+         }
       }
 
    }
@@ -1245,25 +1275,28 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
                            final String otherQueueName,
                            final boolean rejectDuplicates,
                            final int messageCount) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         Filter filter = FilterImpl.createFilter(filterStr);
+         clearIO();
+         try {
+            Filter filter = FilterImpl.createFilter(filterStr);
 
-         Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
+            Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
 
-         if (binding == null) {
-            throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
-         }
+            if (binding == null) {
+               throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
+            }
 
-         int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding);
-         return retValue;
-      } finally {
-         blockOnIO();
+            int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding);
+            return retValue;
+         } finally {
+            blockOnIO();
+         }
       }
 
    }
@@ -1277,18 +1310,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.sendMessagesToDeadLetterAddress(queue, filterStr);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.sendMessagesToDeadLetterAddress(queue, filterStr);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         Filter filter = FilterImpl.createFilter(filterStr);
+         clearIO();
+         try {
+            Filter filter = FilterImpl.createFilter(filterStr);
 
-         return queue.sendMessagesToDeadLetterAddress(filter);
-      } finally {
-         blockOnIO();
+            return queue.sendMessagesToDeadLetterAddress(filter);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
@@ -1310,35 +1346,41 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
                              final String user,
                              final String password,
                              boolean createMessageId) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.sendMessageThroughManagement(queue, headers, type, body, durable, user, "****");
-      }
-      try {
-         String s = sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, createMessageId, queue.getID());
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.sendMessageSuccess(queue.getName().toString(), user);
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.sendMessageThroughManagement(queue, headers, type, body, durable, user, "****");
          }
-         return s;
-      } catch (Exception e) {
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.sendMessageFailure(queue.getName().toString(), user);
+         try {
+            String s = sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, createMessageId, queue.getID());
+            if (AuditLogger.isResourceLoggingEnabled()) {
+               AuditLogger.sendMessageSuccess(queue.getName().toString(), user);
+            }
+            return s;
+         } catch (Exception e) {
+            if (AuditLogger.isResourceLoggingEnabled()) {
+               AuditLogger.sendMessageFailure(queue.getName().toString(), user);
+            }
+            throw new IllegalStateException(e.getMessage());
          }
-         throw new IllegalStateException(e.getMessage());
       }
    }
 
    @Override
    public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.sendMessageToDeadLetterAddress(queue, messageID);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.sendMessageToDeadLetterAddress(queue, messageID);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         return queue.sendMessageToDeadLetterAddress(messageID);
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            return queue.sendMessageToDeadLetterAddress(messageID);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
@@ -1554,52 +1596,55 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public CompositeData[] browse(int page, int pageSize, String filter) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.browse(queue, page, pageSize);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.browse(queue, page, pageSize);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         long index = 0;
-         long start = (long) (page - 1) * pageSize;
-         long end = Math.min(page * pageSize, queue.getMessageCount());
+         clearIO();
+         try {
+            long index = 0;
+            long start = (long) (page - 1) * pageSize;
+            long end = Math.min(page * pageSize, queue.getMessageCount());
 
-         ArrayList<CompositeData> c = new ArrayList<>();
-         Filter thefilter = FilterImpl.createFilter(filter);
+            ArrayList<CompositeData> c = new ArrayList<>();
+            Filter thefilter = FilterImpl.createFilter(filter);
 
-         final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
-         try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
-            try {
-               while (iterator.hasNext() && index < end) {
-                  MessageReference ref = iterator.next();
-                  if (thefilter == null || thefilter.match(ref.getMessage())) {
-                     if (index >= start) {
-                        c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
+            final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
+            try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
+               try {
+                  while (iterator.hasNext() && index < end) {
+                     MessageReference ref = iterator.next();
+                     if (thefilter == null || thefilter.match(ref.getMessage())) {
+                        if (index >= start) {
+                           c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
+                        }
+                        //we only increase the index if we add a message, otherwise we could stop before we get to a filtered message
+                        index++;
                      }
-                     //we only increase the index if we add a message, otherwise we could stop before we get to a filtered message
-                     index++;
                   }
+               } catch (NoSuchElementException ignored) {
+                  // this could happen through paging browsing
                }
-            } catch (NoSuchElementException ignored) {
-               // this could happen through paging browsing
-            }
 
-            CompositeData[] rc = new CompositeData[c.size()];
-            c.toArray(rc);
+               CompositeData[] rc = new CompositeData[c.size()];
+               c.toArray(rc);
+               if (AuditLogger.isResourceLoggingEnabled()) {
+                  AuditLogger.browseMessagesSuccess(queue.getName().toString(), c.size());
+               }
+               return rc;
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
             if (AuditLogger.isResourceLoggingEnabled()) {
-               AuditLogger.browseMessagesSuccess(queue.getName().toString(), c.size());
+               AuditLogger.browseMessagesFailure(queue.getName().toString());
             }
-            return rc;
-         }
-      } catch (Exception e) {
-         logger.warn(e.getMessage(), e);
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.browseMessagesFailure(queue.getName().toString());
+            throw new IllegalStateException(e.getMessage());
+         } finally {
+            blockOnIO();
          }
-         throw new IllegalStateException(e.getMessage());
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1609,46 +1654,49 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
    @Override
    public CompositeData[] browse(String filter) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.browse(queue, filter);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.browse(queue, filter);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         final AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
-         final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit();
-         final int limit = addressSettings.getManagementBrowsePageSize();
-         int currentPageSize = 0;
-         ArrayList<CompositeData> c = new ArrayList<>();
-         Filter thefilter = FilterImpl.createFilter(filter);
-         try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
-            try {
-               while (iterator.hasNext() && currentPageSize++ < limit) {
-                  MessageReference ref = iterator.next();
-                  if (thefilter == null || thefilter.match(ref.getMessage())) {
-                     c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
+         clearIO();
+         try {
+            final AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+            final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit();
+            final int limit = addressSettings.getManagementBrowsePageSize();
+            int currentPageSize = 0;
+            ArrayList<CompositeData> c = new ArrayList<>();
+            Filter thefilter = FilterImpl.createFilter(filter);
+            try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
+               try {
+                  while (iterator.hasNext() && currentPageSize++ < limit) {
+                     MessageReference ref = iterator.next();
+                     if (thefilter == null || thefilter.match(ref.getMessage())) {
+                        c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
 
+                     }
                   }
+               } catch (NoSuchElementException ignored) {
+                  // this could happen through paging browsing
                }
-            } catch (NoSuchElementException ignored) {
-               // this could happen through paging browsing
-            }
 
-            CompositeData[] rc = new CompositeData[c.size()];
-            c.toArray(rc);
+               CompositeData[] rc = new CompositeData[c.size()];
+               c.toArray(rc);
+               if (AuditLogger.isResourceLoggingEnabled()) {
+                  AuditLogger.browseMessagesSuccess(queue.getName().toString(), currentPageSize);
+               }
+               return rc;
+            }
+         } catch (ActiveMQException e) {
             if (AuditLogger.isResourceLoggingEnabled()) {
-               AuditLogger.browseMessagesSuccess(queue.getName().toString(), currentPageSize);
+               AuditLogger.browseMessagesFailure(queue.getName().toString());
             }
-            return rc;
-         }
-      } catch (ActiveMQException e) {
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.browseMessagesFailure(queue.getName().toString());
+            throw new IllegalStateException(e.getMessage());
+         } finally {
+            blockOnIO();
          }
-         throw new IllegalStateException(e.getMessage());
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1714,32 +1762,35 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public String listGroupsAsJSON() throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.listGroupsAsJSON(queue);
-      }
-      checkStarted();
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.listGroupsAsJSON(queue);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         Map<SimpleString, Consumer> groups = queue.getGroups();
+         clearIO();
+         try {
+            Map<SimpleString, Consumer> groups = queue.getGroups();
 
-         JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
+            JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
 
-         for (Map.Entry<SimpleString, Consumer> group : groups.entrySet()) {
+            for (Map.Entry<SimpleString, Consumer> group : groups.entrySet()) {
 
-            if (group.getValue() instanceof ServerConsumer) {
-               ServerConsumer serverConsumer = (ServerConsumer) group.getValue();
+               if (group.getValue() instanceof ServerConsumer) {
+                  ServerConsumer serverConsumer = (ServerConsumer) group.getValue();
 
-               JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime());
+                  JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime());
+
+                  jsonArray.add(obj);
+               }
 
-               jsonArray.add(obj);
             }
 
+            return jsonArray.build().toString();
+         } finally {
+            blockOnIO();
          }
-
-         return jsonArray.build().toString();
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1969,31 +2020,37 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public void deliverScheduledMessages(String filter) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.deliverScheduledMessage(queue, filter);
-      }
-      checkStarted();
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.deliverScheduledMessage(queue, filter);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         queue.deliverScheduledMessages(filter);
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            queue.deliverScheduledMessages(filter);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
    @Override
    public void deliverScheduledMessage(long messageId) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.deliverScheduledMessage(queue, messageId);
-      }
-      checkStarted();
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.deliverScheduledMessage(queue, messageId);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         queue.deliverScheduledMessage(messageId);
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            queue.deliverScheduledMessage(messageId);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index bcfab14924..9e122d670e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -50,6 +50,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -277,7 +278,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    /** Certain management operations shouldn't use more than one thread.
     *  this semaphore is used to guarantee a single thread used. */
-   private final Semaphore managementSemaphore = new Semaphore(1);
+   private final ReentrantLock managementLock = new ReentrantLock();
 
    /**
     * This is a thread pool for io tasks only.
@@ -522,7 +523,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       if (replayManager == null) {
          throw ActiveMQMessageBundle.BUNDLE.noRetention();
       }
-      replayManager.replay(start, end, address, target, filter);
+      try (AutoCloseable lock = managementLock()) {
+         replayManager.replay(start, end, address, target, filter);
+      }
    }
 
    /**
@@ -4626,10 +4629,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    @Override
    public AutoCloseable managementLock() throws Exception {
-      if (!managementSemaphore.tryAcquire(1, TimeUnit.MINUTES)) {
+      if (!managementLock.tryLock(1, TimeUnit.MINUTES)) {
          throw ActiveMQMessageBundle.BUNDLE.managementBusy();
       } else {
-         return managementSemaphore::release;
+         return managementLock::unlock;
       }
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
index bb46c337ea..a425b83b6f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
@@ -69,23 +69,7 @@ public class ReplayManager {
       this.retentionFolder = server.getConfiguration().getJournalRetentionLocation();
    }
 
-   public void replay(Date start, Date end, String sourceAddress, final String targetAddress, String filter) throws Exception {
-
-      if (running.compareAndSet(false, true)) {
-         try {
-            actualReplay(start, end, sourceAddress, targetAddress, filter);
-         } catch (Exception e) {
-            logger.warn(e.getMessage());
-            throw e;
-         } finally {
-            running.set(false);
-         }
-      } else {
-         throw new RuntimeException("Replay manager is currently busy with another operation");
-      }
-   }
-
-   private void actualReplay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception {
+   public void replay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception {
       logger.debug("Replay::{}", sourceAddress);
 
       if (sourceAddress == null) {