You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2023/01/13 14:23:59 UTC
[activemq-artemis] 02/03: ARTEMIS-4116 Management lock to avoid multiple long running tasks running from user requests
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 4550fcf47c8c87d4e7cc0c06207a09c7de41dc65
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Dec 15 13:55:03 2022 -0500
ARTEMIS-4116 Management lock to avoid multiple long running tasks running from user requests
---
.../api/core/management/ActiveMQServerControl.java | 4 +-
.../management/impl/ActiveMQServerControlImpl.java | 450 +++++++-------
.../core/management/impl/AddressControlImpl.java | 128 ++--
.../core/management/impl/QueueControlImpl.java | 671 +++++++++++----------
.../core/server/impl/ActiveMQServerImpl.java | 11 +-
.../artemis/core/server/replay/ReplayManager.java | 18 +-
6 files changed, 679 insertions(+), 603 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 248f44d4a6..2229abfee5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -526,7 +526,7 @@ public interface ActiveMQServerControl {
// Operations ----------------------------------------------------
@Operation(desc = "Isolate the broker", impact = MBeanOperationInfo.ACTION)
- boolean freezeReplication();
+ boolean freezeReplication() throws Exception;
@Operation(desc = "Create an address", impact = MBeanOperationInfo.ACTION)
String createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@@ -1905,7 +1905,7 @@ public interface ActiveMQServerControl {
* Returns the names of the queues created on this server with the given routing-type.
*/
@Operation(desc = "Names of the queues created on this server with the given routing-type (i.e. ANYCAST or MULTICAST)", impact = MBeanOperationInfo.INFO)
- String[] getQueueNames(@Parameter(name = "routingType", desc = "The routing type, MULTICAST or ANYCAST") String routingType);
+ String[] getQueueNames(@Parameter(name = "routingType", desc = "The routing type, MULTICAST or ANYCAST") String routingType) throws Exception;
/**
* Returns the names of the cluster-connections deployed on this server.
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 8e5ce81e08..25deb59e09 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -826,17 +826,19 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
@Override
- public boolean freezeReplication() {
+ public boolean freezeReplication() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.freezeReplication(this.server);
}
- Activation activation = server.getActivation();
- if (activation instanceof SharedNothingLiveActivation) {
- SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation;
- liveActivation.freezeReplication();
- return true;
+ try (AutoCloseable lock = server.managementLock()) {
+ Activation activation = server.getActivation();
+ if (activation instanceof SharedNothingLiveActivation) {
+ SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation;
+ liveActivation.freezeReplication();
+ return true;
+ }
+ return false;
}
- return false;
}
private enum AddressInfoTextFormatter {
@@ -966,21 +968,25 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public void deleteAddress(String name, boolean force) throws Exception {
- checkStarted();
- clearIO();
- try {
- server.removeAddressInfo(new SimpleString(name), null, force);
- if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.deleteAddressSuccess(name);
- }
- } catch (ActiveMQException e) {
- if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.deleteAddressFailure(name);
+ // delete might be a long running task, we ensure only one large task running
+ try (AutoCloseable lock = server.managementLock()) {
+ checkStarted();
+
+ clearIO();
+ try {
+ server.removeAddressInfo(new SimpleString(name), null, force);
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.deleteAddressSuccess(name);
+ }
+ } catch (ActiveMQException e) {
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.deleteAddressFailure(name);
+ }
+ throw new IllegalStateException(e.getMessage());
+ } finally {
+ blockOnIO();
}
- throw new IllegalStateException(e.getMessage());
- } finally {
- blockOnIO();
}
}
@@ -1003,10 +1009,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
- server.createQueue(new QueueConfiguration(name)
- .setAddress(address)
- .setFilterString(filterStr)
- .setDurable(durable));
+ server.createQueue(new QueueConfiguration(name).setAddress(address).setFilterString(filterStr).setDurable(durable));
} finally {
blockOnIO();
}
@@ -1232,10 +1235,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
boolean autoCreateAddress,
long ringSize) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.createQueue(this.server, null, null, address, routingType, name, filterStr, durable,
- maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey,
- lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch,
- autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize);
+ AuditLogger.createQueue(this.server, null, null, address, routingType, name, filterStr, durable, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize);
}
checkStarted();
@@ -1247,34 +1247,14 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
filter = new SimpleString(filterStr);
}
- final Queue queue = server.createQueue(new QueueConfiguration(name)
- .setAddress(address)
- .setRoutingType(RoutingType.valueOf(routingType.toUpperCase()))
- .setFilterString(filter)
- .setDurable(durable)
- .setMaxConsumers(maxConsumers)
- .setPurgeOnNoConsumers(purgeOnNoConsumers)
- .setExclusive(exclusive)
- .setGroupRebalance(groupRebalance)
- .setGroupBuckets(groupBuckets)
- .setGroupFirstKey(groupFirstKey)
- .setLastValue(lastValue)
- .setLastValueKey(lastValueKey)
- .setNonDestructive(nonDestructive)
- .setConsumersBeforeDispatch(consumersBeforeDispatch)
- .setDelayBeforeDispatch(delayBeforeDispatch)
- .setAutoDelete(autoDelete)
- .setAutoDeleteDelay(autoDeleteDelay)
- .setAutoDeleteMessageCount(autoDeleteMessageCount)
- .setAutoCreateAddress(autoCreateAddress)
- .setRingSize(ringSize));
+ final Queue queue = server.createQueue(new QueueConfiguration(name).setAddress(address).setRoutingType(RoutingType.valueOf(routingType.toUpperCase())).setFilterString(filter).setDurable(durable).setMaxConsumers(maxConsumers).setPurgeOnNoConsumers(purgeOnNoConsumers).setExclusive(exclusive).setGroupRebalance(groupRebalance).setGroupBuckets(groupBuckets).setGroupFirstKey(groupFirstKey).setLastValue(lastValue).setLastValueKey(lastValueKey).setNonDestructive(nonDestructive).setConsu [...]
if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.createQueueSuccess( name, address, routingType);
+ AuditLogger.createQueueSuccess(name, address, routingType);
}
return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
} catch (ActiveMQException e) {
if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.createQueueFailure( name, address, routingType);
+ AuditLogger.createQueueFailure(name, address, routingType);
}
throw new IllegalStateException(e.getMessage());
} finally {
@@ -1313,7 +1293,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public String updateQueue(String queueConfigurationAsJson) throws Exception {
-
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.updateQueue(this.server, queueConfigurationAsJson);
}
@@ -1414,8 +1393,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
String user,
Long ringSize) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.updateQueue(this.server, name, routingType, filter, maxConsumers, purgeOnNoConsumers,
- exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, ringSize);
+ AuditLogger.updateQueue(this.server, name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, ringSize);
}
checkStarted();
@@ -1592,27 +1570,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public void destroyQueue(final String name, final boolean removeConsumers, final boolean forceAutoDeleteAddress) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.destroyQueue(this.server, null, null, name, removeConsumers, forceAutoDeleteAddress);
- }
- checkStarted();
+ // destroy might be a long running task, we prevent multiple running tasks in this case
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.destroyQueue(this.server, null, null, name, removeConsumers, forceAutoDeleteAddress);
+ }
+ checkStarted();
- clearIO();
- try {
- SimpleString queueName = new SimpleString(name);
+ clearIO();
try {
- server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, forceAutoDeleteAddress);
- } catch (Exception e) {
+ SimpleString queueName = new SimpleString(name);
+ try {
+ server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, forceAutoDeleteAddress);
+ } catch (Exception e) {
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.destroyQueueFailure(name);
+ }
+ throw e;
+ }
if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.destroyQueueFailure(name);
+ AuditLogger.destroyQueueSuccess(name);
}
- throw e;
- }
- if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.destroyQueueSuccess(name);
+ } finally {
+ blockOnIO();
}
- } finally {
- blockOnIO();
}
}
@@ -2149,56 +2130,64 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.commitPreparedTransaction(this.server, transactionAsBase64);
- }
- checkStarted();
-
- clearIO();
- try {
- List<Xid> xids = resourceManager.getPreparedTransactions();
+ // commit might be a long running task if dealing with a large transaction
+ // ensuring a single one just in case
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.commitPreparedTransaction(this.server, transactionAsBase64);
+ }
+ checkStarted();
- for (Xid xid : xids) {
- if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
- Transaction transaction = resourceManager.removeTransaction(xid, null);
- transaction.commit(false);
- long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
- storageManager.waitOnOperations();
- resourceManager.putHeuristicCompletion(recordID, xid, true);
- return true;
+ clearIO();
+ try {
+ List<Xid> xids = resourceManager.getPreparedTransactions();
+
+ for (Xid xid : xids) {
+ if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
+ Transaction transaction = resourceManager.removeTransaction(xid, null);
+ transaction.commit(false);
+ long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
+ storageManager.waitOnOperations();
+ resourceManager.putHeuristicCompletion(recordID, xid, true);
+ return true;
+ }
}
+ return false;
+ } finally {
+ blockOnIO();
}
- return false;
- } finally {
- blockOnIO();
}
}
@Override
public synchronized boolean rollbackPreparedTransaction(final String transactionAsBase64) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.rollbackPreparedTransaction(this.server, transactionAsBase64);
- }
- checkStarted();
+ // rollback might be a long running task if dealing with a large transaction
+ // ensuring a single task just in case
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.rollbackPreparedTransaction(this.server, transactionAsBase64);
+ }
+ checkStarted();
- clearIO();
- try {
+ clearIO();
+ try {
- List<Xid> xids = resourceManager.getPreparedTransactions();
+ List<Xid> xids = resourceManager.getPreparedTransactions();
- for (Xid xid : xids) {
- if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
- Transaction transaction = resourceManager.removeTransaction(xid, null);
- transaction.rollback();
- long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
- server.getStorageManager().waitOnOperations();
- resourceManager.putHeuristicCompletion(recordID, xid, false);
- return true;
+ for (Xid xid : xids) {
+ if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
+ Transaction transaction = resourceManager.removeTransaction(xid, null);
+ transaction.rollback();
+ long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
+ server.getStorageManager().waitOnOperations();
+ resourceManager.putHeuristicCompletion(recordID, xid, false);
+ return true;
+ }
}
+ return false;
+ } finally {
+ blockOnIO();
}
- return false;
- } finally {
- blockOnIO();
}
}
@@ -2278,149 +2267,170 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public boolean closeConsumerConnectionsForAddress(final String address) {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.closeConsumerConnectionsForAddress(this.server, address);
- }
- boolean closed = false;
- checkStarted();
+ // this could be a long running task, ensuring a single task
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.closeConsumerConnectionsForAddress(this.server, address);
+ }
+ boolean closed = false;
+ checkStarted();
- clearIO();
- try {
- for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) {
- if (binding instanceof LocalQueueBinding) {
- Queue queue = ((LocalQueueBinding) binding).getQueue();
- for (Consumer consumer : queue.getConsumers()) {
- if (consumer instanceof ServerConsumer) {
- ServerConsumer serverConsumer = (ServerConsumer) consumer;
- RemotingConnection connection = null;
-
- for (RemotingConnection potentialConnection : remotingService.getConnections()) {
- if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
- connection = potentialConnection;
+ clearIO();
+ try {
+ for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) {
+ if (binding instanceof LocalQueueBinding) {
+ Queue queue = ((LocalQueueBinding) binding).getQueue();
+ for (Consumer consumer : queue.getConsumers()) {
+ if (consumer instanceof ServerConsumer) {
+ ServerConsumer serverConsumer = (ServerConsumer) consumer;
+ RemotingConnection connection = null;
+
+ for (RemotingConnection potentialConnection : remotingService.getConnections()) {
+ if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
+ connection = potentialConnection;
+ }
}
- }
- if (connection != null) {
- remotingService.removeConnection(connection.getID());
- connection.fail(ActiveMQMessageBundle.BUNDLE.consumerConnectionsClosedByManagement(address));
- closed = true;
+ if (connection != null) {
+ remotingService.removeConnection(connection.getID());
+ connection.fail(ActiveMQMessageBundle.BUNDLE.consumerConnectionsClosedByManagement(address));
+ closed = true;
+ }
}
}
}
}
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.failedToCloseConsumerConnectionsForAddress(address, e);
+ } finally {
+ blockOnIO();
}
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.failedToCloseConsumerConnectionsForAddress(address, e);
- } finally {
- blockOnIO();
+ return closed;
+ } catch (Throwable e) {
+ throw new RuntimeException(e.getMessage(), e);
}
- return closed;
}
@Override
public boolean closeConnectionsForUser(final String userName) {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.closeConnectionsForUser(this.server, userName);
- }
- boolean closed = false;
- checkStarted();
+ // possibly a long running task
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.closeConnectionsForUser(this.server, userName);
+ }
+ boolean closed = false;
+ checkStarted();
- clearIO();
- try {
- for (ServerSession serverSession : server.getSessions()) {
- if (serverSession.getUsername() != null && serverSession.getUsername().equals(userName)) {
- RemotingConnection connection = null;
+ clearIO();
+ try {
+ for (ServerSession serverSession : server.getSessions()) {
+ if (serverSession.getUsername() != null && serverSession.getUsername().equals(userName)) {
+ RemotingConnection connection = null;
- for (RemotingConnection potentialConnection : remotingService.getConnections()) {
- if (potentialConnection.getID().toString().equals(serverSession.getConnectionID().toString())) {
- connection = potentialConnection;
+ for (RemotingConnection potentialConnection : remotingService.getConnections()) {
+ if (potentialConnection.getID().toString().equals(serverSession.getConnectionID().toString())) {
+ connection = potentialConnection;
+ }
}
- }
- if (connection != null) {
- remotingService.removeConnection(connection.getID());
- connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsForUserClosedByManagement(userName));
- closed = true;
+ if (connection != null) {
+ remotingService.removeConnection(connection.getID());
+ connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsForUserClosedByManagement(userName));
+ closed = true;
+ }
}
}
+ } finally {
+ blockOnIO();
}
- } finally {
- blockOnIO();
+ return closed;
+ } catch (Throwable e) {
+ throw new RuntimeException(e.getMessage(), e);
}
- return closed;
}
@Override
public boolean closeConnectionWithID(final String ID) {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.closeConnectionWithID(this.server, ID);
- }
- checkStarted();
+ // possibly a long running task
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.closeConnectionWithID(this.server, ID);
+ }
+ checkStarted();
- clearIO();
- try {
- for (RemotingConnection connection : remotingService.getConnections()) {
- if (connection.getID().toString().equals(ID)) {
- remotingService.removeConnection(connection.getID());
- connection.fail(ActiveMQMessageBundle.BUNDLE.connectionWithIDClosedByManagement(ID));
- return true;
+ clearIO();
+ try {
+ for (RemotingConnection connection : remotingService.getConnections()) {
+ if (connection.getID().toString().equals(ID)) {
+ remotingService.removeConnection(connection.getID());
+ connection.fail(ActiveMQMessageBundle.BUNDLE.connectionWithIDClosedByManagement(ID));
+ return true;
+ }
}
+ } finally {
+ blockOnIO();
}
- } finally {
- blockOnIO();
+ return false;
+ } catch (Throwable e) {
+ throw new RuntimeException(e.getMessage(), e);
}
- return false;
}
@Override
public boolean closeSessionWithID(final String connectionID, final String ID) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.closeSessionWithID(this.server, connectionID, ID);
- }
- checkStarted();
+ // possibly a long running task
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.closeSessionWithID(this.server, connectionID, ID);
+ }
+ checkStarted();
- clearIO();
- try {
- List<ServerSession> sessions = server.getSessions(connectionID);
- for (ServerSession session : sessions) {
- if (session.getName().equals(ID.toString())) {
- session.close(true);
- return true;
+ clearIO();
+ try {
+ List<ServerSession> sessions = server.getSessions(connectionID);
+ for (ServerSession session : sessions) {
+ if (session.getName().equals(ID.toString())) {
+ session.close(true);
+ return true;
+ }
}
- }
- } finally {
- blockOnIO();
+ } finally {
+ blockOnIO();
+ }
+ return false;
}
- return false;
}
@Override
public boolean closeConsumerWithID(final String sessionID, final String ID) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.closeConsumerWithID(this.server, sessionID, ID);
- }
- checkStarted();
+ // possibly a long running task
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.closeConsumerWithID(this.server, sessionID, ID);
+ }
+ checkStarted();
- clearIO();
- try {
- Set<ServerSession> sessions = server.getSessions();
- for (ServerSession session : sessions) {
- if (session.getName().equals(sessionID.toString())) {
- Set<ServerConsumer> serverConsumers = session.getServerConsumers();
- for (ServerConsumer serverConsumer : serverConsumers) {
- if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
- serverConsumer.disconnect();
- return true;
+ clearIO();
+ try {
+ Set<ServerSession> sessions = server.getSessions();
+ for (ServerSession session : sessions) {
+ if (session.getName().equals(sessionID.toString())) {
+ Set<ServerConsumer> serverConsumers = session.getServerConsumers();
+ for (ServerConsumer serverConsumer : serverConsumers) {
+ if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
+ serverConsumer.disconnect();
+ return true;
+ }
}
}
}
- }
- } finally {
- blockOnIO();
+ } finally {
+ blockOnIO();
+ }
+ return false;
}
- return false;
}
@Override
@@ -4116,27 +4126,29 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public void scaleDown(String connector) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.scaleDown(this.server, connector);
- }
- checkStarted();
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.scaleDown(this.server, connector);
+ }
+ checkStarted();
- clearIO();
- HAPolicy haPolicy = server.getHAPolicy();
- if (haPolicy instanceof LiveOnlyPolicy) {
- LiveOnlyPolicy liveOnlyPolicy = (LiveOnlyPolicy) haPolicy;
+ clearIO();
+ HAPolicy haPolicy = server.getHAPolicy();
+ if (haPolicy instanceof LiveOnlyPolicy) {
+ LiveOnlyPolicy liveOnlyPolicy = (LiveOnlyPolicy) haPolicy;
- if (liveOnlyPolicy.getScaleDownPolicy() == null) {
- liveOnlyPolicy.setScaleDownPolicy(new ScaleDownPolicy());
- }
+ if (liveOnlyPolicy.getScaleDownPolicy() == null) {
+ liveOnlyPolicy.setScaleDownPolicy(new ScaleDownPolicy());
+ }
- liveOnlyPolicy.getScaleDownPolicy().setEnabled(true);
+ liveOnlyPolicy.getScaleDownPolicy().setEnabled(true);
- if (connector != null) {
- liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, connector);
- }
+ if (connector != null) {
+ liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, connector);
+ }
- server.fail(true);
+ server.fail(true);
+ }
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index d88051e392..9044936d63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -431,10 +431,15 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
@Override
public long getMessageCount() {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.getMessageCount(this.addressInfo);
+ // prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.getMessageCount(this.addressInfo);
+ }
+ return getMessageCount(DurabilityType.ALL);
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
}
- return getMessageCount(DurabilityType.ALL);
}
@Override
@@ -472,14 +477,17 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
final String user,
final String password,
boolean createMessageId) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.sendMessageThroughManagement(this, headers, type, body, durable, user, "****");
- }
- try {
- return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password, createMessageId);
- } catch (Exception e) {
- e.printStackTrace();
- throw new IllegalStateException(e.getMessage());
+ // prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.sendMessageThroughManagement(this, headers, type, body, durable, user, "****");
+ }
+ try {
+ return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password, createMessageId);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new IllegalStateException(e.getMessage());
+ }
}
}
@@ -585,20 +593,25 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
@Override
public boolean clearDuplicateIdCache() {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.clearDuplicateIdCache(this.addressInfo);
- }
- DuplicateIDCache cache = ((PostOfficeImpl)server.getPostOffice()).getDuplicateIDCaches().get(addressInfo.getName());
- try {
- if (cache != null) {
- cache.clear();
- return true;
+ // prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.clearDuplicateIdCache(this.addressInfo);
+ }
+ DuplicateIDCache cache = ((PostOfficeImpl) server.getPostOffice()).getDuplicateIDCaches().get(addressInfo.getName());
+ try {
+ if (cache != null) {
+ cache.clear();
+ return true;
+ }
+ } catch (Exception e) {
+ logger.debug("Failed to clear duplicate ID cache", e);
}
+
+ return false;
} catch (Exception e) {
- logger.debug("Failed to clear duplicate ID cache", e);
+ throw new RuntimeException(e.getMessage(), e);
}
-
- return false;
}
@Override
@@ -627,37 +640,41 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
@Override
public long purge() throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.purge(this.addressInfo);
- }
- clearIO();
- long totalMsgs = 0;
- try {
- Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
- if (bindings != null) {
- for (Binding binding : bindings.getBindings()) {
- if (binding instanceof QueueBinding) {
- totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
+ // prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.purge(this.addressInfo);
+ }
+ clearIO();
+ long totalMsgs = 0;
+ try {
+ Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
+ if (bindings != null) {
+ for (Binding binding : bindings.getBindings()) {
+ if (binding instanceof QueueBinding) {
+ totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
+ }
}
}
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.purgeAddressSuccess(addressInfo.getName().toString());
+ }
+ } catch (Throwable t) {
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.purgeAddressFailure(addressInfo.getName().toString());
+ }
+ throw new IllegalStateException(t.getMessage());
+ } finally {
+ blockOnIO();
}
- if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.purgeAddressSuccess(addressInfo.getName().toString());
- }
- } catch (Throwable t) {
- if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.purgeAddressFailure(addressInfo.getName().toString());
- }
- throw new IllegalStateException(t.getMessage());
- } finally {
- blockOnIO();
- }
- return totalMsgs;
+ return totalMsgs;
+ }
}
@Override
public void replay(String target, String filter) throws Exception {
+ // server.replay is already calling the managementLock, no need to do it here
server.replay(null, null, this.getAddress(), target, filter);
}
@@ -669,22 +686,25 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
Date startScanDate = format.parse(startScan);
Date endScanDate = format.parse(endScan);
+ // server.replay is already calling the managementLock, no need to do it here
server.replay(startScanDate, endScanDate, this.getAddress(), target, filter);
}
private long getMessageCount(final DurabilityType durability) {
- long count = 0;
- for (String queueName : getQueueNames()) {
- Queue queue = server.locateQueue(queueName);
- if (queue != null &&
- (durability == DurabilityType.ALL ||
- (durability == DurabilityType.DURABLE && queue.isDurable()) ||
- (durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) {
- count += queue.getMessageCount();
+ // prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ long count = 0;
+ for (String queueName : getQueueNames()) {
+ Queue queue = server.locateQueue(queueName);
+ if (queue != null && (durability == DurabilityType.ALL || (durability == DurabilityType.DURABLE && queue.isDurable()) || (durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) {
+ count += queue.getMessageCount();
+ }
}
+ return count;
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
}
- return count;
}
private void checkStarted() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 1fbefc3847..9bced82671 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -740,32 +740,38 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public Map<String, Object>[] listScheduledMessages() throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.listScheduledMessages(queue);
- }
- checkStarted();
+ // it could be a long running task
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.listScheduledMessages(queue);
+ }
+ checkStarted();
- clearIO();
- try {
- List<MessageReference> refs = queue.getScheduledMessages();
- return convertMessagesToMaps(refs);
- } finally {
- blockOnIO();
+ clearIO();
+ try {
+ List<MessageReference> refs = queue.getScheduledMessages();
+ return convertMessagesToMaps(refs);
+ } finally {
+ blockOnIO();
+ }
}
}
@Override
public String listScheduledMessagesAsJSON() throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.listScheduledMessagesAsJSON(queue);
- }
- checkStarted();
+ // it could be a long running task
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.listScheduledMessagesAsJSON(queue);
+ }
+ checkStarted();
- clearIO();
- try {
- return QueueControlImpl.toJSON(listScheduledMessages());
- } finally {
- blockOnIO();
+ clearIO();
+ try {
+ return QueueControlImpl.toJSON(listScheduledMessages());
+ } finally {
+ blockOnIO();
+ }
}
}
@@ -969,33 +975,36 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
private Map<String, Long> internalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
- checkStarted();
+ // long running task
+ try (AutoCloseable lock = server.managementLock()) {
+ checkStarted();
- clearIO();
+ clearIO();
- Map<String, Long> result = new HashMap<>();
- try {
- Filter filter = FilterImpl.createFilter(filterStr);
- SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
- if (filter == null && groupByProperty == null) {
- result.put(null, getMessageCount());
- } else {
- final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize();
- int count = 0;
- try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
- try {
- while (iterator.hasNext() && count++ < limit) {
- Message message = iterator.next().getMessage();
- internalComputeMessage(result, filter, groupByProperty, message);
+ Map<String, Long> result = new HashMap<>();
+ try {
+ Filter filter = FilterImpl.createFilter(filterStr);
+ SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
+ if (filter == null && groupByProperty == null) {
+ result.put(null, getMessageCount());
+ } else {
+ final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize();
+ int count = 0;
+ try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
+ try {
+ while (iterator.hasNext() && count++ < limit) {
+ Message message = iterator.next().getMessage();
+ internalComputeMessage(result, filter, groupByProperty, message);
+ }
+ } catch (NoSuchElementException ignored) {
+ // this could happen through paging browsing
}
- } catch (NoSuchElementException ignored) {
- // this could happen through paging browsing
}
}
+ return result;
+ } finally {
+ blockOnIO();
}
- return result;
- } finally {
- blockOnIO();
}
}
@@ -1019,26 +1028,26 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
private Map<String, Long> internalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
- checkStarted();
+ // it could be a long running task
+ try (AutoCloseable lock = server.managementLock()) {
+ checkStarted();
- clearIO();
+ clearIO();
- Map<String, Long> result = new HashMap<>();
- try {
- Filter filter = FilterImpl.createFilter(filterStr);
- SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
- if (filter == null && groupByProperty == null) {
- result.put(null, Long.valueOf(getDeliveringCount()));
- } else {
- Map<String, List<MessageReference>> deliveringMessages = queue.getDeliveringMessages();
- deliveringMessages.forEach((s, messageReferenceList) ->
- messageReferenceList.forEach(messageReference ->
- internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage())
- ));
+ Map<String, Long> result = new HashMap<>();
+ try {
+ Filter filter = FilterImpl.createFilter(filterStr);
+ SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
+ if (filter == null && groupByProperty == null) {
+ result.put(null, Long.valueOf(getDeliveringCount()));
+ } else {
+ Map<String, List<MessageReference>> deliveringMessages = queue.getDeliveringMessages();
+ deliveringMessages.forEach((s, messageReferenceList) -> messageReferenceList.forEach(messageReference -> internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage())));
+ }
+ return result;
+ } finally {
+ blockOnIO();
}
- return result;
- } finally {
- blockOnIO();
}
}
@@ -1057,18 +1066,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public boolean removeMessage(final long messageID) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.removeMessage(queue, messageID);
- }
- checkStarted();
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.removeMessage(queue, messageID);
+ }
+ checkStarted();
- clearIO();
- try {
- return queue.deleteReference(messageID);
- } catch (ActiveMQException e) {
- throw new IllegalStateException(e.getMessage());
- } finally {
- blockOnIO();
+ clearIO();
+ try {
+ return queue.deleteReference(messageID);
+ } catch (ActiveMQException e) {
+ throw new IllegalStateException(e.getMessage());
+ } finally {
+ blockOnIO();
+ }
}
}
@@ -1079,30 +1091,33 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public int removeMessages(final int flushLimit, final String filterStr) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.removeMessages(queue, flushLimit, filterStr);
- }
- checkStarted();
-
- clearIO();
- try {
- Filter filter = FilterImpl.createFilter(filterStr);
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.removeMessages(queue, flushLimit, filterStr);
+ }
+ checkStarted();
- int removed = 0;
+ clearIO();
try {
- removed = queue.deleteMatchingReferences(flushLimit, filter);
- if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.removeMessagesSuccess(removed, queue.getName().toString());
- }
- } catch (Exception e) {
- if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.removeMessagesFailure(queue.getName().toString());
+ Filter filter = FilterImpl.createFilter(filterStr);
+
+ int removed = 0;
+ try {
+ removed = queue.deleteMatchingReferences(flushLimit, filter);
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.removeMessagesSuccess(removed, queue.getName().toString());
+ }
+ } catch (Exception e) {
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.removeMessagesFailure(queue.getName().toString());
+ }
+ throw e;
}
- throw e;
+ return removed;
+ } finally {
+ blockOnIO();
}
- return removed;
- } finally {
- blockOnIO();
}
}
@@ -1113,87 +1128,99 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public boolean expireMessage(final long messageID) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.expireMessage(queue, messageID);
- }
- checkStarted();
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.expireMessage(queue, messageID);
+ }
+ checkStarted();
- clearIO();
- try {
- return queue.expireReference(messageID);
- } finally {
- blockOnIO();
+ clearIO();
+ try {
+ return queue.expireReference(messageID);
+ } finally {
+ blockOnIO();
+ }
}
}
@Override
public int expireMessages(final String filterStr) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.expireMessages(queue, filterStr);
- }
- checkStarted();
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.expireMessages(queue, filterStr);
+ }
+ checkStarted();
- clearIO();
- try {
- Filter filter = FilterImpl.createFilter(filterStr);
- return queue.expireReferences(filter);
- } catch (ActiveMQException e) {
- throw new IllegalStateException(e.getMessage());
- } finally {
- blockOnIO();
+ clearIO();
+ try {
+ Filter filter = FilterImpl.createFilter(filterStr);
+ return queue.expireReferences(filter);
+ } catch (ActiveMQException e) {
+ throw new IllegalStateException(e.getMessage());
+ } finally {
+ blockOnIO();
+ }
}
}
@Override
public boolean retryMessage(final long messageID) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.retryMessage(queue, messageID);
- }
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.retryMessage(queue, messageID);
+ }
- checkStarted();
- clearIO();
+ checkStarted();
+ clearIO();
- try {
- Filter singleMessageFilter = new Filter() {
- @Override
- public boolean match(Message message) {
- return message.getMessageID() == messageID;
- }
+ try {
+ Filter singleMessageFilter = new Filter() {
+ @Override
+ public boolean match(Message message) {
+ return message.getMessageID() == messageID;
+ }
- @Override
- public boolean match(Map<String, String> map) {
- return false;
- }
+ @Override
+ public boolean match(Map<String, String> map) {
+ return false;
+ }
- @Override
- public boolean match(Filterable filterable) {
- return false;
- }
+ @Override
+ public boolean match(Filterable filterable) {
+ return false;
+ }
- @Override
- public SimpleString getFilterString() {
- return new SimpleString("custom filter for MESSAGEID= messageID");
- }
- };
+ @Override
+ public SimpleString getFilterString() {
+ return new SimpleString("custom filter for MESSAGEID= messageID");
+ }
+ };
- return queue.retryMessages(singleMessageFilter) > 0;
- } finally {
- blockOnIO();
+ return queue.retryMessages(singleMessageFilter) > 0;
+ } finally {
+ blockOnIO();
+ }
}
}
@Override
public int retryMessages() throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.retryMessages(queue);
- }
- checkStarted();
- clearIO();
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.retryMessages(queue);
+ }
+ checkStarted();
+ clearIO();
- try {
- return queue.retryMessages(null);
- } finally {
- blockOnIO();
+ try {
+ return queue.retryMessages(null);
+ } finally {
+ blockOnIO();
+ }
}
}
@@ -1206,22 +1233,25 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
public boolean moveMessage(final long messageID,
final String otherQueueName,
final boolean rejectDuplicates) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.moveMessage(queue, messageID, otherQueueName, rejectDuplicates);
- }
- checkStarted();
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.moveMessage(queue, messageID, otherQueueName, rejectDuplicates);
+ }
+ checkStarted();
- clearIO();
- try {
- Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
+ clearIO();
+ try {
+ Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
- if (binding == null) {
- throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
- }
+ if (binding == null) {
+ throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
+ }
- return queue.moveReference(messageID, binding.getAddress(), binding, rejectDuplicates);
- } finally {
- blockOnIO();
+ return queue.moveReference(messageID, binding.getAddress(), binding, rejectDuplicates);
+ } finally {
+ blockOnIO();
+ }
}
}
@@ -1245,25 +1275,28 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
final String otherQueueName,
final boolean rejectDuplicates,
final int messageCount) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount);
- }
- checkStarted();
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount);
+ }
+ checkStarted();
- clearIO();
- try {
- Filter filter = FilterImpl.createFilter(filterStr);
+ clearIO();
+ try {
+ Filter filter = FilterImpl.createFilter(filterStr);
- Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
+ Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
- if (binding == null) {
- throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
- }
+ if (binding == null) {
+ throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
+ }
- int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding);
- return retValue;
- } finally {
- blockOnIO();
+ int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding);
+ return retValue;
+ } finally {
+ blockOnIO();
+ }
}
}
@@ -1277,18 +1310,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.sendMessagesToDeadLetterAddress(queue, filterStr);
- }
- checkStarted();
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.sendMessagesToDeadLetterAddress(queue, filterStr);
+ }
+ checkStarted();
- clearIO();
- try {
- Filter filter = FilterImpl.createFilter(filterStr);
+ clearIO();
+ try {
+ Filter filter = FilterImpl.createFilter(filterStr);
- return queue.sendMessagesToDeadLetterAddress(filter);
- } finally {
- blockOnIO();
+ return queue.sendMessagesToDeadLetterAddress(filter);
+ } finally {
+ blockOnIO();
+ }
}
}
@@ -1310,35 +1346,41 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
final String user,
final String password,
boolean createMessageId) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.sendMessageThroughManagement(queue, headers, type, body, durable, user, "****");
- }
- try {
- String s = sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, createMessageId, queue.getID());
- if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.sendMessageSuccess(queue.getName().toString(), user);
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.sendMessageThroughManagement(queue, headers, type, body, durable, user, "****");
}
- return s;
- } catch (Exception e) {
- if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.sendMessageFailure(queue.getName().toString(), user);
+ try {
+ String s = sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, createMessageId, queue.getID());
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.sendMessageSuccess(queue.getName().toString(), user);
+ }
+ return s;
+ } catch (Exception e) {
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.sendMessageFailure(queue.getName().toString(), user);
+ }
+ throw new IllegalStateException(e.getMessage());
}
- throw new IllegalStateException(e.getMessage());
}
}
@Override
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.sendMessageToDeadLetterAddress(queue, messageID);
- }
- checkStarted();
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.sendMessageToDeadLetterAddress(queue, messageID);
+ }
+ checkStarted();
- clearIO();
- try {
- return queue.sendMessageToDeadLetterAddress(messageID);
- } finally {
- blockOnIO();
+ clearIO();
+ try {
+ return queue.sendMessageToDeadLetterAddress(messageID);
+ } finally {
+ blockOnIO();
+ }
}
}
@@ -1554,52 +1596,55 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public CompositeData[] browse(int page, int pageSize, String filter) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.browse(queue, page, pageSize);
- }
- checkStarted();
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.browse(queue, page, pageSize);
+ }
+ checkStarted();
- clearIO();
- try {
- long index = 0;
- long start = (long) (page - 1) * pageSize;
- long end = Math.min(page * pageSize, queue.getMessageCount());
+ clearIO();
+ try {
+ long index = 0;
+ long start = (long) (page - 1) * pageSize;
+ long end = Math.min(page * pageSize, queue.getMessageCount());
- ArrayList<CompositeData> c = new ArrayList<>();
- Filter thefilter = FilterImpl.createFilter(filter);
+ ArrayList<CompositeData> c = new ArrayList<>();
+ Filter thefilter = FilterImpl.createFilter(filter);
- final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
- try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
- try {
- while (iterator.hasNext() && index < end) {
- MessageReference ref = iterator.next();
- if (thefilter == null || thefilter.match(ref.getMessage())) {
- if (index >= start) {
- c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
+ final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
+ try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
+ try {
+ while (iterator.hasNext() && index < end) {
+ MessageReference ref = iterator.next();
+ if (thefilter == null || thefilter.match(ref.getMessage())) {
+ if (index >= start) {
+ c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
+ }
+ //we only increase the index if we add a message, otherwise we could stop before we get to a filtered message
+ index++;
}
- //we only increase the index if we add a message, otherwise we could stop before we get to a filtered message
- index++;
}
+ } catch (NoSuchElementException ignored) {
+ // this could happen through paging browsing
}
- } catch (NoSuchElementException ignored) {
- // this could happen through paging browsing
- }
- CompositeData[] rc = new CompositeData[c.size()];
- c.toArray(rc);
+ CompositeData[] rc = new CompositeData[c.size()];
+ c.toArray(rc);
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.browseMessagesSuccess(queue.getName().toString(), c.size());
+ }
+ return rc;
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.browseMessagesSuccess(queue.getName().toString(), c.size());
+ AuditLogger.browseMessagesFailure(queue.getName().toString());
}
- return rc;
- }
- } catch (Exception e) {
- logger.warn(e.getMessage(), e);
- if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.browseMessagesFailure(queue.getName().toString());
+ throw new IllegalStateException(e.getMessage());
+ } finally {
+ blockOnIO();
}
- throw new IllegalStateException(e.getMessage());
- } finally {
- blockOnIO();
}
}
@@ -1609,46 +1654,49 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
@Override
public CompositeData[] browse(String filter) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.browse(queue, filter);
- }
- checkStarted();
+ // this is a critical task, we need to prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.browse(queue, filter);
+ }
+ checkStarted();
- clearIO();
- try {
- final AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
- final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit();
- final int limit = addressSettings.getManagementBrowsePageSize();
- int currentPageSize = 0;
- ArrayList<CompositeData> c = new ArrayList<>();
- Filter thefilter = FilterImpl.createFilter(filter);
- try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
- try {
- while (iterator.hasNext() && currentPageSize++ < limit) {
- MessageReference ref = iterator.next();
- if (thefilter == null || thefilter.match(ref.getMessage())) {
- c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
+ clearIO();
+ try {
+ final AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+ final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit();
+ final int limit = addressSettings.getManagementBrowsePageSize();
+ int currentPageSize = 0;
+ ArrayList<CompositeData> c = new ArrayList<>();
+ Filter thefilter = FilterImpl.createFilter(filter);
+ try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
+ try {
+ while (iterator.hasNext() && currentPageSize++ < limit) {
+ MessageReference ref = iterator.next();
+ if (thefilter == null || thefilter.match(ref.getMessage())) {
+ c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
+ }
}
+ } catch (NoSuchElementException ignored) {
+ // this could happen through paging browsing
}
- } catch (NoSuchElementException ignored) {
- // this could happen through paging browsing
- }
- CompositeData[] rc = new CompositeData[c.size()];
- c.toArray(rc);
+ CompositeData[] rc = new CompositeData[c.size()];
+ c.toArray(rc);
+ if (AuditLogger.isResourceLoggingEnabled()) {
+ AuditLogger.browseMessagesSuccess(queue.getName().toString(), currentPageSize);
+ }
+ return rc;
+ }
+ } catch (ActiveMQException e) {
if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.browseMessagesSuccess(queue.getName().toString(), currentPageSize);
+ AuditLogger.browseMessagesFailure(queue.getName().toString());
}
- return rc;
- }
- } catch (ActiveMQException e) {
- if (AuditLogger.isResourceLoggingEnabled()) {
- AuditLogger.browseMessagesFailure(queue.getName().toString());
+ throw new IllegalStateException(e.getMessage());
+ } finally {
+ blockOnIO();
}
- throw new IllegalStateException(e.getMessage());
- } finally {
- blockOnIO();
}
}
@@ -1714,32 +1762,35 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public String listGroupsAsJSON() throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.listGroupsAsJSON(queue);
- }
- checkStarted();
+ // prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.listGroupsAsJSON(queue);
+ }
+ checkStarted();
- clearIO();
- try {
- Map<SimpleString, Consumer> groups = queue.getGroups();
+ clearIO();
+ try {
+ Map<SimpleString, Consumer> groups = queue.getGroups();
- JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
+ JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
- for (Map.Entry<SimpleString, Consumer> group : groups.entrySet()) {
+ for (Map.Entry<SimpleString, Consumer> group : groups.entrySet()) {
- if (group.getValue() instanceof ServerConsumer) {
- ServerConsumer serverConsumer = (ServerConsumer) group.getValue();
+ if (group.getValue() instanceof ServerConsumer) {
+ ServerConsumer serverConsumer = (ServerConsumer) group.getValue();
- JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime());
+ JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime());
+
+ jsonArray.add(obj);
+ }
- jsonArray.add(obj);
}
+ return jsonArray.build().toString();
+ } finally {
+ blockOnIO();
}
-
- return jsonArray.build().toString();
- } finally {
- blockOnIO();
}
}
@@ -1969,31 +2020,37 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public void deliverScheduledMessages(String filter) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.deliverScheduledMessage(queue, filter);
- }
- checkStarted();
+ // prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.deliverScheduledMessage(queue, filter);
+ }
+ checkStarted();
- clearIO();
- try {
- queue.deliverScheduledMessages(filter);
- } finally {
- blockOnIO();
+ clearIO();
+ try {
+ queue.deliverScheduledMessages(filter);
+ } finally {
+ blockOnIO();
+ }
}
}
@Override
public void deliverScheduledMessage(long messageId) throws Exception {
- if (AuditLogger.isBaseLoggingEnabled()) {
- AuditLogger.deliverScheduledMessage(queue, messageId);
- }
- checkStarted();
+ // prevent parallel tasks running
+ try (AutoCloseable lock = server.managementLock()) {
+ if (AuditLogger.isBaseLoggingEnabled()) {
+ AuditLogger.deliverScheduledMessage(queue, messageId);
+ }
+ checkStarted();
- clearIO();
- try {
- queue.deliverScheduledMessage(messageId);
- } finally {
- blockOnIO();
+ clearIO();
+ try {
+ queue.deliverScheduledMessage(messageId);
+ } finally {
+ blockOnIO();
+ }
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index bcfab14924..9e122d670e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -50,6 +50,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -277,7 +278,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
/** Certain management operations shouldn't use more than one thread.
* this semaphore is used to guarantee a single thread used. */
- private final Semaphore managementSemaphore = new Semaphore(1);
+ private final ReentrantLock managementLock = new ReentrantLock();
/**
* This is a thread pool for io tasks only.
@@ -522,7 +523,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (replayManager == null) {
throw ActiveMQMessageBundle.BUNDLE.noRetention();
}
- replayManager.replay(start, end, address, target, filter);
+ try (AutoCloseable lock = managementLock()) {
+ replayManager.replay(start, end, address, target, filter);
+ }
}
/**
@@ -4626,10 +4629,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public AutoCloseable managementLock() throws Exception {
- if (!managementSemaphore.tryAcquire(1, TimeUnit.MINUTES)) {
+ if (!managementLock.tryLock(1, TimeUnit.MINUTES)) {
throw ActiveMQMessageBundle.BUNDLE.managementBusy();
} else {
- return managementSemaphore::release;
+ return managementLock::unlock;
}
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
index bb46c337ea..a425b83b6f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
@@ -69,23 +69,7 @@ public class ReplayManager {
this.retentionFolder = server.getConfiguration().getJournalRetentionLocation();
}
- public void replay(Date start, Date end, String sourceAddress, final String targetAddress, String filter) throws Exception {
-
- if (running.compareAndSet(false, true)) {
- try {
- actualReplay(start, end, sourceAddress, targetAddress, filter);
- } catch (Exception e) {
- logger.warn(e.getMessage());
- throw e;
- } finally {
- running.set(false);
- }
- } else {
- throw new RuntimeException("Replay manager is currently busy with another operation");
- }
- }
-
- private void actualReplay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception {
+ public void replay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception {
logger.debug("Replay::{}", sourceAddress);
if (sourceAddress == null) {