You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2023/01/13 14:23:57 UTC

[activemq-artemis] branch main updated (11c26943fa -> bc1258ab25)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


    from 11c26943fa ARTEMIS-2876: remove duplicate dependency entry to fix immediate warning upon building
     new b565a8a7b9 ARTEMIS-4114 Avoiding deadlock during scale down
     new 4550fcf47c ARTEMIS-4116 Management lock to avoid multiple long running tasks running from user requests
     new bc1258ab25 ARTEMIS-3609 Using a different thread for the completion listener

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../activemq/artemis/api/core/ICoreMessage.java    |   8 +
 .../api/core/management/ActiveMQServerControl.java |   4 +-
 .../core/client/impl/ClientMessageImpl.java        |  11 +
 .../core/client/impl/ClientProducerImpl.java       |   6 +-
 .../core/client/impl/ClientSessionFactoryImpl.java |   2 +-
 .../core/client/impl/ClientSessionImpl.java        |  23 +-
 .../core/client/impl/ClientSessionInternal.java    |   9 +-
 .../impl/SendAcknowledgementHandlerWrapper.java    |  22 +-
 .../protocol/core/impl/ActiveMQSessionContext.java |   2 +-
 .../management/impl/ActiveMQServerControlImpl.java | 450 +++++++-------
 .../core/management/impl/AddressControlImpl.java   | 128 ++--
 .../core/management/impl/QueueControlImpl.java     | 671 +++++++++++----------
 .../cluster/impl/ClusterConnectionBridge.java      |  19 +-
 .../core/server/impl/ActiveMQServerImpl.java       |  11 +-
 .../artemis/core/server/replay/ReplayManager.java  |  18 +-
 15 files changed, 738 insertions(+), 646 deletions(-)


[activemq-artemis] 02/03: ARTEMIS-4116 Management lock to avoid multiple long running tasks running from user requests

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 4550fcf47c8c87d4e7cc0c06207a09c7de41dc65
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Dec 15 13:55:03 2022 -0500

    ARTEMIS-4116 Management lock to avoid multiple long running tasks running from user requests
---
 .../api/core/management/ActiveMQServerControl.java |   4 +-
 .../management/impl/ActiveMQServerControlImpl.java | 450 +++++++-------
 .../core/management/impl/AddressControlImpl.java   | 128 ++--
 .../core/management/impl/QueueControlImpl.java     | 671 +++++++++++----------
 .../core/server/impl/ActiveMQServerImpl.java       |  11 +-
 .../artemis/core/server/replay/ReplayManager.java  |  18 +-
 6 files changed, 679 insertions(+), 603 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 248f44d4a6..2229abfee5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -526,7 +526,7 @@ public interface ActiveMQServerControl {
 
    // Operations ----------------------------------------------------
    @Operation(desc = "Isolate the broker", impact = MBeanOperationInfo.ACTION)
-   boolean freezeReplication();
+   boolean freezeReplication() throws Exception;
 
    @Operation(desc = "Create an address", impact = MBeanOperationInfo.ACTION)
    String createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@@ -1905,7 +1905,7 @@ public interface ActiveMQServerControl {
     * Returns the names of the queues created on this server with the given routing-type.
     */
    @Operation(desc = "Names of the queues created on this server with the given routing-type (i.e. ANYCAST or MULTICAST)", impact = MBeanOperationInfo.INFO)
-   String[] getQueueNames(@Parameter(name = "routingType", desc = "The routing type, MULTICAST or ANYCAST") String routingType);
+   String[] getQueueNames(@Parameter(name = "routingType", desc = "The routing type, MULTICAST or ANYCAST") String routingType) throws Exception;
 
    /**
     * Returns the names of the cluster-connections deployed on this server.
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 8e5ce81e08..25deb59e09 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -826,17 +826,19 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
    }
 
    @Override
-   public boolean freezeReplication() {
+   public boolean freezeReplication() throws Exception {
       if (AuditLogger.isBaseLoggingEnabled()) {
          AuditLogger.freezeReplication(this.server);
       }
-      Activation activation = server.getActivation();
-      if (activation instanceof SharedNothingLiveActivation) {
-         SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation;
-         liveActivation.freezeReplication();
-         return true;
+      try (AutoCloseable lock = server.managementLock()) {
+         Activation activation = server.getActivation();
+         if (activation instanceof SharedNothingLiveActivation) {
+            SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation;
+            liveActivation.freezeReplication();
+            return true;
+         }
+         return false;
       }
-      return false;
    }
 
    private enum AddressInfoTextFormatter {
@@ -966,21 +968,25 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
    @Override
    public void deleteAddress(String name, boolean force) throws Exception {
-      checkStarted();
 
-      clearIO();
-      try {
-         server.removeAddressInfo(new SimpleString(name), null, force);
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.deleteAddressSuccess(name);
-         }
-      } catch (ActiveMQException e) {
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.deleteAddressFailure(name);
+      // delete might be a long running task, we ensure only one large task running
+      try (AutoCloseable lock = server.managementLock()) {
+         checkStarted();
+
+         clearIO();
+         try {
+            server.removeAddressInfo(new SimpleString(name), null, force);
+            if (AuditLogger.isResourceLoggingEnabled()) {
+               AuditLogger.deleteAddressSuccess(name);
+            }
+         } catch (ActiveMQException e) {
+            if (AuditLogger.isResourceLoggingEnabled()) {
+               AuditLogger.deleteAddressFailure(name);
+            }
+            throw new IllegalStateException(e.getMessage());
+         } finally {
+            blockOnIO();
          }
-         throw new IllegalStateException(e.getMessage());
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1003,10 +1009,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
       clearIO();
       try {
-         server.createQueue(new QueueConfiguration(name)
-                            .setAddress(address)
-                            .setFilterString(filterStr)
-                            .setDurable(durable));
+         server.createQueue(new QueueConfiguration(name).setAddress(address).setFilterString(filterStr).setDurable(durable));
       } finally {
          blockOnIO();
       }
@@ -1232,10 +1235,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              boolean autoCreateAddress,
                              long ringSize) throws Exception {
       if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.createQueue(this.server, null, null, address, routingType, name, filterStr, durable,
-                  maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey,
-                  lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch,
-                  autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize);
+         AuditLogger.createQueue(this.server, null, null, address, routingType, name, filterStr, durable, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize);
       }
       checkStarted();
 
@@ -1247,34 +1247,14 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
             filter = new SimpleString(filterStr);
          }
 
-         final Queue queue = server.createQueue(new QueueConfiguration(name)
-                                                   .setAddress(address)
-                                                   .setRoutingType(RoutingType.valueOf(routingType.toUpperCase()))
-                                                   .setFilterString(filter)
-                                                   .setDurable(durable)
-                                                   .setMaxConsumers(maxConsumers)
-                                                   .setPurgeOnNoConsumers(purgeOnNoConsumers)
-                                                   .setExclusive(exclusive)
-                                                   .setGroupRebalance(groupRebalance)
-                                                   .setGroupBuckets(groupBuckets)
-                                                   .setGroupFirstKey(groupFirstKey)
-                                                   .setLastValue(lastValue)
-                                                   .setLastValueKey(lastValueKey)
-                                                   .setNonDestructive(nonDestructive)
-                                                   .setConsumersBeforeDispatch(consumersBeforeDispatch)
-                                                   .setDelayBeforeDispatch(delayBeforeDispatch)
-                                                   .setAutoDelete(autoDelete)
-                                                   .setAutoDeleteDelay(autoDeleteDelay)
-                                                   .setAutoDeleteMessageCount(autoDeleteMessageCount)
-                                                   .setAutoCreateAddress(autoCreateAddress)
-                                                   .setRingSize(ringSize));
+         final Queue queue = server.createQueue(new QueueConfiguration(name).setAddress(address).setRoutingType(RoutingType.valueOf(routingType.toUpperCase())).setFilterString(filter).setDurable(durable).setMaxConsumers(maxConsumers).setPurgeOnNoConsumers(purgeOnNoConsumers).setExclusive(exclusive).setGroupRebalance(groupRebalance).setGroupBuckets(groupBuckets).setGroupFirstKey(groupFirstKey).setLastValue(lastValue).setLastValueKey(lastValueKey).setNonDestructive(nonDestructive).setConsu [...]
          if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.createQueueSuccess( name, address, routingType);
+            AuditLogger.createQueueSuccess(name, address, routingType);
          }
          return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
       } catch (ActiveMQException e) {
          if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.createQueueFailure( name, address, routingType);
+            AuditLogger.createQueueFailure(name, address, routingType);
          }
          throw new IllegalStateException(e.getMessage());
       } finally {
@@ -1313,7 +1293,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
    @Override
    public String updateQueue(String queueConfigurationAsJson) throws Exception {
-
       if (AuditLogger.isBaseLoggingEnabled()) {
          AuditLogger.updateQueue(this.server, queueConfigurationAsJson);
       }
@@ -1414,8 +1393,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              String user,
                              Long ringSize) throws Exception {
       if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.updateQueue(this.server, name, routingType, filter, maxConsumers, purgeOnNoConsumers,
-                  exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, ringSize);
+         AuditLogger.updateQueue(this.server, name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, ringSize);
       }
       checkStarted();
 
@@ -1592,27 +1570,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
    @Override
    public void destroyQueue(final String name, final boolean removeConsumers, final boolean forceAutoDeleteAddress) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.destroyQueue(this.server, null, null, name, removeConsumers, forceAutoDeleteAddress);
-      }
-      checkStarted();
+      // destroy might be a long running task, we prevent multiple running tasks in this case
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.destroyQueue(this.server, null, null, name, removeConsumers, forceAutoDeleteAddress);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         SimpleString queueName = new SimpleString(name);
+         clearIO();
          try {
-            server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, forceAutoDeleteAddress);
-         } catch (Exception e) {
+            SimpleString queueName = new SimpleString(name);
+            try {
+               server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, forceAutoDeleteAddress);
+            } catch (Exception e) {
+               if (AuditLogger.isResourceLoggingEnabled()) {
+                  AuditLogger.destroyQueueFailure(name);
+               }
+               throw e;
+            }
             if (AuditLogger.isResourceLoggingEnabled()) {
-               AuditLogger.destroyQueueFailure(name);
+               AuditLogger.destroyQueueSuccess(name);
             }
-            throw e;
-         }
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.destroyQueueSuccess(name);
+         } finally {
+            blockOnIO();
          }
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -2149,56 +2130,64 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
    @Override
    public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.commitPreparedTransaction(this.server, transactionAsBase64);
-      }
-      checkStarted();
-
-      clearIO();
-      try {
-         List<Xid> xids = resourceManager.getPreparedTransactions();
+      // commit might be a long running task if dealing with a large transaction
+      // ensuring a single one just in case
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.commitPreparedTransaction(this.server, transactionAsBase64);
+         }
+         checkStarted();
 
-         for (Xid xid : xids) {
-            if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
-               Transaction transaction = resourceManager.removeTransaction(xid, null);
-               transaction.commit(false);
-               long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
-               storageManager.waitOnOperations();
-               resourceManager.putHeuristicCompletion(recordID, xid, true);
-               return true;
+         clearIO();
+         try {
+            List<Xid> xids = resourceManager.getPreparedTransactions();
+
+            for (Xid xid : xids) {
+               if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
+                  Transaction transaction = resourceManager.removeTransaction(xid, null);
+                  transaction.commit(false);
+                  long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
+                  storageManager.waitOnOperations();
+                  resourceManager.putHeuristicCompletion(recordID, xid, true);
+                  return true;
+               }
             }
+            return false;
+         } finally {
+            blockOnIO();
          }
-         return false;
-      } finally {
-         blockOnIO();
       }
    }
 
    @Override
    public synchronized boolean rollbackPreparedTransaction(final String transactionAsBase64) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.rollbackPreparedTransaction(this.server, transactionAsBase64);
-      }
-      checkStarted();
+      // rollback might be a long running task if dealing with a large transaction
+      // ensuring a single task just in case
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.rollbackPreparedTransaction(this.server, transactionAsBase64);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
+         clearIO();
+         try {
 
-         List<Xid> xids = resourceManager.getPreparedTransactions();
+            List<Xid> xids = resourceManager.getPreparedTransactions();
 
-         for (Xid xid : xids) {
-            if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
-               Transaction transaction = resourceManager.removeTransaction(xid, null);
-               transaction.rollback();
-               long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
-               server.getStorageManager().waitOnOperations();
-               resourceManager.putHeuristicCompletion(recordID, xid, false);
-               return true;
+            for (Xid xid : xids) {
+               if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
+                  Transaction transaction = resourceManager.removeTransaction(xid, null);
+                  transaction.rollback();
+                  long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
+                  server.getStorageManager().waitOnOperations();
+                  resourceManager.putHeuristicCompletion(recordID, xid, false);
+                  return true;
+               }
             }
+            return false;
+         } finally {
+            blockOnIO();
          }
-         return false;
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -2278,149 +2267,170 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
    @Override
    public boolean closeConsumerConnectionsForAddress(final String address) {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.closeConsumerConnectionsForAddress(this.server, address);
-      }
-      boolean closed = false;
-      checkStarted();
+      // this could be a long running task, ensuring a single task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.closeConsumerConnectionsForAddress(this.server, address);
+         }
+         boolean closed = false;
+         checkStarted();
 
-      clearIO();
-      try {
-         for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) {
-            if (binding instanceof LocalQueueBinding) {
-               Queue queue = ((LocalQueueBinding) binding).getQueue();
-               for (Consumer consumer : queue.getConsumers()) {
-                  if (consumer instanceof ServerConsumer) {
-                     ServerConsumer serverConsumer = (ServerConsumer) consumer;
-                     RemotingConnection connection = null;
-
-                     for (RemotingConnection potentialConnection : remotingService.getConnections()) {
-                        if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
-                           connection = potentialConnection;
+         clearIO();
+         try {
+            for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) {
+               if (binding instanceof LocalQueueBinding) {
+                  Queue queue = ((LocalQueueBinding) binding).getQueue();
+                  for (Consumer consumer : queue.getConsumers()) {
+                     if (consumer instanceof ServerConsumer) {
+                        ServerConsumer serverConsumer = (ServerConsumer) consumer;
+                        RemotingConnection connection = null;
+
+                        for (RemotingConnection potentialConnection : remotingService.getConnections()) {
+                           if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
+                              connection = potentialConnection;
+                           }
                         }
-                     }
 
-                     if (connection != null) {
-                        remotingService.removeConnection(connection.getID());
-                        connection.fail(ActiveMQMessageBundle.BUNDLE.consumerConnectionsClosedByManagement(address));
-                        closed = true;
+                        if (connection != null) {
+                           remotingService.removeConnection(connection.getID());
+                           connection.fail(ActiveMQMessageBundle.BUNDLE.consumerConnectionsClosedByManagement(address));
+                           closed = true;
+                        }
                      }
                   }
                }
             }
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.failedToCloseConsumerConnectionsForAddress(address, e);
+         } finally {
+            blockOnIO();
          }
-      } catch (Exception e) {
-         ActiveMQServerLogger.LOGGER.failedToCloseConsumerConnectionsForAddress(address, e);
-      } finally {
-         blockOnIO();
+         return closed;
+      } catch (Throwable e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
-      return closed;
    }
 
    @Override
    public boolean closeConnectionsForUser(final String userName) {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.closeConnectionsForUser(this.server, userName);
-      }
-      boolean closed = false;
-      checkStarted();
+      // possibly a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.closeConnectionsForUser(this.server, userName);
+         }
+         boolean closed = false;
+         checkStarted();
 
-      clearIO();
-      try {
-         for (ServerSession serverSession : server.getSessions()) {
-            if (serverSession.getUsername() != null && serverSession.getUsername().equals(userName)) {
-               RemotingConnection connection = null;
+         clearIO();
+         try {
+            for (ServerSession serverSession : server.getSessions()) {
+               if (serverSession.getUsername() != null && serverSession.getUsername().equals(userName)) {
+                  RemotingConnection connection = null;
 
-               for (RemotingConnection potentialConnection : remotingService.getConnections()) {
-                  if (potentialConnection.getID().toString().equals(serverSession.getConnectionID().toString())) {
-                     connection = potentialConnection;
+                  for (RemotingConnection potentialConnection : remotingService.getConnections()) {
+                     if (potentialConnection.getID().toString().equals(serverSession.getConnectionID().toString())) {
+                        connection = potentialConnection;
+                     }
                   }
-               }
 
-               if (connection != null) {
-                  remotingService.removeConnection(connection.getID());
-                  connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsForUserClosedByManagement(userName));
-                  closed = true;
+                  if (connection != null) {
+                     remotingService.removeConnection(connection.getID());
+                     connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsForUserClosedByManagement(userName));
+                     closed = true;
+                  }
                }
             }
+         } finally {
+            blockOnIO();
          }
-      } finally {
-         blockOnIO();
+         return closed;
+      } catch (Throwable e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
-      return closed;
    }
 
    @Override
    public boolean closeConnectionWithID(final String ID) {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.closeConnectionWithID(this.server, ID);
-      }
-      checkStarted();
+      // possibly a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.closeConnectionWithID(this.server, ID);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         for (RemotingConnection connection : remotingService.getConnections()) {
-            if (connection.getID().toString().equals(ID)) {
-               remotingService.removeConnection(connection.getID());
-               connection.fail(ActiveMQMessageBundle.BUNDLE.connectionWithIDClosedByManagement(ID));
-               return true;
+         clearIO();
+         try {
+            for (RemotingConnection connection : remotingService.getConnections()) {
+               if (connection.getID().toString().equals(ID)) {
+                  remotingService.removeConnection(connection.getID());
+                  connection.fail(ActiveMQMessageBundle.BUNDLE.connectionWithIDClosedByManagement(ID));
+                  return true;
+               }
             }
+         } finally {
+            blockOnIO();
          }
-      } finally {
-         blockOnIO();
+         return false;
+      } catch (Throwable e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
-      return false;
    }
 
    @Override
    public boolean closeSessionWithID(final String connectionID, final String ID) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.closeSessionWithID(this.server, connectionID, ID);
-      }
-      checkStarted();
+      // possibly a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.closeSessionWithID(this.server, connectionID, ID);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         List<ServerSession> sessions = server.getSessions(connectionID);
-         for (ServerSession session : sessions) {
-            if (session.getName().equals(ID.toString())) {
-               session.close(true);
-               return true;
+         clearIO();
+         try {
+            List<ServerSession> sessions = server.getSessions(connectionID);
+            for (ServerSession session : sessions) {
+               if (session.getName().equals(ID.toString())) {
+                  session.close(true);
+                  return true;
+               }
             }
-         }
 
-      } finally {
-         blockOnIO();
+         } finally {
+            blockOnIO();
+         }
+         return false;
       }
-      return false;
    }
 
    @Override
    public boolean closeConsumerWithID(final String sessionID, final String ID) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.closeConsumerWithID(this.server, sessionID, ID);
-      }
-      checkStarted();
+      // possibly a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.closeConsumerWithID(this.server, sessionID, ID);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         Set<ServerSession> sessions = server.getSessions();
-         for (ServerSession session : sessions) {
-            if (session.getName().equals(sessionID.toString())) {
-               Set<ServerConsumer> serverConsumers = session.getServerConsumers();
-               for (ServerConsumer serverConsumer : serverConsumers) {
-                  if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
-                     serverConsumer.disconnect();
-                     return true;
+         clearIO();
+         try {
+            Set<ServerSession> sessions = server.getSessions();
+            for (ServerSession session : sessions) {
+               if (session.getName().equals(sessionID.toString())) {
+                  Set<ServerConsumer> serverConsumers = session.getServerConsumers();
+                  for (ServerConsumer serverConsumer : serverConsumers) {
+                     if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
+                        serverConsumer.disconnect();
+                        return true;
+                     }
                   }
                }
             }
-         }
 
-      } finally {
-         blockOnIO();
+         } finally {
+            blockOnIO();
+         }
+         return false;
       }
-      return false;
    }
 
    @Override
@@ -4116,27 +4126,29 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
    @Override
    public void scaleDown(String connector) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.scaleDown(this.server, connector);
-      }
-      checkStarted();
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.scaleDown(this.server, connector);
+         }
+         checkStarted();
 
-      clearIO();
-      HAPolicy haPolicy = server.getHAPolicy();
-      if (haPolicy instanceof LiveOnlyPolicy) {
-         LiveOnlyPolicy liveOnlyPolicy = (LiveOnlyPolicy) haPolicy;
+         clearIO();
+         HAPolicy haPolicy = server.getHAPolicy();
+         if (haPolicy instanceof LiveOnlyPolicy) {
+            LiveOnlyPolicy liveOnlyPolicy = (LiveOnlyPolicy) haPolicy;
 
-         if (liveOnlyPolicy.getScaleDownPolicy() == null) {
-            liveOnlyPolicy.setScaleDownPolicy(new ScaleDownPolicy());
-         }
+            if (liveOnlyPolicy.getScaleDownPolicy() == null) {
+               liveOnlyPolicy.setScaleDownPolicy(new ScaleDownPolicy());
+            }
 
-         liveOnlyPolicy.getScaleDownPolicy().setEnabled(true);
+            liveOnlyPolicy.getScaleDownPolicy().setEnabled(true);
 
-         if (connector != null) {
-            liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, connector);
-         }
+            if (connector != null) {
+               liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, connector);
+            }
 
-         server.fail(true);
+            server.fail(true);
+         }
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index d88051e392..9044936d63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -431,10 +431,15 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
 
    @Override
    public long getMessageCount() {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.getMessageCount(this.addressInfo);
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.getMessageCount(this.addressInfo);
+         }
+         return getMessageCount(DurabilityType.ALL);
+      } catch (Exception e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
-      return getMessageCount(DurabilityType.ALL);
    }
 
    @Override
@@ -472,14 +477,17 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
                              final String user,
                              final String password,
                              boolean createMessageId) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.sendMessageThroughManagement(this, headers, type, body, durable, user, "****");
-      }
-      try {
-         return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password, createMessageId);
-      } catch (Exception e) {
-         e.printStackTrace();
-         throw new IllegalStateException(e.getMessage());
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.sendMessageThroughManagement(this, headers, type, body, durable, user, "****");
+         }
+         try {
+            return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password, createMessageId);
+         } catch (Exception e) {
+            e.printStackTrace();
+            throw new IllegalStateException(e.getMessage());
+         }
       }
    }
 
@@ -585,20 +593,25 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
 
    @Override
    public boolean clearDuplicateIdCache() {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.clearDuplicateIdCache(this.addressInfo);
-      }
-      DuplicateIDCache cache = ((PostOfficeImpl)server.getPostOffice()).getDuplicateIDCaches().get(addressInfo.getName());
-      try {
-         if (cache != null) {
-            cache.clear();
-            return true;
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.clearDuplicateIdCache(this.addressInfo);
+         }
+         DuplicateIDCache cache = ((PostOfficeImpl) server.getPostOffice()).getDuplicateIDCaches().get(addressInfo.getName());
+         try {
+            if (cache != null) {
+               cache.clear();
+               return true;
+            }
+         } catch (Exception e) {
+            logger.debug("Failed to clear duplicate ID cache", e);
          }
+
+         return false;
       } catch (Exception e) {
-         logger.debug("Failed to clear duplicate ID cache", e);
+         throw new RuntimeException(e.getMessage(), e);
       }
-
-      return false;
    }
 
    @Override
@@ -627,37 +640,41 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
 
    @Override
    public long purge() throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.purge(this.addressInfo);
-      }
-      clearIO();
-      long totalMsgs = 0;
-      try {
-         Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
-         if (bindings != null) {
-            for (Binding binding : bindings.getBindings()) {
-               if (binding instanceof QueueBinding) {
-                  totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.purge(this.addressInfo);
+         }
+         clearIO();
+         long totalMsgs = 0;
+         try {
+            Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
+            if (bindings != null) {
+               for (Binding binding : bindings.getBindings()) {
+                  if (binding instanceof QueueBinding) {
+                     totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
+                  }
                }
             }
+            if (AuditLogger.isResourceLoggingEnabled()) {
+               AuditLogger.purgeAddressSuccess(addressInfo.getName().toString());
+            }
+         } catch (Throwable t) {
+            if (AuditLogger.isResourceLoggingEnabled()) {
+               AuditLogger.purgeAddressFailure(addressInfo.getName().toString());
+            }
+            throw new IllegalStateException(t.getMessage());
+         } finally {
+            blockOnIO();
          }
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.purgeAddressSuccess(addressInfo.getName().toString());
-         }
-      } catch (Throwable t) {
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.purgeAddressFailure(addressInfo.getName().toString());
-         }
-         throw new IllegalStateException(t.getMessage());
-      } finally {
-         blockOnIO();
-      }
 
-      return totalMsgs;
+         return totalMsgs;
+      }
    }
 
    @Override
    public void replay(String target, String filter) throws Exception {
+      // server.replay is already calling the managementLock, no need to do it here
       server.replay(null, null, this.getAddress(), target, filter);
    }
 
@@ -669,22 +686,25 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
       Date startScanDate = format.parse(startScan);
       Date endScanDate = format.parse(endScan);
 
+      // server.replay is already calling the managementLock, no need to do it here
       server.replay(startScanDate, endScanDate, this.getAddress(), target, filter);
    }
 
 
    private long getMessageCount(final DurabilityType durability) {
-      long count = 0;
-      for (String queueName : getQueueNames()) {
-         Queue queue = server.locateQueue(queueName);
-         if (queue != null &&
-            (durability == DurabilityType.ALL ||
-            (durability == DurabilityType.DURABLE && queue.isDurable()) ||
-            (durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) {
-            count += queue.getMessageCount();
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         long count = 0;
+         for (String queueName : getQueueNames()) {
+            Queue queue = server.locateQueue(queueName);
+            if (queue != null && (durability == DurabilityType.ALL || (durability == DurabilityType.DURABLE && queue.isDurable()) || (durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) {
+               count += queue.getMessageCount();
+            }
          }
+         return count;
+      } catch (Exception e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
-      return count;
    }
 
    private void checkStarted() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 1fbefc3847..9bced82671 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -740,32 +740,38 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public Map<String, Object>[] listScheduledMessages() throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.listScheduledMessages(queue);
-      }
-      checkStarted();
+      // it could be a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.listScheduledMessages(queue);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         List<MessageReference> refs = queue.getScheduledMessages();
-         return convertMessagesToMaps(refs);
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            List<MessageReference> refs = queue.getScheduledMessages();
+            return convertMessagesToMaps(refs);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
    @Override
    public String listScheduledMessagesAsJSON() throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.listScheduledMessagesAsJSON(queue);
-      }
-      checkStarted();
+      // it could be a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.listScheduledMessagesAsJSON(queue);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         return QueueControlImpl.toJSON(listScheduledMessages());
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            return QueueControlImpl.toJSON(listScheduledMessages());
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
@@ -969,33 +975,36 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    private Map<String, Long> internalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
-      checkStarted();
+      //  long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         checkStarted();
 
-      clearIO();
+         clearIO();
 
-      Map<String, Long> result = new HashMap<>();
-      try {
-         Filter filter = FilterImpl.createFilter(filterStr);
-         SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
-         if (filter == null && groupByProperty == null) {
-            result.put(null, getMessageCount());
-         } else {
-            final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize();
-            int count = 0;
-            try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
-               try {
-                  while (iterator.hasNext() && count++ < limit) {
-                     Message message = iterator.next().getMessage();
-                     internalComputeMessage(result, filter, groupByProperty, message);
+         Map<String, Long> result = new HashMap<>();
+         try {
+            Filter filter = FilterImpl.createFilter(filterStr);
+            SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
+            if (filter == null && groupByProperty == null) {
+               result.put(null, getMessageCount());
+            } else {
+               final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize();
+               int count = 0;
+               try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
+                  try {
+                     while (iterator.hasNext() && count++ < limit) {
+                        Message message = iterator.next().getMessage();
+                        internalComputeMessage(result, filter, groupByProperty, message);
+                     }
+                  } catch (NoSuchElementException ignored) {
+                     // this could happen through paging browsing
                   }
-               } catch (NoSuchElementException ignored) {
-                  // this could happen through paging browsing
                }
             }
+            return result;
+         } finally {
+            blockOnIO();
          }
-         return result;
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1019,26 +1028,26 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    private Map<String, Long> internalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
-      checkStarted();
+      // it could be a long running task
+      try (AutoCloseable lock = server.managementLock()) {
+         checkStarted();
 
-      clearIO();
+         clearIO();
 
-      Map<String, Long> result = new HashMap<>();
-      try {
-         Filter filter = FilterImpl.createFilter(filterStr);
-         SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
-         if (filter == null && groupByProperty == null) {
-            result.put(null, Long.valueOf(getDeliveringCount()));
-         } else {
-            Map<String, List<MessageReference>> deliveringMessages = queue.getDeliveringMessages();
-            deliveringMessages.forEach((s, messageReferenceList) ->
-                            messageReferenceList.forEach(messageReference ->
-                                    internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage())
-                            ));
+         Map<String, Long> result = new HashMap<>();
+         try {
+            Filter filter = FilterImpl.createFilter(filterStr);
+            SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
+            if (filter == null && groupByProperty == null) {
+               result.put(null, Long.valueOf(getDeliveringCount()));
+            } else {
+               Map<String, List<MessageReference>> deliveringMessages = queue.getDeliveringMessages();
+               deliveringMessages.forEach((s, messageReferenceList) -> messageReferenceList.forEach(messageReference -> internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage())));
+            }
+            return result;
+         } finally {
+            blockOnIO();
          }
-         return result;
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1057,18 +1066,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public boolean removeMessage(final long messageID) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.removeMessage(queue, messageID);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.removeMessage(queue, messageID);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         return queue.deleteReference(messageID);
-      } catch (ActiveMQException e) {
-         throw new IllegalStateException(e.getMessage());
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            return queue.deleteReference(messageID);
+         } catch (ActiveMQException e) {
+            throw new IllegalStateException(e.getMessage());
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
@@ -1079,30 +1091,33 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public int removeMessages(final int flushLimit, final String filterStr) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.removeMessages(queue, flushLimit, filterStr);
-      }
-      checkStarted();
-
-      clearIO();
-      try {
-         Filter filter = FilterImpl.createFilter(filterStr);
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.removeMessages(queue, flushLimit, filterStr);
+         }
+         checkStarted();
 
-         int removed = 0;
+         clearIO();
          try {
-            removed = queue.deleteMatchingReferences(flushLimit, filter);
-            if (AuditLogger.isResourceLoggingEnabled()) {
-               AuditLogger.removeMessagesSuccess(removed, queue.getName().toString());
-            }
-         } catch (Exception e) {
-            if (AuditLogger.isResourceLoggingEnabled()) {
-               AuditLogger.removeMessagesFailure(queue.getName().toString());
+            Filter filter = FilterImpl.createFilter(filterStr);
+
+            int removed = 0;
+            try {
+               removed = queue.deleteMatchingReferences(flushLimit, filter);
+               if (AuditLogger.isResourceLoggingEnabled()) {
+                  AuditLogger.removeMessagesSuccess(removed, queue.getName().toString());
+               }
+            } catch (Exception e) {
+               if (AuditLogger.isResourceLoggingEnabled()) {
+                  AuditLogger.removeMessagesFailure(queue.getName().toString());
+               }
+               throw e;
             }
-            throw e;
+            return removed;
+         } finally {
+            blockOnIO();
          }
-         return removed;
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1113,87 +1128,99 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public boolean expireMessage(final long messageID) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.expireMessage(queue, messageID);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.expireMessage(queue, messageID);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         return queue.expireReference(messageID);
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            return queue.expireReference(messageID);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
    @Override
    public int expireMessages(final String filterStr) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.expireMessages(queue, filterStr);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.expireMessages(queue, filterStr);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         Filter filter = FilterImpl.createFilter(filterStr);
-         return queue.expireReferences(filter);
-      } catch (ActiveMQException e) {
-         throw new IllegalStateException(e.getMessage());
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            Filter filter = FilterImpl.createFilter(filterStr);
+            return queue.expireReferences(filter);
+         } catch (ActiveMQException e) {
+            throw new IllegalStateException(e.getMessage());
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
    @Override
    public boolean retryMessage(final long messageID) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.retryMessage(queue, messageID);
-      }
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.retryMessage(queue, messageID);
+         }
 
-      checkStarted();
-      clearIO();
+         checkStarted();
+         clearIO();
 
-      try {
-         Filter singleMessageFilter = new Filter() {
-            @Override
-            public boolean match(Message message) {
-               return message.getMessageID() == messageID;
-            }
+         try {
+            Filter singleMessageFilter = new Filter() {
+               @Override
+               public boolean match(Message message) {
+                  return message.getMessageID() == messageID;
+               }
 
-            @Override
-            public boolean match(Map<String, String> map) {
-               return false;
-            }
+               @Override
+               public boolean match(Map<String, String> map) {
+                  return false;
+               }
 
-            @Override
-            public boolean match(Filterable filterable) {
-               return false;
-            }
+               @Override
+               public boolean match(Filterable filterable) {
+                  return false;
+               }
 
-            @Override
-            public SimpleString getFilterString() {
-               return new SimpleString("custom filter for MESSAGEID= messageID");
-            }
-         };
+               @Override
+               public SimpleString getFilterString() {
+                  return new SimpleString("custom filter for MESSAGEID= messageID");
+               }
+            };
 
-         return queue.retryMessages(singleMessageFilter) > 0;
-      } finally {
-         blockOnIO();
+            return queue.retryMessages(singleMessageFilter) > 0;
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
    @Override
    public int retryMessages() throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.retryMessages(queue);
-      }
-      checkStarted();
-      clearIO();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.retryMessages(queue);
+         }
+         checkStarted();
+         clearIO();
 
-      try {
-         return queue.retryMessages(null);
-      } finally {
-         blockOnIO();
+         try {
+            return queue.retryMessages(null);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
@@ -1206,22 +1233,25 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    public boolean moveMessage(final long messageID,
                               final String otherQueueName,
                               final boolean rejectDuplicates) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.moveMessage(queue, messageID, otherQueueName, rejectDuplicates);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.moveMessage(queue, messageID, otherQueueName, rejectDuplicates);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
+         clearIO();
+         try {
+            Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
 
-         if (binding == null) {
-            throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
-         }
+            if (binding == null) {
+               throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
+            }
 
-         return queue.moveReference(messageID, binding.getAddress(), binding, rejectDuplicates);
-      } finally {
-         blockOnIO();
+            return queue.moveReference(messageID, binding.getAddress(), binding, rejectDuplicates);
+         } finally {
+            blockOnIO();
+         }
       }
 
    }
@@ -1245,25 +1275,28 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
                            final String otherQueueName,
                            final boolean rejectDuplicates,
                            final int messageCount) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         Filter filter = FilterImpl.createFilter(filterStr);
+         clearIO();
+         try {
+            Filter filter = FilterImpl.createFilter(filterStr);
 
-         Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
+            Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
 
-         if (binding == null) {
-            throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
-         }
+            if (binding == null) {
+               throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
+            }
 
-         int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding);
-         return retValue;
-      } finally {
-         blockOnIO();
+            int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding);
+            return retValue;
+         } finally {
+            blockOnIO();
+         }
       }
 
    }
@@ -1277,18 +1310,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.sendMessagesToDeadLetterAddress(queue, filterStr);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.sendMessagesToDeadLetterAddress(queue, filterStr);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         Filter filter = FilterImpl.createFilter(filterStr);
+         clearIO();
+         try {
+            Filter filter = FilterImpl.createFilter(filterStr);
 
-         return queue.sendMessagesToDeadLetterAddress(filter);
-      } finally {
-         blockOnIO();
+            return queue.sendMessagesToDeadLetterAddress(filter);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
@@ -1310,35 +1346,41 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
                              final String user,
                              final String password,
                              boolean createMessageId) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.sendMessageThroughManagement(queue, headers, type, body, durable, user, "****");
-      }
-      try {
-         String s = sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, createMessageId, queue.getID());
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.sendMessageSuccess(queue.getName().toString(), user);
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.sendMessageThroughManagement(queue, headers, type, body, durable, user, "****");
          }
-         return s;
-      } catch (Exception e) {
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.sendMessageFailure(queue.getName().toString(), user);
+         try {
+            String s = sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, createMessageId, queue.getID());
+            if (AuditLogger.isResourceLoggingEnabled()) {
+               AuditLogger.sendMessageSuccess(queue.getName().toString(), user);
+            }
+            return s;
+         } catch (Exception e) {
+            if (AuditLogger.isResourceLoggingEnabled()) {
+               AuditLogger.sendMessageFailure(queue.getName().toString(), user);
+            }
+            throw new IllegalStateException(e.getMessage());
          }
-         throw new IllegalStateException(e.getMessage());
       }
    }
 
    @Override
    public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.sendMessageToDeadLetterAddress(queue, messageID);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.sendMessageToDeadLetterAddress(queue, messageID);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         return queue.sendMessageToDeadLetterAddress(messageID);
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            return queue.sendMessageToDeadLetterAddress(messageID);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
@@ -1554,52 +1596,55 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public CompositeData[] browse(int page, int pageSize, String filter) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.browse(queue, page, pageSize);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.browse(queue, page, pageSize);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         long index = 0;
-         long start = (long) (page - 1) * pageSize;
-         long end = Math.min(page * pageSize, queue.getMessageCount());
+         clearIO();
+         try {
+            long index = 0;
+            long start = (long) (page - 1) * pageSize;
+            long end = Math.min(page * pageSize, queue.getMessageCount());
 
-         ArrayList<CompositeData> c = new ArrayList<>();
-         Filter thefilter = FilterImpl.createFilter(filter);
+            ArrayList<CompositeData> c = new ArrayList<>();
+            Filter thefilter = FilterImpl.createFilter(filter);
 
-         final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
-         try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
-            try {
-               while (iterator.hasNext() && index < end) {
-                  MessageReference ref = iterator.next();
-                  if (thefilter == null || thefilter.match(ref.getMessage())) {
-                     if (index >= start) {
-                        c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
+            final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
+            try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
+               try {
+                  while (iterator.hasNext() && index < end) {
+                     MessageReference ref = iterator.next();
+                     if (thefilter == null || thefilter.match(ref.getMessage())) {
+                        if (index >= start) {
+                           c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
+                        }
+                        //we only increase the index if we add a message, otherwise we could stop before we get to a filtered message
+                        index++;
                      }
-                     //we only increase the index if we add a message, otherwise we could stop before we get to a filtered message
-                     index++;
                   }
+               } catch (NoSuchElementException ignored) {
+                  // this could happen through paging browsing
                }
-            } catch (NoSuchElementException ignored) {
-               // this could happen through paging browsing
-            }
 
-            CompositeData[] rc = new CompositeData[c.size()];
-            c.toArray(rc);
+               CompositeData[] rc = new CompositeData[c.size()];
+               c.toArray(rc);
+               if (AuditLogger.isResourceLoggingEnabled()) {
+                  AuditLogger.browseMessagesSuccess(queue.getName().toString(), c.size());
+               }
+               return rc;
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
             if (AuditLogger.isResourceLoggingEnabled()) {
-               AuditLogger.browseMessagesSuccess(queue.getName().toString(), c.size());
+               AuditLogger.browseMessagesFailure(queue.getName().toString());
             }
-            return rc;
-         }
-      } catch (Exception e) {
-         logger.warn(e.getMessage(), e);
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.browseMessagesFailure(queue.getName().toString());
+            throw new IllegalStateException(e.getMessage());
+         } finally {
+            blockOnIO();
          }
-         throw new IllegalStateException(e.getMessage());
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1609,46 +1654,49 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
    @Override
    public CompositeData[] browse(String filter) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.browse(queue, filter);
-      }
-      checkStarted();
+      // this is a critical task, we need to prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.browse(queue, filter);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         final AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
-         final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit();
-         final int limit = addressSettings.getManagementBrowsePageSize();
-         int currentPageSize = 0;
-         ArrayList<CompositeData> c = new ArrayList<>();
-         Filter thefilter = FilterImpl.createFilter(filter);
-         try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
-            try {
-               while (iterator.hasNext() && currentPageSize++ < limit) {
-                  MessageReference ref = iterator.next();
-                  if (thefilter == null || thefilter.match(ref.getMessage())) {
-                     c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
+         clearIO();
+         try {
+            final AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+            final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit();
+            final int limit = addressSettings.getManagementBrowsePageSize();
+            int currentPageSize = 0;
+            ArrayList<CompositeData> c = new ArrayList<>();
+            Filter thefilter = FilterImpl.createFilter(filter);
+            try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
+               try {
+                  while (iterator.hasNext() && currentPageSize++ < limit) {
+                     MessageReference ref = iterator.next();
+                     if (thefilter == null || thefilter.match(ref.getMessage())) {
+                        c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
 
+                     }
                   }
+               } catch (NoSuchElementException ignored) {
+                  // this could happen through paging browsing
                }
-            } catch (NoSuchElementException ignored) {
-               // this could happen through paging browsing
-            }
 
-            CompositeData[] rc = new CompositeData[c.size()];
-            c.toArray(rc);
+               CompositeData[] rc = new CompositeData[c.size()];
+               c.toArray(rc);
+               if (AuditLogger.isResourceLoggingEnabled()) {
+                  AuditLogger.browseMessagesSuccess(queue.getName().toString(), currentPageSize);
+               }
+               return rc;
+            }
+         } catch (ActiveMQException e) {
             if (AuditLogger.isResourceLoggingEnabled()) {
-               AuditLogger.browseMessagesSuccess(queue.getName().toString(), currentPageSize);
+               AuditLogger.browseMessagesFailure(queue.getName().toString());
             }
-            return rc;
-         }
-      } catch (ActiveMQException e) {
-         if (AuditLogger.isResourceLoggingEnabled()) {
-            AuditLogger.browseMessagesFailure(queue.getName().toString());
+            throw new IllegalStateException(e.getMessage());
+         } finally {
+            blockOnIO();
          }
-         throw new IllegalStateException(e.getMessage());
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1714,32 +1762,35 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public String listGroupsAsJSON() throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.listGroupsAsJSON(queue);
-      }
-      checkStarted();
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.listGroupsAsJSON(queue);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         Map<SimpleString, Consumer> groups = queue.getGroups();
+         clearIO();
+         try {
+            Map<SimpleString, Consumer> groups = queue.getGroups();
 
-         JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
+            JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
 
-         for (Map.Entry<SimpleString, Consumer> group : groups.entrySet()) {
+            for (Map.Entry<SimpleString, Consumer> group : groups.entrySet()) {
 
-            if (group.getValue() instanceof ServerConsumer) {
-               ServerConsumer serverConsumer = (ServerConsumer) group.getValue();
+               if (group.getValue() instanceof ServerConsumer) {
+                  ServerConsumer serverConsumer = (ServerConsumer) group.getValue();
 
-               JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime());
+                  JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime());
+
+                  jsonArray.add(obj);
+               }
 
-               jsonArray.add(obj);
             }
 
+            return jsonArray.build().toString();
+         } finally {
+            blockOnIO();
          }
-
-         return jsonArray.build().toString();
-      } finally {
-         blockOnIO();
       }
    }
 
@@ -1969,31 +2020,37 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    @Override
    public void deliverScheduledMessages(String filter) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.deliverScheduledMessage(queue, filter);
-      }
-      checkStarted();
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.deliverScheduledMessage(queue, filter);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         queue.deliverScheduledMessages(filter);
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            queue.deliverScheduledMessages(filter);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
    @Override
    public void deliverScheduledMessage(long messageId) throws Exception {
-      if (AuditLogger.isBaseLoggingEnabled()) {
-         AuditLogger.deliverScheduledMessage(queue, messageId);
-      }
-      checkStarted();
+      // prevent parallel tasks running
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.deliverScheduledMessage(queue, messageId);
+         }
+         checkStarted();
 
-      clearIO();
-      try {
-         queue.deliverScheduledMessage(messageId);
-      } finally {
-         blockOnIO();
+         clearIO();
+         try {
+            queue.deliverScheduledMessage(messageId);
+         } finally {
+            blockOnIO();
+         }
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index bcfab14924..9e122d670e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -50,6 +50,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -277,7 +278,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    /** Certain management operations shouldn't use more than one thread.
     *  this semaphore is used to guarantee a single thread used. */
-   private final Semaphore managementSemaphore = new Semaphore(1);
+   private final ReentrantLock managementLock = new ReentrantLock();
 
    /**
     * This is a thread pool for io tasks only.
@@ -522,7 +523,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       if (replayManager == null) {
          throw ActiveMQMessageBundle.BUNDLE.noRetention();
       }
-      replayManager.replay(start, end, address, target, filter);
+      try (AutoCloseable lock = managementLock()) {
+         replayManager.replay(start, end, address, target, filter);
+      }
    }
 
    /**
@@ -4626,10 +4629,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    @Override
    public AutoCloseable managementLock() throws Exception {
-      if (!managementSemaphore.tryAcquire(1, TimeUnit.MINUTES)) {
+      if (!managementLock.tryLock(1, TimeUnit.MINUTES)) {
          throw ActiveMQMessageBundle.BUNDLE.managementBusy();
       } else {
-         return managementSemaphore::release;
+         return managementLock::unlock;
       }
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
index bb46c337ea..a425b83b6f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
@@ -69,23 +69,7 @@ public class ReplayManager {
       this.retentionFolder = server.getConfiguration().getJournalRetentionLocation();
    }
 
-   public void replay(Date start, Date end, String sourceAddress, final String targetAddress, String filter) throws Exception {
-
-      if (running.compareAndSet(false, true)) {
-         try {
-            actualReplay(start, end, sourceAddress, targetAddress, filter);
-         } catch (Exception e) {
-            logger.warn(e.getMessage());
-            throw e;
-         } finally {
-            running.set(false);
-         }
-      } else {
-         throw new RuntimeException("Replay manager is currently busy with another operation");
-      }
-   }
-
-   private void actualReplay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception {
+   public void replay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception {
       logger.debug("Replay::{}", sourceAddress);
 
       if (sourceAddress == null) {


[activemq-artemis] 01/03: ARTEMIS-4114 Avoiding deadlock during scale down

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit b565a8a7b9f7415aa158e7cdc47a665187e8dd79
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Dec 15 13:50:08 2022 -0500

    ARTEMIS-4114 Avoiding deadlock during scale down
    
    We will rely on existing tests for this change
---
 .../server/cluster/impl/ClusterConnectionBridge.java  | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index c34c76dbfb..f977c0852f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -428,14 +428,17 @@ public class ClusterConnectionBridge extends BridgeImpl {
          clusterConnection.removeRecord(targetNodeID);
 
          if (scaleDown) {
-            try {
-               queue.deleteQueue(true);
-               queue.removeAddress();
-            } catch (ActiveMQAddressDoesNotExistException e) {
-               // ignore
-            } catch (Exception e) {
-               logger.warn(e.getMessage(), e);
-            }
+            executor.execute(() -> {
+               logger.debug("Scaling down queue {}", queue);
+               try {
+                  queue.deleteQueue(true);
+                  queue.removeAddress();
+               } catch (ActiveMQAddressDoesNotExistException e) {
+                  logger.debug("ActiveMQAddressDoesNotExistException during scale down for queue {}", queue);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            });
          }
       } else {
          clusterConnection.disconnectRecord(targetNodeID);


[activemq-artemis] 03/03: ARTEMIS-3609 Using a different thread for the completion listener

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit bc1258ab2535e7cc26dd72e31c1fe072eb9adf0d
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Dec 16 18:06:58 2022 -0500

    ARTEMIS-3609 Using a different thread for the completion listener
---
 .../activemq/artemis/api/core/ICoreMessage.java    |  8 ++++++++
 .../core/client/impl/ClientMessageImpl.java        | 11 +++++++++++
 .../core/client/impl/ClientProducerImpl.java       |  6 ++++--
 .../core/client/impl/ClientSessionFactoryImpl.java |  2 +-
 .../core/client/impl/ClientSessionImpl.java        | 23 +++++++++++-----------
 .../core/client/impl/ClientSessionInternal.java    |  9 +--------
 .../impl/SendAcknowledgementHandlerWrapper.java    | 22 ++++++++++-----------
 .../protocol/core/impl/ActiveMQSessionContext.java |  2 +-
 8 files changed, 48 insertions(+), 35 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
index ba3150ae70..75011229b3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
@@ -127,4 +127,12 @@ public interface ICoreMessage extends Message {
       return map;
    }
 
+
+   default boolean isConfirmed() {
+      return false;
+   }
+
+   default void setConfirmed(boolean confirmed) {
+   }
+
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index c1739a1b8d..5fac7e56b8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -46,6 +46,8 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
 
    private int flowControlSize = -1;
 
+   private boolean confirmed;
+
    /**
     * Used on LargeMessages only
     */
@@ -414,4 +416,13 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
       return new ClientMessageImpl(this);
    }
 
+   @Override
+   public boolean isConfirmed() {
+      return confirmed;
+   }
+
+   @Override
+   public void setConfirmed(boolean confirmed) {
+      this.confirmed = confirmed;
+   }
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index b6dad465fd..c9c6d75926 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -136,7 +136,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
       checkClosed();
 
       if (handler != null) {
-         handler = new SendAcknowledgementHandlerWrapper(handler, session.getSessionExecutor());
+         handler =  session.wrap(handler);
       }
 
       doSend(address1, message, handler);
@@ -145,7 +145,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
          logger.debug("Handler was used on producing messages towards address {} however there is no confirmationWindowEnabled", address1);
 
          // if there is no confirmation enabled, we will at least call the handler after the sent is done
-         session.scheduleConfirmation(handler, message);
+         handler.sendAcknowledged(message); // this is asynchronous as we wrapped with an executor
       }
    }
 
@@ -257,6 +257,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
 
          session.workDone();
 
+         msg.setConfirmed(false);
+
          if (isLarge) {
             largeMessageSend(sendBlocking, msg, theCredits, handler);
          } else {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 2a5c5d4030..ac6b70c7f0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -732,7 +732,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
       SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);
 
-      ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSe [...]
+      ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSe [...]
 
       synchronized (sessions) {
          if (closed || !clientProtocolManager.isAlive()) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 301faf2f4e..dfb99e65ee 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -80,6 +80,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
    private final Executor executor;
 
+   private final Executor confirmationExecutor;
+
    // to be sent to consumers as consumers will need a separate consumer for flow control
    private final Executor flowControlExecutor;
 
@@ -184,6 +186,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
                      final String groupID,
                      final SessionContext sessionContext,
                      final Executor executor,
+                     final Executor confirmationExecutor,
                      final Executor flowControlExecutor,
                      final Executor closeExecutor) throws ActiveMQException {
       this.sessionFactory = sessionFactory;
@@ -196,6 +199,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
       this.executor = executor;
 
+      this.confirmationExecutor = confirmationExecutor;
+
       this.flowControlExecutor = flowControlExecutor;
 
       this.xa = xa;
@@ -2208,23 +2213,17 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       return true;
    }
 
-   @Override
-   public void scheduleConfirmation(final SendAcknowledgementHandler handler, final Message message) {
-      executor.execute(new Runnable() {
-         @Override
-         public void run() {
-            handler.sendAcknowledged(message);
-         }
-      });
-   }
-
    @Override
    public SessionContext getSessionContext() {
       return sessionContext;
    }
 
    @Override
-   public Executor getSessionExecutor() {
-      return executor;
+   public SendAcknowledgementHandler wrap(SendAcknowledgementHandler handler) {
+      if (!(handler instanceof SendAcknowledgementHandlerWrapper)) {
+         handler = new SendAcknowledgementHandlerWrapper(handler, confirmationExecutor);
+      }
+      return handler;
    }
+
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
index 59c68201ea..b2833a2344 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.core.client.impl;
 
-import java.util.concurrent.Executor;
-
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -125,11 +123,6 @@ public interface ClientSessionInternal extends ClientSession {
 
    boolean isConfirmationWindowEnabled();
 
-   /**
-    * @param handler
-    */
-   void scheduleConfirmation(SendAcknowledgementHandler handler, Message message);
-
    boolean isClosing();
 
    String getNodeId();
@@ -138,5 +131,5 @@ public interface ClientSessionInternal extends ClientSession {
 
    SessionContext getSessionContext();
 
-   Executor getSessionExecutor();
+   SendAcknowledgementHandler wrap(SendAcknowledgementHandler handler);
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java
index 931f3868a4..793987d809 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.client.impl;
 
 import java.util.concurrent.Executor;
 
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.utils.actors.Actor;
@@ -26,13 +27,7 @@ public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHan
 
    private SendAcknowledgementHandler wrapped;
 
-   /**
-    * It's possible that a SendAcknowledgementHandler might be called twice due to subsequent
-    * packet confirmations on the same connection. Using this boolean avoids that possibility.
-    * A new SendAcknowledgementHandlerWrapper is created for each message sent so once it's
-    * called once it will never be called again.
-    */
-   private volatile boolean active = true;
+
 
    private final Actor<Message> messageActor;
 
@@ -44,22 +39,27 @@ public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHan
 
    @Override
    public void sendAcknowledged(Message message) {
-      if (active) {
+      ICoreMessage msg = message.toCore();
+
+      // It is possible that a SendAcknowledgementHandler might be called twice due to subsequent
+      // packet confirmations on the same connection. Using this boolean avoids that possibility.
+      if (!msg.isConfirmed()) {
          try {
             messageActor.act(message);
          } finally {
-            active = false;
+            msg.setConfirmed(true);
          }
       }
    }
 
    @Override
    public void sendFailed(Message message, Exception e) {
-      if (active) {
+      ICoreMessage msg = message.toCore();
+      if (!msg.isConfirmed()) {
          try {
             wrapped.sendFailed(message, e);
          } finally {
-            active = false;
+            msg.setConfirmed(true);
          }
       }
    }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 0cc1013f6b..7f248b7403 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -278,7 +278,7 @@ public class ActiveMQSessionContext extends SessionContext {
    public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
       setHandlers();
 
-      this.sendAckHandler = handler;
+      this.sendAckHandler = session.wrap(handler);
    }
 
    @Override