You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/06/08 22:02:59 UTC
[activemq-artemis] branch master updated: ARTEMIS-2787 - Add
ability to disable and enable a queue
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 99f6c7b ARTEMIS-2787 - Add ability to disable and enable a queue
new 1691f18 This closes #3169
99f6c7b is described below
commit 99f6c7bf2014ba1fd77ac41f4ee986c2a80700a6
Author: Michael Pearce <mi...@me.com>
AuthorDate: Mon Jun 1 12:53:00 2020 +0100
ARTEMIS-2787 - Add ability to disable and enable a queue
Add feature
Add tests
Add docs
Add missing bits noticed in ring-size
Address comments
---
.../activemq/artemis/api/core/QueueAttributes.java | 15 ++++++
.../artemis/api/core/QueueConfiguration.java | 21 ++++++++
.../apache/activemq/artemis/logs/AuditLogger.java | 26 ++++++++++
.../api/config/ActiveMQDefaultConfiguration.java | 6 +++
.../artemis/api/core/client/ClientSession.java | 2 +
.../artemis/api/core/management/QueueControl.java | 21 +++++++-
.../artemis/core/client/impl/QueueQueryImpl.java | 13 ++++-
.../protocol/core/impl/ActiveMQSessionContext.java | 2 +-
.../impl/wireformat/CreateQueueMessage_V2.java | 34 +++++++++++--
.../wireformat/CreateSharedQueueMessage_V2.java | 51 +++++++++++++++++++-
.../SessionQueueQueryResponseMessage_V3.java | 39 +++++++++++++--
.../artemis/core/server/QueueQueryResult.java | 11 ++++-
.../core/config/CoreQueueConfiguration.java | 34 ++++++++++++-
.../deployers/impl/FileConfigurationParser.java | 4 ++
.../core/management/impl/QueueControlImpl.java | 46 ++++++++++++++++++
.../artemis/core/persistence/QueueBindingInfo.java | 4 ++
.../journal/AbstractJournalStorageManager.java | 2 +-
.../codec/PersistentQueueBindingEncoding.java | 25 +++++++++-
.../core/postoffice/impl/PostOfficeImpl.java | 4 ++
.../apache/activemq/artemis/core/server/Queue.java | 4 ++
.../core/server/impl/ActiveMQServerImpl.java | 8 ++--
.../core/server/impl/PostOfficeJournalLoader.java | 1 +
.../core/server/impl/QueueConfigurationUtils.java | 4 +-
.../artemis/core/server/impl/QueueImpl.java | 18 +++++++
.../resources/schema/artemis-configuration.xsd | 1 +
.../impl/journal/QueueBindingEncodingTest.java | 3 ++
.../server/impl/ScheduledDeliveryHandlerTest.java | 10 ++++
.../src/test/resources/artemis-configuration.xsd | 1 +
docs/user-manual/en/address-model.md | 28 +++++++++++
docs/user-manual/en/management.md | 9 ++++
.../integration/addressing/AddressingTest.java | 56 ++++++++++++++++++++++
.../tests/integration/client/SessionTest.java | 20 ++++++++
.../tests/integration/jms/RedeployTest.java | 12 +++++
.../management/QueueControlUsingCoreTest.java | 15 ++++++
.../persistence/QueueConfigRestartTest.java | 44 +++++++++++++++++
.../src/test/resources/reload-changed.xml | 4 +-
.../src/test/resources/reload-original.xml | 1 +
.../tests/unit/core/postoffice/impl/FakeQueue.java | 10 ++++
38 files changed, 586 insertions(+), 23 deletions(-)
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
index 5f6ddb8..ca4d4d8 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
@@ -41,6 +41,7 @@ public class QueueAttributes implements Serializable {
public static final String AUTO_DELETE_DELAY = "auto-delete-delay";
public static final String AUTO_DELETE_MESSAGE_COUNT = "auto-delete-message-count";
public static final String RING_SIZE = "ring-size";
+ public static final String ENABLED = "enabled";
private RoutingType routingType;
private SimpleString filterString;
@@ -61,6 +62,7 @@ public class QueueAttributes implements Serializable {
private Long autoDeleteDelay;
private Long autoDeleteMessageCount;
private Long ringSize;
+ private Boolean enabled;
public void set(String key, String value) {
@@ -103,6 +105,8 @@ public class QueueAttributes implements Serializable {
setAutoDeleteMessageCount(Long.valueOf(value));
} else if (key.equals(RING_SIZE)) {
setRingSize(Long.valueOf(value));
+ } else if (key.equals(ENABLED)) {
+ setEnabled(Boolean.valueOf(value));
}
}
}
@@ -113,6 +117,7 @@ public class QueueAttributes implements Serializable {
.setRoutingType(this.getRoutingType())
.setExclusive(this.getExclusive())
.setRingSize(this.getRingSize())
+ .setEnabled(this.isEnabled())
.setGroupRebalance(this.getGroupRebalance())
.setNonDestructive(this.getNonDestructive())
.setLastValue(this.getLastValue())
@@ -139,6 +144,7 @@ public class QueueAttributes implements Serializable {
.setRoutingType(queueConfiguration.getRoutingType())
.setExclusive(queueConfiguration.isExclusive())
.setRingSize(queueConfiguration.getRingSize())
+ .setEnabled(queueConfiguration.isEnabled())
.setGroupRebalance(queueConfiguration.isGroupRebalance())
.setNonDestructive(queueConfiguration.isNonDestructive())
.setLastValue(queueConfiguration.isLastValue())
@@ -327,4 +333,13 @@ public class QueueAttributes implements Serializable {
this.ringSize = ringSize;
return this;
}
+
+ public Boolean isEnabled() {
+ return enabled;
+ }
+
+ public QueueAttributes setEnabled(Boolean enabled) {
+ this.enabled = enabled;
+ return this;
+ }
}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
index 58ce01c..afe438e 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
@@ -62,6 +62,7 @@ public class QueueConfiguration implements Serializable {
public static final String LAST_VALUE_KEY = "last-value-key";
public static final String NON_DESTRUCTIVE = "non-destructive";
public static final String PURGE_ON_NO_CONSUMERS = "purge-on-no-consumers";
+ public static final String ENABLED = "enabled";
public static final String CONSUMERS_BEFORE_DISPATCH = "consumers-before-dispatch";
public static final String DELAY_BEFORE_DISPATCH = "delay-before-dispatch";
public static final String CONSUMER_PRIORITY = "consumer-priority";
@@ -92,6 +93,7 @@ public class QueueConfiguration implements Serializable {
private SimpleString lastValueKey;
private Boolean nonDestructive;
private Boolean purgeOnNoConsumers;
+ private Boolean enabled;
private Integer consumersBeforeDispatch;
private Long delayBeforeDispatch;
private Integer consumerPriority;
@@ -202,6 +204,8 @@ public class QueueConfiguration implements Serializable {
setNonDestructive(Boolean.valueOf(value));
} else if (key.equals(PURGE_ON_NO_CONSUMERS)) {
setPurgeOnNoConsumers(Boolean.valueOf(value));
+ } else if (key.equals(ENABLED)) {
+ setEnabled(Boolean.valueOf(value));
} else if (key.equals(CONSUMERS_BEFORE_DISPATCH)) {
setConsumersBeforeDispatch(Integer.valueOf(value));
} else if (key.equals(DELAY_BEFORE_DISPATCH)) {
@@ -409,6 +413,16 @@ public class QueueConfiguration implements Serializable {
return this;
}
+ public Boolean isEnabled() {
+ return enabled;
+ }
+
+ public QueueConfiguration setEnabled(Boolean enabled) {
+ this.enabled = enabled;
+ return this;
+ }
+
+
public Integer getConsumersBeforeDispatch() {
return consumersBeforeDispatch;
}
@@ -634,6 +648,9 @@ public class QueueConfiguration implements Serializable {
if (isPurgeOnNoConsumers() != null) {
builder.add(PURGE_ON_NO_CONSUMERS, isPurgeOnNoConsumers());
}
+ if (isEnabled() != null) {
+ builder.add(ENABLED, isEnabled());
+ }
if (getConsumersBeforeDispatch() != null) {
builder.add(CONSUMERS_BEFORE_DISPATCH, getConsumersBeforeDispatch());
}
@@ -741,6 +758,8 @@ public class QueueConfiguration implements Serializable {
return false;
if (!Objects.equals(purgeOnNoConsumers, that.purgeOnNoConsumers))
return false;
+ if (!Objects.equals(enabled, that.enabled))
+ return false;
if (!Objects.equals(consumersBeforeDispatch, that.consumersBeforeDispatch))
return false;
if (!Objects.equals(delayBeforeDispatch, that.delayBeforeDispatch))
@@ -789,6 +808,7 @@ public class QueueConfiguration implements Serializable {
result = 31 * result + Objects.hashCode(lastValueKey);
result = 31 * result + Objects.hashCode(nonDestructive);
result = 31 * result + Objects.hashCode(purgeOnNoConsumers);
+ result = 31 * result + Objects.hashCode(enabled);
result = 31 * result + Objects.hashCode(consumersBeforeDispatch);
result = 31 * result + Objects.hashCode(delayBeforeDispatch);
result = 31 * result + Objects.hashCode(consumerPriority);
@@ -824,6 +844,7 @@ public class QueueConfiguration implements Serializable {
+ ", lastValueKey=" + lastValueKey
+ ", nonDestructive=" + nonDestructive
+ ", purgeOnNoConsumers=" + purgeOnNoConsumers
+ + ", enabled=" + enabled
+ ", consumersBeforeDispatch=" + consumersBeforeDispatch
+ ", delayBeforeDispatch=" + delayBeforeDispatch
+ ", consumerPriority=" + consumerPriority
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 2e97449..2cab145 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2670,4 +2670,30 @@ public interface AuditLogger extends BasicLogger {
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601727, value = "User {0} is updating a divert on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void updateDivert(String user, Object source, Object... args);
+
+ static void isEnabled(Object source) {
+ LOGGER.isEnabled(getCaller(), source);
+ }
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 601728, value = "User {0} is getting enabled property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
+ void isEnabled(String user, Object source, Object... args);
+
+ static void disable(Object source, Object... args) {
+ LOGGER.disable(getCaller(), source, arrayToString(args));
+ }
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 601729, value = "User {0} is disabling on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
+ void disable(String user, Object source, Object... args);
+
+ static void enable(Object source) {
+ LOGGER.resume(getCaller(), source);
+ }
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 601730, value = "User {0} is enabling on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
+ void enable(String user, Object source, Object... args);
+
+
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index acee42f..60656e5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -514,6 +514,8 @@ public final class ActiveMQDefaultConfiguration {
public static final boolean DEFAULT_PURGE_ON_NO_CONSUMERS = false;
+ public static final boolean DEFAULT_ENABLED = true;
+
public static final boolean DEFAULT_QUEUE_AUTO_DELETE = true;
public static final boolean DEFAULT_CREATED_QUEUE_AUTO_DELETE = false;
@@ -1453,6 +1455,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_PURGE_ON_NO_CONSUMERS;
}
+ public static boolean getDefaultEnabled() {
+ return DEFAULT_ENABLED;
+ }
+
public static boolean getDefaultQueueAutoDelete(boolean autoCreated) {
return autoCreated ? getDefaultQueueAutoDelete() : getDefaultCreatedQueueAutoDelete();
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index feba48a..965a4e7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -173,6 +173,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
Long getAutoDeleteMessageCount();
Long getRingSize();
+
+ Boolean isEnabled();
}
// Lifecycle operations ------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 3c0e7f3..bbb27c0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -257,12 +257,30 @@ public interface QueueControl {
/**
*
*/
- @Attribute(desc = "delete this queue when the last consumer disconnects")
+ @Attribute(desc = "purge this queue when the last consumer disconnects")
boolean isPurgeOnNoConsumers();
/**
*
*/
+ @Attribute(desc = "if the queue is enabled, default it is enabled, when disabled messages will not be routed to the queue")
+ boolean isEnabled();
+
+ /**
+ * Enables the queue. Messages are now routed to this queue.
+ */
+ @Operation(desc = "Enables routing of messages to the Queue", impact = MBeanOperationInfo.ACTION)
+ void enable() throws Exception;
+
+ /**
+ * Enables the queue. Messages are not routed to this queue.
+ */
+ @Operation(desc = "Disables routing of messages to the Queue", impact = MBeanOperationInfo.ACTION)
+ void disable() throws Exception;
+
+ /**
+ *
+ */
@Attribute(desc = "is this queue managed by configuration (broker.xml)")
boolean isConfigurationManaged();
@@ -595,6 +613,7 @@ public interface QueueControl {
@Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state. It will also affected the state of a persisted pause.", impact = MBeanOperationInfo.ACTION)
void resume() throws Exception;
+
@Operation(desc = "List all the existent consumers on the Queue")
String listConsumersAsJSON() throws Exception;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
index 93df02a..d28f4f4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
@@ -74,6 +74,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
private final Long ringSize;
+ private final Boolean enabled;
+
private final Integer defaultConsumerWindowSize;
@@ -160,7 +162,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize) {
- this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null);
+ this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null);
}
public QueueQueryImpl(final boolean durable,
@@ -189,7 +191,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize,
- final Long ringSize) {
+ final Long ringSize,
+ final Boolean enabled) {
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
@@ -217,6 +220,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
this.ringSize = ringSize;
+ this.enabled = enabled;
}
@Override
@@ -353,5 +357,10 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
public Long getRingSize() {
return ringSize;
}
+
+ @Override
+ public Boolean isEnabled() {
+ return enabled;
+ }
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 9e2bba7..a4aa8ab 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -905,7 +905,7 @@ public class ActiveMQSessionContext extends SessionContext {
// We try to recreate any non-durable or auto-created queues, since they might not be there on failover/reconnect.
// This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover/reconnection
if (!queueInfo.isDurable() || queueInfo.isAutoCreated()) {
- CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.getGroupBuckets(), queueInfo.getGroupFirstKey(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), [...]
+ CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.getGroupBuckets(), queueInfo.getGroupFirstKey(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), [...]
sendPacketWithoutLock(sessionChannel, createQueueRequest);
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
index 5b24edc..3fe20a7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
@@ -59,6 +59,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
private Long ringSize;
+ private Boolean enabled;
+
@Deprecated
public CreateQueueMessage_V2(final SimpleString address,
final SimpleString queueName,
@@ -89,7 +91,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
queueAttributes.getAutoDelete(),
queueAttributes.getAutoDeleteDelay(),
queueAttributes.getAutoDeleteMessageCount(),
- queueAttributes.getRingSize()
+ queueAttributes.getRingSize(),
+ queueAttributes.isEnabled()
);
}
@@ -118,7 +121,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
queueConfiguration.isAutoDelete(),
queueConfiguration.getAutoDeleteDelay(),
queueConfiguration.getAutoDeleteMessageCount(),
- queueConfiguration.getRingSize()
+ queueConfiguration.getRingSize(),
+ queueConfiguration.isEnabled()
);
}
@@ -144,7 +148,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
- final Long ringSize) {
+ final Long ringSize,
+ final Boolean enabled) {
this();
this.address = address;
@@ -170,6 +175,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.ringSize = ringSize;
+ this.enabled = enabled;
}
public CreateQueueMessage_V2() {
@@ -200,7 +206,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
.setAutoDeleteMessageCount(autoDeleteMessageCount)
.setTemporary(temporary)
.setAutoCreated(autoCreated)
- .setRingSize(ringSize);
+ .setRingSize(ringSize)
+ .setEnabled(enabled);
}
@Override
@@ -223,6 +230,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
buff.append(", autoDeleteDelay=" + autoDeleteDelay);
buff.append(", autoDeleteMessageCount=" + autoDeleteMessageCount);
buff.append(", ringSize=" + ringSize);
+ buff.append(", enabled=" + enabled);
buff.append("]");
return buff.toString();
@@ -364,6 +372,14 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
this.ringSize = ringSize;
}
+ public Boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(Boolean enabled) {
+ this.enabled = enabled;
+ }
+
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
@@ -384,6 +400,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
BufferHelper.writeNullableLong(buffer, autoDeleteMessageCount);
buffer.writeNullableSimpleString(groupFirstKey);
BufferHelper.writeNullableLong(buffer, ringSize);
+ BufferHelper.writeNullableBoolean(buffer, enabled);
}
@Override
@@ -414,6 +431,9 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
if (buffer.readableBytes() > 0) {
ringSize = BufferHelper.readNullableLong(buffer);
}
+ if (buffer.readableBytes() > 0) {
+ enabled = BufferHelper.readNullableBoolean(buffer);
+ }
}
@Override
@@ -437,6 +457,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
result = prime * result + (autoDeleteDelay == null ? 0 : autoDeleteDelay.hashCode());
result = prime * result + (autoDeleteMessageCount == null ? 0 : autoDeleteMessageCount.hashCode());
result = prime * result + (ringSize == null ? 0 : ringSize.hashCode());
+ result = prime * result + (enabled ? 1231 : 1237);
return result;
}
@@ -520,6 +541,11 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
return false;
} else if (!ringSize.equals(other.ringSize))
return false;
+ if (enabled == null) {
+ if (other.enabled != null)
+ return false;
+ } else if (!enabled.equals(other.enabled))
+ return false;
if (routingType == null) {
if (other.routingType != null)
return false;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
index 6cbe8c9..7588e74 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
@@ -39,6 +39,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
private Boolean autoDelete;
private Long autoDeleteDelay;
private Long autoDeleteMessageCount;
+ private Long ringSize;
+ private Boolean enabled;
public CreateSharedQueueMessage_V2(final QueueConfiguration queueConfiguration, boolean requiresResponse) {
this(
@@ -61,6 +63,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
queueConfiguration.isAutoDelete(),
queueConfiguration.getAutoDeleteDelay(),
queueConfiguration.getAutoDeleteMessageCount(),
+ queueConfiguration.getRingSize(),
+ queueConfiguration.isEnabled(),
requiresResponse
);
}
@@ -84,6 +88,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
+ final Long ringSize,
+ final Boolean enabled,
final boolean requiresResponse) {
this();
@@ -106,6 +112,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
this.autoDelete = autoDelete;
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
+ this.ringSize = ringSize;
+ this.enabled = enabled;
this.requiresResponse = requiresResponse;
}
@@ -233,6 +241,22 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
this.autoDeleteMessageCount = autoDeleteMessageCount;
}
+ public Long getRingSize() {
+ return ringSize;
+ }
+
+ public void setRingSize(Long ringSize) {
+ this.ringSize = ringSize;
+ }
+
+ public Boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(Boolean enabled) {
+ this.enabled = enabled;
+ }
+
public QueueConfiguration toQueueConfiguration() {
return new QueueConfiguration(queueName)
.setAddress(address)
@@ -252,7 +276,9 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
.setLastValueKey(lastValueKey)
.setAutoDelete(autoDelete)
.setAutoDeleteDelay(autoDeleteDelay)
- .setAutoDeleteMessageCount(autoDeleteMessageCount);
+ .setAutoDeleteMessageCount(autoDeleteMessageCount)
+ .setRingSize(ringSize)
+ .setEnabled(enabled);
}
@Override
@@ -277,6 +303,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
buff.append(", autoDelete=" + autoDelete);
buff.append(", autoDeleteDelay=" + autoDeleteDelay);
buff.append(", autoDeleteMessageCount=" + autoDeleteMessageCount);
+ buff.append(", ringSize=" + ringSize);
+ buff.append(", enabled=" + enabled);
buff.append(", requiresResponse=" + requiresResponse);
buff.append("]");
return buff.toString();
@@ -304,6 +332,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
BufferHelper.writeNullableLong(buffer, autoDeleteDelay);
BufferHelper.writeNullableLong(buffer, autoDeleteMessageCount);
buffer.writeNullableSimpleString(groupFirstKey);
+ BufferHelper.writeNullableLong(buffer, ringSize);
+ BufferHelper.writeNullableBoolean(buffer, enabled);
}
@Override
@@ -334,6 +364,12 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
if (buffer.readableBytes() > 0) {
groupFirstKey = buffer.readNullableSimpleString();
}
+ if (buffer.readableBytes() > 0) {
+ ringSize = buffer.readNullableLong();
+ }
+ if (buffer.readableBytes() > 0) {
+ enabled = buffer.readNullableBoolean();
+ }
}
@Override
@@ -360,7 +396,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
result = prime * result + (autoDelete == null ? 0 : autoDelete.hashCode());
result = prime * result + (autoDeleteDelay == null ? 0 : autoDeleteDelay.hashCode());
result = prime * result + (autoDeleteMessageCount == null ? 0 : autoDeleteMessageCount.hashCode());
-
+ result = prime * result + (ringSize == null ? 0 : ringSize.hashCode());
+ result = prime * result + (enabled ? 1231 : 1237);
return result;
}
@@ -464,6 +501,16 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
return false;
} else if (!autoDeleteMessageCount.equals(other.autoDeleteMessageCount))
return false;
+ if (ringSize == null) {
+ if (other.ringSize != null)
+ return false;
+ } else if (!ringSize.equals(other.ringSize))
+ return false;
+ if (enabled == null) {
+ if (other.enabled != null)
+ return false;
+ } else if (!enabled.equals(other.enabled))
+ return false;
return true;
}
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
index f736172..7be0210 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
@@ -62,12 +62,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
private Long ringSize;
+ private Boolean enabled;
+
public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
- this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestr [...]
+ this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestr [...]
}
public SessionQueueQueryResponseMessage_V3() {
- this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null);
+ this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null);
}
private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@@ -95,7 +97,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
- final Integer defaultConsumerWindowSize) {
+ final Integer defaultConsumerWindowSize,
+ final Long ringSize,
+ final Boolean enabled) {
super(SESS_QUEUEQUERY_RESP_V3);
this.durable = durable;
@@ -149,6 +153,10 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
+
+ this.ringSize = ringSize;
+
+ this.enabled = enabled;
}
public boolean isAutoCreated() {
@@ -279,6 +287,18 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
return ringSize;
}
+ public void setRingSize(Long ringSize) {
+ this.ringSize = ringSize;
+ }
+
+ public Boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(Boolean enabled) {
+ this.enabled = enabled;
+ }
+
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
@@ -300,6 +320,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
BufferHelper.writeNullableLong(buffer, autoDeleteMessageCount);
buffer.writeNullableSimpleString(groupFirstKey);
BufferHelper.writeNullableLong(buffer, ringSize);
+ BufferHelper.writeNullableBoolean(buffer, enabled);
}
@@ -334,6 +355,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
if (buffer.readableBytes() > 0) {
ringSize = BufferHelper.readNullableLong(buffer);
}
+ if (buffer.readableBytes() > 0) {
+ enabled = BufferHelper.readNullableBoolean(buffer);
+ }
}
@Override
@@ -358,6 +382,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
result = prime * result + (autoDeleteMessageCount == null ? 0 : autoDeleteMessageCount.hashCode());
result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
result = prime * result + (ringSize == null ? 0 : ringSize.hashCode());
+ result = prime * result + (enabled == null ? 0 : enabled ? 1231 : 1237);
return result;
}
@@ -389,12 +414,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
buff.append(", autoDeleteMessageCount=" + autoDeleteMessageCount);
buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize);
buff.append(", ringSize=" + ringSize);
+ buff.append(", enabled=" + enabled);
return buff.toString();
}
@Override
public ClientSession.QueueQuery toQueueQuery() {
- return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMe [...]
+ return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMe [...]
}
@Override
@@ -475,6 +501,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
return false;
} else if (!ringSize.equals(other.ringSize))
return false;
+ if (enabled == enabled) {
+ if (other.ringSize != null)
+ return false;
+ } else if (!enabled.equals(other.ringSize))
+ return false;
if (defaultConsumerWindowSize == null) {
if (other.defaultConsumerWindowSize != null)
return false;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
index f27966d..3ae86dc 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
@@ -75,6 +75,8 @@ public class QueueQueryResult {
private Long ringSize;
+ private Boolean enabled;
+
public QueueQueryResult(final SimpleString name,
final SimpleString address,
final boolean durable,
@@ -101,7 +103,8 @@ public class QueueQueryResult {
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize,
- final Long ringSize) {
+ final Long ringSize,
+ final Boolean enabled) {
this.durable = durable;
this.temporary = temporary;
@@ -155,6 +158,8 @@ public class QueueQueryResult {
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
this.ringSize = ringSize;
+
+ this.enabled = enabled;
}
public boolean isExists() {
@@ -268,4 +273,8 @@ public class QueueQueryResult {
public Long getRingSize() {
return ringSize;
}
+
+ public Boolean isEnabled() {
+ return enabled;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
index b6d3bf3..bbc9971 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
@@ -57,6 +57,8 @@ public class CoreQueueConfiguration implements Serializable {
private Long delayBeforeDispatch;
+ private Boolean enabled;
+
private Long ringSize = ActiveMQDefaultConfiguration.getDefaultRingSize();
private Boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
@@ -126,6 +128,10 @@ public class CoreQueueConfiguration implements Serializable {
return ringSize;
}
+ public Boolean isEnabled() {
+ return enabled;
+ }
+
public QueueConfiguration toQueueConfiguration() {
return new QueueConfiguration(this.getName())
.setAddress(this.getAddress())
@@ -144,7 +150,8 @@ public class CoreQueueConfiguration implements Serializable {
.setGroupBuckets(this.getGroupBuckets())
.setGroupFirstKey(this.getGroupFirstKey())
.setUser(this.getUser())
- .setLastValueKey(this.getLastValueKey());
+ .setLastValueKey(this.getLastValueKey())
+ .setEnabled(this.isEnabled());
}
public static CoreQueueConfiguration fromQueueConfiguration(QueueConfiguration queueConfiguration) {
@@ -165,6 +172,7 @@ public class CoreQueueConfiguration implements Serializable {
.setConsumersBeforeDispatch(queueConfiguration.getConsumersBeforeDispatch())
.setDelayBeforeDispatch(queueConfiguration.getDelayBeforeDispatch())
.setRingSize(queueConfiguration.getRingSize() != null ? queueConfiguration.getRingSize() : ActiveMQDefaultConfiguration.getDefaultRingSize())
+ .setEnabled(queueConfiguration.isEnabled() != null ? queueConfiguration.isEnabled() : ActiveMQDefaultConfiguration.getDefaultEnabled())
.setPurgeOnNoConsumers(queueConfiguration.isPurgeOnNoConsumers() != null ? queueConfiguration.isPurgeOnNoConsumers() : ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers())
.setRoutingType(queueConfiguration.getRoutingType() != null ? queueConfiguration.getRoutingType() : ActiveMQDefaultConfiguration.getDefaultRoutingType());
}
@@ -234,6 +242,14 @@ public class CoreQueueConfiguration implements Serializable {
}
/**
+ * @param enabled for this queue, default is true
+ */
+ public CoreQueueConfiguration setEnabled(Boolean enabled) {
+ this.enabled = enabled;
+ return this;
+ }
+
+ /**
* @param purgeOnNoConsumers delete this queue when consumer count reaches 0, default is false
*/
public CoreQueueConfiguration setPurgeOnNoConsumers(Boolean purgeOnNoConsumers) {
@@ -321,6 +337,8 @@ public class CoreQueueConfiguration implements Serializable {
result = prime * result + ((consumersBeforeDispatch == null) ? 0 : consumersBeforeDispatch.hashCode());
result = prime * result + ((delayBeforeDispatch == null) ? 0 : delayBeforeDispatch.hashCode());
result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
+ result = prime * result + ((ringSize == null) ? 0 : ringSize.hashCode());
+ result = prime * result + ((enabled == null) ? 0 : enabled.hashCode());
return result;
}
@@ -361,6 +379,18 @@ public class CoreQueueConfiguration implements Serializable {
} else if (!purgeOnNoConsumers.equals(other.purgeOnNoConsumers)) {
return false;
}
+ if (ringSize == null) {
+ if (other.ringSize != null)
+ return false;
+ } else if (!ringSize.equals(other.ringSize)) {
+ return false;
+ }
+ if (enabled == null) {
+ if (other.enabled != null)
+ return false;
+ } else if (!enabled.equals(other.enabled)) {
+ return false;
+ }
if (exclusive == null) {
if (other.exclusive != null)
return false;
@@ -447,6 +477,8 @@ public class CoreQueueConfiguration implements Serializable {
", nonDestructive=" + nonDestructive +
", consumersBeforeDispatch=" + consumersBeforeDispatch +
", delayBeforeDispatch=" + delayBeforeDispatch +
+ ", ringSize=" + ringSize +
+ ", enabled=" + enabled +
"]";
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 6e806e4..f42ae97 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1299,6 +1299,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
Boolean nonDestructive = null;
Integer consumersBeforeDispatch = null;
Long delayBeforeDispatch = null;
+ Boolean enabled = null;
Long ringSize = ActiveMQDefaultConfiguration.getDefaultRingSize();
NamedNodeMap attributes = node.getAttributes();
@@ -1327,6 +1328,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
consumersBeforeDispatch = Integer.parseInt(item.getNodeValue());
} else if (item.getNodeName().equals("delay-before-dispatch")) {
delayBeforeDispatch = Long.parseLong(item.getNodeValue());
+ } else if (item.getNodeName().equals("enabled")) {
+ enabled = Boolean.parseBoolean(item.getNodeValue());
} else if (item.getNodeName().equals("ring-size")) {
ringSize = Long.parseLong(item.getNodeValue());
}
@@ -1363,6 +1366,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
.setNonDestructive(nonDestructive)
.setConsumersBeforeDispatch(consumersBeforeDispatch)
.setDelayBeforeDispatch(delayBeforeDispatch)
+ .setEnabled(enabled)
.setRingSize(ringSize);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 086d8db..0ce51b8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -619,6 +619,52 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
@Override
+ public void disable() throws Exception {
+ if (AuditLogger.isEnabled()) {
+ AuditLogger.disable(queue);
+ }
+ checkStarted();
+
+ clearIO();
+ try {
+ server.getPostOffice().updateQueue(queue.getQueueConfiguration().setEnabled(false));
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
+ public void enable() throws Exception {
+ if (AuditLogger.isEnabled()) {
+ AuditLogger.enable(queue);
+ }
+ checkStarted();
+
+ clearIO();
+ try {
+ server.getPostOffice().updateQueue(queue.getQueueConfiguration().setEnabled(true));
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
+ public boolean isEnabled() {
+ if (AuditLogger.isEnabled()) {
+ AuditLogger.isEnabled(queue);
+ }
+ checkStarted();
+
+ clearIO();
+ try {
+ return queue.isEnabled();
+ } finally {
+ blockOnIO();
+ }
+ }
+
+
+ @Override
public boolean isConfigurationManaged() {
if (AuditLogger.isEnabled()) {
AuditLogger.isConfigurationManaged(queue);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index 9e088e6..62ed7af 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -58,6 +58,10 @@ public interface QueueBindingInfo {
void setPurgeOnNoConsumers(boolean purgeOnNoConsumers);
+ boolean isEnabled();
+
+ void setEnabled(boolean enabled);
+
boolean isExclusive();
void setExclusive(boolean exclusive);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 44230b0..551c64f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1307,7 +1307,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
SimpleString filterString = filter == null ? null : filter.getFilterString();
- PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDele [...]
+ PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch( [...]
readLock();
try {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index adf91c2..555bce1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -46,6 +46,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public boolean purgeOnNoConsumers;
+ public boolean enabled;
+
public boolean exclusive;
public boolean lastValue;
@@ -96,6 +98,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
maxConsumers +
", purgeOnNoConsumers=" +
purgeOnNoConsumers +
+ ", enabled=" +
+ enabled +
", exclusive=" +
exclusive +
", lastValue=" +
@@ -134,6 +138,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final boolean autoCreated,
final int maxConsumers,
final boolean purgeOnNoConsumers,
+ final boolean enabled,
final boolean exclusive,
final boolean groupRebalance,
final int groupBuckets,
@@ -156,6 +161,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.autoCreated = autoCreated;
this.maxConsumers = maxConsumers;
this.purgeOnNoConsumers = purgeOnNoConsumers;
+ this.enabled = enabled;
this.exclusive = exclusive;
this.groupRebalance = groupRebalance;
this.groupBuckets = groupBuckets;
@@ -256,6 +262,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
}
@Override
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ @Override
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ @Override
public boolean isExclusive() {
return exclusive;
}
@@ -461,6 +477,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
} else {
ringSize = ActiveMQDefaultConfiguration.getDefaultRingSize();
}
+ if (buffer.readableBytes() > 0) {
+ enabled = buffer.readBoolean();
+ } else {
+ enabled = ActiveMQDefaultConfiguration.getDefaultEnabled();
+ }
}
@Override
@@ -487,6 +508,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeLong(autoDeleteMessageCount);
buffer.writeNullableSimpleString(groupFirstKey);
buffer.writeLong(ringSize);
+ buffer.writeBoolean(enabled);
}
@Override
@@ -510,7 +532,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
DataConstants.SIZE_LONG +
DataConstants.SIZE_LONG +
SimpleString.sizeofNullableString(groupFirstKey) +
- DataConstants.SIZE_LONG;
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BOOLEAN;
}
private SimpleString createMetadata() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 67564bb..556362e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -660,6 +660,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
changed = true;
queue.setPurgeOnNoConsumers(queueConfiguration.isPurgeOnNoConsumers());
}
+ if (queueConfiguration.isEnabled() != null && queue.isEnabled() != queueConfiguration.isEnabled().booleanValue()) {
+ changed = true;
+ queue.setEnabled(queueConfiguration.isEnabled());
+ }
if (queueConfiguration.isExclusive() != null && queue.isExclusive() != queueConfiguration.isExclusive().booleanValue()) {
changed = true;
queue.setExclusive(queueConfiguration.isExclusive());
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 5e8e94d..be7b155 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -94,6 +94,10 @@ public interface Queue extends Bindable,CriticalComponent {
void setPurgeOnNoConsumers(boolean value);
+ boolean isEnabled();
+
+ void setEnabled(boolean value);
+
int getConsumersBeforeDispatch();
void setConsumersBeforeDispatch(int consumersBeforeDispatch);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index bf6c48e..d40a50b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -971,6 +971,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString defaultGroupFirstKey = addressSettings.getDefaultGroupFirstKey();
long autoDeleteQueuesDelay = addressSettings.getAutoDeleteQueuesDelay();
long autoDeleteQueuesMessageCount = addressSettings.getAutoDeleteQueuesMessageCount();
+ long defaultRingSize = addressSettings.getDefaultRingSize();
+ boolean defaultEnabled = ActiveMQDefaultConfiguration.getDefaultEnabled();
SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
@@ -981,12 +983,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString filterString = filter == null ? null : filter.getFilterString();
- response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumers [...]
+ response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumers [...]
} else if (realName.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29)
- response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null);
+ response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null);
} else {
- response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessage [...]
+ response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessage [...]
}
return response;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index db6a55a..60e9421 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -143,6 +143,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
.setTemporary(false)
.setAutoCreated(queueBindingInfo.isAutoCreated())
.setPurgeOnNoConsumers(queueBindingInfo.isPurgeOnNoConsumers())
+ .setEnabled(queueBindingInfo.isEnabled())
.setMaxConsumers(queueBindingInfo.getMaxConsumers())
.setExclusive(queueBindingInfo.isExclusive())
.setGroupRebalance(queueBindingInfo.isGroupRebalance())
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java
index da4faa3..ed90282 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -37,11 +38,12 @@ public class QueueConfigurationUtils {
config.setRoutingType(config.getRoutingType() == null ? as.getDefaultQueueRoutingType() : config.getRoutingType());
config.setPurgeOnNoConsumers(config.isPurgeOnNoConsumers() == null ? as.isDefaultPurgeOnNoConsumers() : config.isPurgeOnNoConsumers());
config.setAutoCreateAddress(config.isAutoCreateAddress() == null ? as.isAutoCreateAddresses() : config.isAutoCreateAddress());
-
// set the default auto-delete
config.setAutoDelete(config.isAutoDelete() == null ? (config.isAutoCreated() && as.isAutoDeleteQueues()) || (!config.isAutoCreated() && as.isAutoDeleteCreatedQueues()) : config.isAutoDelete());
config.setAutoDeleteDelay(config.getAutoDeleteDelay() == null ? as.getAutoDeleteQueuesDelay() : config.getAutoDeleteDelay());
config.setAutoDeleteMessageCount(config.getAutoDeleteMessageCount() == null ? as.getAutoDeleteQueuesMessageCount() : config.getAutoDeleteMessageCount());
+
+ config.setEnabled(config.isEnabled() == null ? ActiveMQDefaultConfiguration.getDefaultEnabled() : config.isEnabled());
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 84a6140..3a7a224 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -296,6 +296,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile boolean purgeOnNoConsumers;
+ private volatile boolean enabled;
+
private final AddressInfo addressInfo;
private volatile RoutingType routingType;
@@ -633,6 +635,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.purgeOnNoConsumers = queueConfiguration.isPurgeOnNoConsumers() == null ? ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers() : queueConfiguration.isPurgeOnNoConsumers();
+ this.enabled = queueConfiguration.isEnabled() == null ? ActiveMQDefaultConfiguration.getDefaultEnabled() : queueConfiguration.isEnabled();
+
this.consumersBeforeDispatch = queueConfiguration.getConsumersBeforeDispatch() == null ? ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch() : queueConfiguration.getConsumersBeforeDispatch();
this.delayBeforeDispatch = queueConfiguration.getDelayBeforeDispatch() == null ? ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch() : queueConfiguration.getDelayBeforeDispatch();
@@ -797,6 +801,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void route(final Message message, final RoutingContext context) throws Exception {
+ if (!enabled) {
+ context.setReusable(false);
+ return;
+ }
if (purgeOnNoConsumers) {
context.setReusable(false);
if (getConsumerCount() == 0) {
@@ -870,6 +878,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ @Override
+ public synchronized void setEnabled(boolean value) {
+ this.enabled = value;
+ }
+
+ @Override
public int getMaxConsumers() {
return maxConsumers;
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 47bd59e..05c91fd 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3789,6 +3789,7 @@
<xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/>
<xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/>
<xsd:attribute name="ring-size" type="xsd:long" use="optional"/>
+ <xsd:attribute name="enabled" type="xsd:boolean" use="optional"/>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java
index f6ed46b..2ac3a2a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java
@@ -50,6 +50,7 @@ public class QueueBindingEncodingTest extends Assert {
final byte routingType = RandomUtil.randomByte();
final boolean configurationManaged = RandomUtil.randomBoolean();
final long ringSize = RandomUtil.randomLong();
+ final boolean enabled = RandomUtil.randomBoolean();
PersistentQueueBindingEncoding encoding = new PersistentQueueBindingEncoding(name,
address,
@@ -58,6 +59,7 @@ public class QueueBindingEncodingTest extends Assert {
autoCreated,
maxConsumers,
purgeOnNoConsumers,
+ enabled,
exclusive,
groupRebalance,
groupBuckets,
@@ -87,6 +89,7 @@ public class QueueBindingEncodingTest extends Assert {
assertEquals(autoCreated, decoding.isAutoCreated());
assertEquals(maxConsumers, decoding.getMaxConsumers());
assertEquals(purgeOnNoConsumers, decoding.isPurgeOnNoConsumers());
+ assertEquals(enabled, decoding.isEnabled());
assertEquals(exclusive, decoding.isExclusive());
assertEquals(groupRebalance, decoding.isGroupRebalance());
assertEquals(groupBuckets, decoding.getGroupBuckets());
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 6eda103..40c472c 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -818,6 +818,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public boolean isEnabled() {
+ return false;
+ }
+
+ @Override
+ public void setEnabled(boolean value) {
+
+ }
+
+ @Override
public PagingStore getPagingStore() {
return null;
}
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index 09e8f9e..a3e5812 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -3573,6 +3573,7 @@
<xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/>
<xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/>
<xsd:attribute name="ring-size" type="xsd:long" use="optional"/>
+ <xsd:attribute name="enabled" type="xsd:boolean" use="optional"/>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md
index b08c704..ce3737e 100644
--- a/docs/user-manual/en/address-model.md
+++ b/docs/user-manual/en/address-model.md
@@ -527,6 +527,34 @@ Open the file `<broker-instance>/etc/broker.xml` for editing.
</addresses>
```
+#### Disabled Queue
+
+If a user requires to statically configure a queue and disable routing to it,
+for example where a queue needs to be defined so a consumer can bind,
+but you want to disable message routing to it for the time being.
+
+Or you need to stop message flow to the queue to allow investigation keeping the consumer bound,
+but dont wish to have further messages routed to the queue to avoid message build up.
+
+When **enabled** is set to **true** the queue will have messages routed to it. (default)
+
+When **enabled** is set to **false** the queue will NOT have messages routed to it.
+
+Open the file `<broker-instance>/etc/broker.xml` for editing.
+
+```xml
+<addresses>
+ <address name="foo.bar">
+ <multicast>
+ <queue name="orders1" enabled="false"/>
+ </multicast>
+ </address>
+</addresses>
+```
+
+Warning: Disabling all the queues on an address means that any message sent to that address will be silently dropped.
+
+
## Protocol Managers
A "protocol manager" maps protocol-specific concepts down to the core
diff --git a/docs/user-manual/en/management.md b/docs/user-manual/en/management.md
index 8985d6d..431e567 100644
--- a/docs/user-manual/en/management.md
+++ b/docs/user-manual/en/management.md
@@ -211,6 +211,15 @@ a given property.)
The `QueueControl` can pause and resume the underlying queue. When a queue is
paused, it will receive messages but will not deliver them. When it's resumed,
it'll begin delivering the queued messages, if any.
+
+- Disabling and Enabling Queues
+
+ The `QueueControl` can disable and enable the underlying queue. When a queue is
+ disabled, it will not longer have messages routed to it. When it's enabled,
+ it'll begin having messages routed to it again.
+
+ This is useful where you may need to disable message routing to a queue but wish to keep consumers active
+ to investigate issues, without causing further message build up in the queue.
#### Other Resources Management
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
index aed19c6..45148fe 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
@@ -268,6 +268,62 @@ public class AddressingTest extends ActiveMQTestBase {
Wait.assertEquals(1, server.locateQueue(queueName)::getMessageCount);
}
+
+ @Test
+ public void testQueueEnabledDisabled() throws Exception {
+ SimpleString address = new SimpleString("test.address");
+ SimpleString defaultQueue = SimpleString.toSimpleString(UUID.randomUUID().toString());
+ SimpleString enabledQueue = SimpleString.toSimpleString(UUID.randomUUID().toString());
+ SimpleString disabledQueue = SimpleString.toSimpleString(UUID.randomUUID().toString());
+
+
+ //Validate default is enabled, and check that queues enabled receive messages and disabled do not on same address.
+
+ server.createQueue(new QueueConfiguration(defaultQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST));
+ server.createQueue(new QueueConfiguration(enabledQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(true));
+ server.createQueue(new QueueConfiguration(disabledQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(false));
+
+ assertNotNull(server.locateQueue(defaultQueue));
+ assertNotNull(server.locateQueue(enabledQueue));
+ assertNotNull(server.locateQueue(disabledQueue));
+ ClientSession session = sessionFactory.createSession();
+ ClientProducer producer = session.createProducer(address);
+ producer.send(session.createMessage(true));
+
+ assertNotNull(server.locateQueue(defaultQueue));
+ assertNotNull(server.locateQueue(enabledQueue));
+ assertNotNull(server.locateQueue(disabledQueue));
+
+ Wait.assertEquals(1, server.locateQueue(defaultQueue)::getMessageCount);
+ Wait.assertEquals(1, server.locateQueue(enabledQueue)::getMessageCount);
+ Wait.assertEquals(0, server.locateQueue(disabledQueue)::getMessageCount);
+
+ //Update Queue Disable All
+ server.updateQueue(new QueueConfiguration(defaultQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(false));
+ server.updateQueue(new QueueConfiguration(enabledQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(false));
+ server.updateQueue(new QueueConfiguration(disabledQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(false));
+
+ producer.send(session.createMessage(true));
+
+ Wait.assertEquals(1, server.locateQueue(defaultQueue)::getMessageCount);
+ Wait.assertEquals(1, server.locateQueue(enabledQueue)::getMessageCount);
+ Wait.assertEquals(0, server.locateQueue(disabledQueue)::getMessageCount);
+
+
+ //Update Queue Enable All
+ server.updateQueue(new QueueConfiguration(defaultQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(true));
+ server.updateQueue(new QueueConfiguration(enabledQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(true));
+ server.updateQueue(new QueueConfiguration(disabledQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(true));
+
+
+ producer.send(session.createMessage(true));
+
+ Wait.assertEquals(2, server.locateQueue(defaultQueue)::getMessageCount);
+ Wait.assertEquals(2, server.locateQueue(enabledQueue)::getMessageCount);
+ Wait.assertEquals(1, server.locateQueue(disabledQueue)::getMessageCount);
+
+ }
+
@Test
public void testLimitOnMaxConsumers() throws Exception {
SimpleString address = new SimpleString("test.address");
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java
index 8e51681..6191db7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java
@@ -670,7 +670,27 @@ public class SessionTest extends ActiveMQTestBase {
assertTrue(result.isPurgeOnNoConsumers());
assertTrue(result.isExclusive());
assertTrue(result.isLastValue());
+ assertTrue(result.isEnabled());
server.destroyQueue(queueName);
}
+ {
+ if (!legacyCreateQueue) {
+ clientSession.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST).setFilterString("filter").setAutoCreated(true).setMaxConsumers(0).setPurgeOnNoConsumers(true).setExclusive(true).setLastValue(true).setEnabled(false));
+ Queue result = server.locateQueue(queueName);
+ assertEquals(addressName, result.getAddress());
+ assertEquals(queueName, result.getName());
+ assertEquals(RoutingType.ANYCAST, result.getRoutingType());
+ assertEquals("filter", result.getFilter().getFilterString().toString());
+ assertTrue(result.isDurable());
+ assertTrue(result.isAutoCreated());
+ assertEquals(0, result.getMaxConsumers());
+ assertTrue(result.isPurgeOnNoConsumers());
+ assertTrue(result.isExclusive());
+ assertTrue(result.isLastValue());
+ assertFalse(result.isEnabled());
+ server.destroyQueue(queueName);
+ }
+
+ }
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index f0f430c..f63a408 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -568,6 +568,10 @@ public class RedeployTest extends ActiveMQTestBase {
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(10, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isPurgeOnNoConsumers());
+ Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isEnabled());
+
+ Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isPurgeOnNoConsumers());
+ Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isEnabled());
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
@@ -594,6 +598,11 @@ public class RedeployTest extends ActiveMQTestBase {
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(1, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isPurgeOnNoConsumers());
+ Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isEnabled());
+
+ Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isPurgeOnNoConsumers());
+ Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isEnabled());
+
} finally {
embeddedActiveMQ.stop();
}
@@ -621,7 +630,10 @@ public class RedeployTest extends ActiveMQTestBase {
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(1, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isPurgeOnNoConsumers());
+ Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isEnabled());
+ Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isPurgeOnNoConsumers());
+ Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isEnabled());
} finally {
embeddedActiveMQ.stop();
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 824579f..d969998 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -181,6 +181,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
}
@Override
+ public boolean isEnabled() {
+ return (Boolean) proxy.retrieveAttributeValue("isEnabled");
+ }
+
+ @Override
+ public void enable() throws Exception {
+ proxy.invokeOperation("enable");
+ }
+
+ @Override
+ public void disable() throws Exception {
+ proxy.invokeOperation("disable");
+ }
+
+ @Override
public boolean isConfigurationManaged() {
return (Boolean) proxy.retrieveAttributeValue("configurationManaged");
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
index 939071b..fd8d275 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
@@ -173,6 +173,50 @@ public class QueueConfigRestartTest extends ActiveMQTestBase {
QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding2.getQueue().isPurgeOnNoConsumers());
}
+
+ @Test
+ public void testQueueConfigEnabledAndRestart() throws Exception {
+ ActiveMQServer server = createServer(true);
+
+ server.start();
+
+ SimpleString address = new SimpleString("test.address");
+ SimpleString queue = new SimpleString("test.queue");
+
+ server.createQueue(new QueueConfiguration(queue).setAddress(address).setEnabled(true));
+
+ QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
+ Assert.assertTrue(queueBinding1.getQueue().isEnabled());
+
+ server.stop();
+
+ server.start();
+
+ QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
+ Assert.assertTrue(queueBinding2.getQueue().isEnabled());
+ }
+
+ @Test
+ public void testQueueConfigDisabledAndRestart() throws Exception {
+ ActiveMQServer server = createServer(true);
+
+ server.start();
+
+ SimpleString address = new SimpleString("test.address");
+ SimpleString queue = new SimpleString("test.queue");
+
+ server.createQueue(new QueueConfiguration(queue).setAddress(address).setEnabled(false));
+
+ QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
+ Assert.assertFalse(queueBinding1.getQueue().isEnabled());
+
+ server.stop();
+
+ server.start();
+
+ QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
+ Assert.assertFalse(queueBinding2.getQueue().isEnabled());
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-changed.xml b/tests/integration-tests/src/test/resources/reload-changed.xml
index 2ed7878..33f92ea 100644
--- a/tests/integration-tests/src/test/resources/reload-changed.xml
+++ b/tests/integration-tests/src/test/resources/reload-changed.xml
@@ -147,7 +147,9 @@ under the License.
</address>
<address name="config_test_queue_change">
<multicast>
- <queue name="config_test_queue_change_queue" max-consumers="1" purge-on-no-consumers="true" />
+ <queue name="config_test_queue_change_queue" max-consumers="1" purge-on-no-consumers="true" enabled="false" />
+ <!-- checks that when not set, values are set back to defaults -->
+ <queue name="config_test_queue_change_queue_defaults" />
</multicast>
</address>
</addresses>
diff --git a/tests/integration-tests/src/test/resources/reload-original.xml b/tests/integration-tests/src/test/resources/reload-original.xml
index 45476ac..9adb766 100644
--- a/tests/integration-tests/src/test/resources/reload-original.xml
+++ b/tests/integration-tests/src/test/resources/reload-original.xml
@@ -161,6 +161,7 @@ under the License.
<address name="config_test_queue_change">
<multicast>
<queue name="config_test_queue_change_queue" max-consumers="10" purge-on-no-consumers="false" />
+ <queue name="config_test_queue_change_queue_defaults" purge-on-no-consumers="true" enabled="false" />
</multicast>
</address>
</addresses>
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index d3a2050..b1896bd 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -51,6 +51,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
+ public boolean isEnabled() {
+ return false;
+ }
+
+ @Override
+ public void setEnabled(boolean value) {
+
+ }
+
+ @Override
public PagingStore getPagingStore() {
return null;
}