You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/02/20 22:45:09 UTC
[activemq-artemis] 01/03: ARTEMIS-2587 auto-create dead-letter
resources
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit b76f3b3a0dd5c684907a817765bd0f075f275d0b
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Fri Feb 14 12:39:19 2020 -0600
ARTEMIS-2587 auto-create dead-letter resources
This is a reimplementation of the IndividualDeadLetterQueueStrategy
from 5.x in a way that makes sense with the Artemis addressing model.
---
.../api/core/management/ActiveMQServerControl.java | 57 ++++++
.../api/core/management/AddressSettingsInfo.java | 31 ++-
.../core/management/AddressSettingsInfoTest.java | 8 +-
.../deployers/impl/FileConfigurationParser.java | 12 ++
.../management/impl/ActiveMQServerControlImpl.java | 116 ++++++++++-
.../artemis/core/server/impl/QueueImpl.java | 14 ++
.../core/settings/impl/AddressSettings.java | 102 +++++++++-
.../resources/schema/artemis-configuration.xsd | 27 ++-
.../core/config/impl/FileConfigurationTest.java | 7 +
.../resources/ConfigurationTest-full-config.xml | 3 +
...rationTest-xinclude-config-address-settings.xml | 3 +
.../src/test/resources/artemis-configuration.xsd | 27 ++-
docs/user-manual/en/address-model.md | 16 ++
docs/user-manual/en/configuration-index.md | 3 +
docs/user-manual/en/undelivered-messages.md | 67 ++++++-
.../management/ActiveMQServerControlTest.java | 29 ++-
.../ActiveMQServerControlUsingCoreTest.java | 108 +++++++++++
.../integration/management/QueueControlTest.java | 55 ++++++
.../server/AutoCreateDeadLetterResourcesTest.java | 216 +++++++++++++++++++++
19 files changed, 883 insertions(+), 18 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 04b267c..dbd8e1a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -1387,6 +1387,63 @@ public interface ActiveMQServerControl {
@Parameter(desc = "factor by which to modify the redelivery delay slightly to avoid collisions", name = "redeliveryCollisionAvoidanceFactor") double redeliveryCollisionAvoidanceFactor,
@Parameter(desc = "the number of messages to preserve for future queues created on the matching address", name = "retroactiveMessageCount") long retroactiveMessageCount) throws Exception;
+ /**
+ * adds a new address setting for a specific address
+ */
+ @Operation(desc = "Add address settings for addresses matching the addressMatch", impact = MBeanOperationInfo.ACTION)
+ void addAddressSettings(@Parameter(desc = "an address match", name = "addressMatch") String addressMatch,
+ @Parameter(desc = "the dead letter address setting", name = "DLA") String DLA,
+ @Parameter(desc = "the expiry address setting", name = "expiryAddress") String expiryAddress,
+ @Parameter(desc = "the expiry delay setting", name = "expiryDelay") long expiryDelay,
+ @Parameter(desc = "are any queues created for this address a last value queue", name = "lastValueQueue") boolean lastValueQueue,
+ @Parameter(desc = "the delivery attempts", name = "deliveryAttempts") int deliveryAttempts,
+ @Parameter(desc = "the max size in bytes", name = "maxSizeBytes") long maxSizeBytes,
+ @Parameter(desc = "the page size in bytes", name = "pageSizeBytes") int pageSizeBytes,
+ @Parameter(desc = "the max number of pages in the soft memory cache", name = "pageMaxCacheSize") int pageMaxCacheSize,
+ @Parameter(desc = "the redelivery delay", name = "redeliveryDelay") long redeliveryDelay,
+ @Parameter(desc = "the redelivery delay multiplier", name = "redeliveryMultiplier") double redeliveryMultiplier,
+ @Parameter(desc = "the maximum redelivery delay", name = "maxRedeliveryDelay") long maxRedeliveryDelay,
+ @Parameter(desc = "the redistribution delay", name = "redistributionDelay") long redistributionDelay,
+ @Parameter(desc = "do we send to the DLA when there is no where to route the message", name = "sendToDLAOnNoRoute") boolean sendToDLAOnNoRoute,
+ @Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy,
+ @Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold,
+ @Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
+ @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
+ @Parameter(desc = "allow jms queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
+ @Parameter(desc = "allow auto-created jms queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
+ @Parameter(desc = "allow jms topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
+ @Parameter(desc = "allow auto-created jms topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics,
+ @Parameter(desc = "allow queues to be created automatically", name = "autoCreateQueues") boolean autoCreateQueues,
+ @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteQueues") boolean autoDeleteQueues,
+ @Parameter(desc = "allow addresses to be created automatically", name = "autoCreateAddresses") boolean autoCreateAddresses,
+ @Parameter(desc = "allow auto-created addresses to be deleted automatically", name = "autoDeleteAddresses") boolean autoDeleteAddresses,
+ @Parameter(desc = "how to deal with queues deleted from XML at runtime", name = "configDeleteQueues") String configDeleteQueues,
+ @Parameter(desc = "how to deal with addresses deleted from XML at runtime", name = "configDeleteAddresses") String configDeleteAddresses,
+ @Parameter(desc = "used with `BLOCK`, the max size an address can reach before messages are rejected; works in combination with `max-size-bytes` for AMQP clients only", name = "maxSizeBytesRejectThreshold") long maxSizeBytesRejectThreshold,
+ @Parameter(desc = "last-value-key value if none is set on the queue", name = "defaultLastValueKey") String defaultLastValueKey,
+ @Parameter(desc = "non-destructive value if none is set on the queue", name = "defaultNonDestructive") boolean defaultNonDestructive,
+ @Parameter(desc = "exclusive value if none is set on the queue", name = "defaultExclusiveQueue") boolean defaultExclusiveQueue,
+ @Parameter(desc = "group-rebalance value if none is set on the queue", name = "defaultGroupRebalance") boolean defaultGroupRebalance,
+ @Parameter(desc = "group-buckets value if none is set on the queue", name = "defaultGroupBuckets") int defaultGroupBuckets,
+ @Parameter(desc = "group-first-key value if none is set on the queue", name = "defaultGroupFirstKey") String defaultGroupFirstKey,
+ @Parameter(desc = "max-consumers value if none is set on the queue", name = "defaultMaxConsumers") int defaultMaxConsumers,
+ @Parameter(desc = "purge-on-no-consumers value if none is set on the queue", name = "defaultPurgeOnNoConsumers") boolean defaultPurgeOnNoConsumers,
+ @Parameter(desc = "consumers-before-dispatch value if none is set on the queue", name = "defaultConsumersBeforeDispatch") int defaultConsumersBeforeDispatch,
+ @Parameter(desc = "delay-before-dispatch value if none is set on the queue", name = "defaultDelayBeforeDispatch") long defaultDelayBeforeDispatch,
+ @Parameter(desc = "routing-type value if none is set on the queue", name = "defaultQueueRoutingType") String defaultQueueRoutingType,
+ @Parameter(desc = "routing-type value if none is set on the address", name = "defaultAddressRoutingType") String defaultAddressRoutingType,
+ @Parameter(desc = "consumer-window-size value if none is set on the queue", name = "defaultConsumerWindowSize") int defaultConsumerWindowSize,
+ @Parameter(desc = "ring-size value if none is set on the queue", name = "defaultRingSize") long defaultRingSize,
+ @Parameter(desc = "allow created queues to be deleted automatically", name = "autoDeleteCreatedQueues") boolean autoDeleteCreatedQueues,
+ @Parameter(desc = "delay for deleting auto-created queues", name = "autoDeleteQueuesDelay") long autoDeleteQueuesDelay,
+ @Parameter(desc = "the message count the queue must be at or below before it can be auto deleted", name = "autoDeleteQueuesMessageCount") long autoDeleteQueuesMessageCount,
+ @Parameter(desc = "delay for deleting auto-created addresses", name = "autoDeleteAddressesDelay") long autoDeleteAddressesDelay,
+ @Parameter(desc = "factor by which to modify the redelivery delay slightly to avoid collisions", name = "redeliveryCollisionAvoidanceFactor") double redeliveryCollisionAvoidanceFactor,
+ @Parameter(desc = "the number of messages to preserve for future queues created on the matching address", name = "retroactiveMessageCount") long retroactiveMessageCount,
+ @Parameter(desc = "allow dead-letter address & queue to be created automatically", name = "autoCreateDeadLetterResources") boolean autoCreateDeadLetterResources,
+ @Parameter(desc = "prefix to use on auto-create dead-letter queue", name = "deadLetterQueuePrefix") String deadLetterQueuePrefix,
+ @Parameter(desc = "suffix to use on auto-create dead-letter queue", name = "deadLetterQueueSuffix") String deadLetterQueueSuffix) throws Exception;
+
@Operation(desc = "Remove address settings", impact = MBeanOperationInfo.ACTION)
void removeAddressSettings(@Parameter(desc = "an address match", name = "addressMatch") String addressMatch) throws Exception;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java
index 52854f2..5a570c2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java
@@ -117,6 +117,12 @@ public final class AddressSettingsInfo {
private final long retroactiveMessageCount;
+ private final boolean autoCreateDeadLetterResources;
+
+ private final String deadLetterQueuePrefix;
+
+ private final String deadLetterQueueSuffix;
+
// Static --------------------------------------------------------
public static AddressSettingsInfo from(final String jsonString) {
@@ -167,7 +173,10 @@ public final class AddressSettingsInfo {
object.getJsonNumber("autoDeleteQueuesMessageCount").longValue(),
object.getJsonNumber("autoDeleteAddressesDelay").longValue(),
object.getJsonNumber("redeliveryCollisionAvoidanceFactor").doubleValue(),
- object.getJsonNumber("retroactiveMessageCount").longValue());
+ object.getJsonNumber("retroactiveMessageCount").longValue(),
+ object.getBoolean("autoCreateDeadLetterResources"),
+ object.getString("deadLetterQueuePrefix"),
+ object.getString("deadLetterQueueSuffix"));
}
// Constructors --------------------------------------------------
@@ -218,7 +227,10 @@ public final class AddressSettingsInfo {
long autoDeleteQueuesMessageCount,
long autoDeleteAddressesDelay,
double redeliveryCollisionAvoidanceFactor,
- long retroactiveMessageCount) {
+ long retroactiveMessageCount,
+ boolean autoCreateDeadLetterResources,
+ String deadLetterQueuePrefix,
+ String deadLetterQueueSuffix) {
this.addressFullMessagePolicy = addressFullMessagePolicy;
this.maxSizeBytes = maxSizeBytes;
this.pageSizeBytes = pageSizeBytes;
@@ -266,6 +278,9 @@ public final class AddressSettingsInfo {
this.autoDeleteAddressesDelay = autoDeleteAddressesDelay;
this.redeliveryCollisionAvoidanceFactor = redeliveryCollisionAvoidanceFactor;
this.retroactiveMessageCount = retroactiveMessageCount;
+ this.autoCreateDeadLetterResources = autoCreateDeadLetterResources;
+ this.deadLetterQueuePrefix = deadLetterQueuePrefix;
+ this.deadLetterQueueSuffix = deadLetterQueueSuffix;
}
// Public --------------------------------------------------------
@@ -465,5 +480,17 @@ public final class AddressSettingsInfo {
public long getRetroactiveMessageCount() {
return retroactiveMessageCount;
}
+
+ public boolean isAutoCreateDeadLetterResources() {
+ return autoCreateDeadLetterResources;
+ }
+
+ public String getDeadLetterQueuePrefix() {
+ return deadLetterQueuePrefix;
+ }
+
+ public String getDeadLetterQueueSuffix() {
+ return deadLetterQueueSuffix;
+ }
}
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfoTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfoTest.java
index 06a1d34..aaa9595 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfoTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfoTest.java
@@ -77,7 +77,10 @@ public class AddressSettingsInfoTest {
"\"autoDeleteQueuesMessageCount\":8,\n" +
"\"autoDeleteAddressesDelay\":3003,\n" +
"\"redeliveryCollisionAvoidanceFactor\":1.1,\n" +
- "\"retroactiveMessageCount\":101\n" +
+ "\"retroactiveMessageCount\":101,\n" +
+ "\"autoCreateDeadLetterResources\":true,\n" +
+ "\"deadLetterQueuePrefix\":\"FOO.\",\n" +
+ "\"deadLetterQueueSuffix\":\".FOO\"\n" +
"}";
AddressSettingsInfo addressSettingsInfo = AddressSettingsInfo.from(json);
assertEquals("fullPolicy", addressSettingsInfo.getAddressFullMessagePolicy());
@@ -127,6 +130,9 @@ public class AddressSettingsInfoTest {
assertEquals(3003, addressSettingsInfo.getAutoDeleteAddressesDelay());
assertEquals(1.1, addressSettingsInfo.getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(101, addressSettingsInfo.getRetroactiveMessageCount());
+ assertTrue(addressSettingsInfo.isAutoCreateDeadLetterResources());
+ assertEquals("FOO.", addressSettingsInfo.getDeadLetterQueuePrefix());
+ assertEquals(".FOO", addressSettingsInfo.getDeadLetterQueueSuffix());
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 0087f44..8305ef6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -161,6 +161,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String DEAD_LETTER_ADDRESS_NODE_NAME = "dead-letter-address";
+ private static final String AUTO_CREATE_DEAD_LETTER_RESOURCES_NODE_NAME = "auto-create-dead-letter-resources";
+
+ private static final String DEAD_LETTER_QUEUE_PREFIX_NODE_NAME = "dead-letter-queue-prefix";
+
+ private static final String DEAD_LETTER_QUEUE_SUFFIX_NODE_NAME = "dead-letter-queue-suffix";
+
private static final String EXPIRY_ADDRESS_NODE_NAME = "expiry-address";
private static final String EXPIRY_DELAY_NODE_NAME = "expiry-delay";
@@ -1185,6 +1191,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
long retroactiveMessageCount = XMLUtil.parseLong(child);
Validators.GE_ZERO.validate(RETROACTIVE_MESSAGE_COUNT, retroactiveMessageCount);
addressSettings.setRetroactiveMessageCount(retroactiveMessageCount);
+ } else if (AUTO_CREATE_DEAD_LETTER_RESOURCES_NODE_NAME.equalsIgnoreCase(name)) {
+ addressSettings.setAutoCreateDeadLetterResources(XMLUtil.parseBoolean(child));
+ } else if (DEAD_LETTER_QUEUE_PREFIX_NODE_NAME.equalsIgnoreCase(name)) {
+ addressSettings.setDeadLetterQueuePrefix(new SimpleString(getTrimmedTextContent(child)));
+ } else if (DEAD_LETTER_QUEUE_SUFFIX_NODE_NAME.equalsIgnoreCase(name)) {
+ addressSettings.setDeadLetterQueueSuffix(new SimpleString(getTrimmedTextContent(child)));
}
}
return setting;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 87641cc..ecc5d25 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -2753,6 +2753,9 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
.add("autoDeleteAddressesDelay", addressSettings.getAutoDeleteAddressesDelay())
.add("redeliveryCollisionAvoidanceFactor", addressSettings.getRedeliveryCollisionAvoidanceFactor())
.add("retroactiveMessageCount", addressSettings.getRetroactiveMessageCount())
+ .add("autoCreateDeadLetterResources", addressSettings.isAutoCreateDeadLetterResources())
+ .add("deadLetterQueuePrefix", addressSettings.getDeadLetterQueuePrefix().toString())
+ .add("deadLetterQueueSuffix", addressSettings.getDeadLetterQueueSuffix().toString())
.build()
.toString();
}
@@ -2936,6 +2939,113 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
final long autoDeleteAddressesDelay,
final double redeliveryCollisionAvoidanceFactor,
final long retroactiveMessageCount) throws Exception {
+ addAddressSettings(address,
+ DLA,
+ expiryAddress,
+ expiryDelay,
+ defaultLastValueQueue,
+ maxDeliveryAttempts,
+ maxSizeBytes,
+ pageSizeBytes,
+ pageMaxCacheSize,
+ redeliveryDelay,
+ redeliveryMultiplier,
+ maxRedeliveryDelay,
+ redistributionDelay,
+ sendToDLAOnNoRoute,
+ addressFullMessagePolicy,
+ slowConsumerThreshold,
+ slowConsumerCheckPeriod,
+ slowConsumerPolicy,
+ autoCreateJmsQueues,
+ autoDeleteJmsQueues,
+ autoCreateJmsTopics,
+ autoDeleteJmsTopics,
+ autoCreateQueues,
+ autoDeleteQueues,
+ autoCreateAddresses,
+ autoDeleteAddresses,
+ configDeleteQueues,
+ configDeleteAddresses,
+ maxSizeBytesRejectThreshold,
+ defaultLastValueKey,
+ defaultNonDestructive,
+ defaultExclusiveQueue,
+ defaultGroupRebalance,
+ defaultGroupBuckets,
+ defaultGroupFirstKey,
+ defaultMaxConsumers,
+ defaultPurgeOnNoConsumers,
+ defaultConsumersBeforeDispatch,
+ defaultDelayBeforeDispatch,
+ defaultQueueRoutingType,
+ defaultAddressRoutingType,
+ defaultConsumerWindowSize,
+ defaultRingSize,
+ autoDeleteCreatedQueues,
+ autoDeleteQueuesDelay,
+ autoDeleteQueuesMessageCount,
+ autoDeleteAddressesDelay,
+ redeliveryCollisionAvoidanceFactor,
+ retroactiveMessageCount,
+ AddressSettings.DEFAULT_AUTO_CREATE_DEAD_LETTER_RESOURCES,
+ AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.toString(),
+ AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX.toString());
+ }
+
+ @Override
+ public void addAddressSettings(final String address,
+ final String DLA,
+ final String expiryAddress,
+ final long expiryDelay,
+ final boolean defaultLastValueQueue,
+ final int maxDeliveryAttempts,
+ final long maxSizeBytes,
+ final int pageSizeBytes,
+ final int pageMaxCacheSize,
+ final long redeliveryDelay,
+ final double redeliveryMultiplier,
+ final long maxRedeliveryDelay,
+ final long redistributionDelay,
+ final boolean sendToDLAOnNoRoute,
+ final String addressFullMessagePolicy,
+ final long slowConsumerThreshold,
+ final long slowConsumerCheckPeriod,
+ final String slowConsumerPolicy,
+ final boolean autoCreateJmsQueues,
+ final boolean autoDeleteJmsQueues,
+ final boolean autoCreateJmsTopics,
+ final boolean autoDeleteJmsTopics,
+ final boolean autoCreateQueues,
+ final boolean autoDeleteQueues,
+ final boolean autoCreateAddresses,
+ final boolean autoDeleteAddresses,
+ final String configDeleteQueues,
+ final String configDeleteAddresses,
+ final long maxSizeBytesRejectThreshold,
+ final String defaultLastValueKey,
+ final boolean defaultNonDestructive,
+ final boolean defaultExclusiveQueue,
+ final boolean defaultGroupRebalance,
+ final int defaultGroupBuckets,
+ final String defaultGroupFirstKey,
+ final int defaultMaxConsumers,
+ final boolean defaultPurgeOnNoConsumers,
+ final int defaultConsumersBeforeDispatch,
+ final long defaultDelayBeforeDispatch,
+ final String defaultQueueRoutingType,
+ final String defaultAddressRoutingType,
+ final int defaultConsumerWindowSize,
+ final long defaultRingSize,
+ final boolean autoDeleteCreatedQueues,
+ final long autoDeleteQueuesDelay,
+ final long autoDeleteQueuesMessageCount,
+ final long autoDeleteAddressesDelay,
+ final double redeliveryCollisionAvoidanceFactor,
+ final long retroactiveMessageCount,
+ final boolean autoCreateDeadLetterResources,
+ final String deadLetterQueuePrefix,
+ final String deadLetterQueueSuffix) throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.addAddressSettings(this.server, address, DLA, expiryAddress, expiryDelay, defaultLastValueQueue, maxDeliveryAttempts,
maxSizeBytes, pageSizeBytes, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier,
@@ -2947,7 +3057,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
defaultGroupFirstKey, defaultMaxConsumers, defaultPurgeOnNoConsumers, defaultConsumersBeforeDispatch,
defaultDelayBeforeDispatch, defaultQueueRoutingType, defaultAddressRoutingType, defaultConsumerWindowSize,
defaultRingSize, autoDeleteCreatedQueues, autoDeleteQueuesDelay, autoDeleteQueuesMessageCount,
- autoDeleteAddressesDelay, redeliveryCollisionAvoidanceFactor, retroactiveMessageCount);
+ autoDeleteAddressesDelay, redeliveryCollisionAvoidanceFactor, retroactiveMessageCount, autoCreateDeadLetterResources,
+ deadLetterQueuePrefix, deadLetterQueueSuffix);
}
checkStarted();
@@ -3009,6 +3120,9 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
addressSettings.setAutoDeleteAddressesDelay(autoDeleteAddressesDelay);
addressSettings.setRedeliveryCollisionAvoidanceFactor(redeliveryCollisionAvoidanceFactor);
addressSettings.setRetroactiveMessageCount(retroactiveMessageCount);
+ addressSettings.setAutoCreateDeadLetterResources(autoCreateDeadLetterResources);
+ addressSettings.setDeadLetterQueuePrefix(deadLetterQueuePrefix == null ? null : new SimpleString(deadLetterQueuePrefix));
+ addressSettings.setDeadLetterQueueSuffix(deadLetterQueueSuffix == null ? null : new SimpleString(deadLetterQueueSuffix));
server.getAddressSettingsRepository().addMatch(address, addressSettings);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 5aea38c..13bcd4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3340,6 +3340,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final MessageReference ref,
final SimpleString deadLetterAddress) throws Exception {
if (deadLetterAddress != null) {
+
+ createDeadLetterResources();
+
Bindings bindingList = postOffice.lookupBindingsForAddress(deadLetterAddress);
if (bindingList == null || bindingList.getBindings().isEmpty()) {
@@ -3359,6 +3362,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return false;
}
+ private void createDeadLetterResources() throws Exception {
+ AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString());
+ if (addressSettings.isAutoCreateDeadLetterResources() && !getAddress().equals(addressSettings.getDeadLetterAddress())) {
+ if (addressSettings.getDeadLetterAddress() != null && addressSettings.getDeadLetterAddress().length() != 0) {
+ SimpleString dlqName = addressSettings.getDeadLetterQueuePrefix().concat(getAddress()).concat(addressSettings.getDeadLetterQueueSuffix());
+ SimpleString dlqFilter = new SimpleString(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress()));
+ server.createQueue(addressSettings.getDeadLetterAddress(), RoutingType.MULTICAST, dlqName, dlqFilter, null, true, false, true, false, true, addressSettings.getDefaultMaxConsumers(), addressSettings.isDefaultPurgeOnNoConsumers(), addressSettings.isDefaultExclusiveQueue(), addressSettings.isDefaultLastValueQueue(), true);
+ }
+ }
+ }
+
private void move(final Transaction originalTX,
final SimpleString address,
final Binding binding,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 4ded874..b152a39 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -109,6 +109,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
// Default address drop threshold, applied to address settings with BLOCK policy. -1 means no threshold enabled.
public static final long DEFAULT_ADDRESS_REJECT_THRESHOLD = -1;
+ public static final boolean DEFAULT_AUTO_CREATE_DEAD_LETTER_RESOURCES = false;
+
+ public static final SimpleString DEFAULT_DEAD_LETTER_QUEUE_PREFIX = SimpleString.toSimpleString("DLQ.");
+
+ public static final SimpleString DEFAULT_DEAD_LETTER_QUEUE_SUFFIX = SimpleString.toSimpleString("");
+
private AddressFullMessagePolicy addressFullMessagePolicy = null;
private Long maxSizeBytes = null;
@@ -215,6 +221,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Integer defaultConsumerWindowSize = null;
+ private Boolean autoCreateDeadLetterResources = null;
+
+ private SimpleString deadLetterQueuePrefix = null;
+
+ private SimpleString deadLetterQueueSuffix = null;
+
//from amq5
//make it transient
private transient Integer queuePrefetch = null;
@@ -232,6 +244,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.redeliveryCollisionAvoidanceFactor = other.redeliveryCollisionAvoidanceFactor;
this.maxRedeliveryDelay = other.maxRedeliveryDelay;
this.deadLetterAddress = other.deadLetterAddress;
+ this.autoCreateDeadLetterResources = other.autoCreateDeadLetterResources;
+ this.deadLetterQueuePrefix = other.deadLetterQueuePrefix;
+ this.deadLetterQueueSuffix = other.deadLetterQueueSuffix;
this.expiryAddress = other.expiryAddress;
this.expiryDelay = other.expiryDelay;
this.defaultLastValueQueue = other.defaultLastValueQueue;
@@ -629,6 +644,33 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
+ public boolean isAutoCreateDeadLetterResources() {
+ return autoCreateDeadLetterResources != null ? autoCreateDeadLetterResources : AddressSettings.DEFAULT_AUTO_CREATE_DEAD_LETTER_RESOURCES;
+ }
+
+ public AddressSettings setAutoCreateDeadLetterResources(final boolean value) {
+ autoCreateDeadLetterResources = value;
+ return this;
+ }
+
+ public SimpleString getDeadLetterQueuePrefix() {
+ return deadLetterQueuePrefix != null ? deadLetterQueuePrefix : AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX;
+ }
+
+ public AddressSettings setDeadLetterQueuePrefix(final SimpleString value) {
+ deadLetterQueuePrefix = value;
+ return this;
+ }
+
+ public SimpleString getDeadLetterQueueSuffix() {
+ return deadLetterQueueSuffix != null ? deadLetterQueueSuffix : AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX;
+ }
+
+ public AddressSettings setDeadLetterQueueSuffix(final SimpleString value) {
+ deadLetterQueueSuffix = value;
+ return this;
+ }
+
public long getRedistributionDelay() {
return redistributionDelay != null ? redistributionDelay : AddressSettings.DEFAULT_REDISTRIBUTION_DELAY;
}
@@ -933,6 +975,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (retroactiveMessageCount == null) {
retroactiveMessageCount = merged.retroactiveMessageCount;
}
+ if (autoCreateDeadLetterResources == null) {
+ autoCreateDeadLetterResources = merged.autoCreateDeadLetterResources;
+ }
+ if (deadLetterQueuePrefix == null) {
+ deadLetterQueuePrefix = merged.deadLetterQueuePrefix;
+ }
+ if (deadLetterQueueSuffix == null) {
+ deadLetterQueueSuffix = merged.deadLetterQueueSuffix;
+ }
}
@Override
@@ -1105,6 +1156,18 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) {
retroactiveMessageCount = BufferHelper.readNullableLong(buffer);
}
+
+ if (buffer.readableBytes() > 0) {
+ autoCreateDeadLetterResources = BufferHelper.readNullableBoolean(buffer);
+ }
+
+ if (buffer.readableBytes() > 0) {
+ deadLetterQueuePrefix = buffer.readNullableSimpleString();
+ }
+
+ if (buffer.readableBytes() > 0) {
+ deadLetterQueueSuffix = buffer.readNullableSimpleString();
+ }
}
@Override
@@ -1158,7 +1221,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableLong(autoDeleteQueuesMessageCount) +
BufferHelper.sizeOfNullableBoolean(autoDeleteCreatedQueues) +
BufferHelper.sizeOfNullableLong(defaultRingSize) +
- BufferHelper.sizeOfNullableLong(retroactiveMessageCount);
+ BufferHelper.sizeOfNullableLong(retroactiveMessageCount) +
+ BufferHelper.sizeOfNullableBoolean(autoCreateDeadLetterResources) +
+ SimpleString.sizeofNullableString(deadLetterQueuePrefix) +
+ SimpleString.sizeofNullableString(deadLetterQueueSuffix);
}
@Override
@@ -1264,6 +1330,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
buffer.writeNullableSimpleString(defaultGroupFirstKey);
BufferHelper.writeNullableLong(buffer, retroactiveMessageCount);
+
+ BufferHelper.writeNullableBoolean(buffer, autoCreateDeadLetterResources);
+
+ buffer.writeNullableSimpleString(deadLetterQueuePrefix);
+
+ buffer.writeNullableSimpleString(deadLetterQueueSuffix);
}
/* (non-Javadoc)
@@ -1325,6 +1397,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((defaultGroupFirstKey == null) ? 0 : defaultGroupFirstKey.hashCode());
result = prime * result + ((defaultRingSize == null) ? 0 : defaultRingSize.hashCode());
result = prime * result + ((retroactiveMessageCount == null) ? 0 : retroactiveMessageCount.hashCode());
+ result = prime * result + ((autoCreateDeadLetterResources == null) ? 0 : autoCreateDeadLetterResources.hashCode());
+ result = prime * result + ((deadLetterQueuePrefix == null) ? 0 : deadLetterQueuePrefix.hashCode());
+ result = prime * result + ((deadLetterQueueSuffix == null) ? 0 : deadLetterQueueSuffix.hashCode());
return result;
}
@@ -1613,6 +1688,25 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return false;
} else if (!retroactiveMessageCount.equals(other.retroactiveMessageCount))
return false;
+
+ if (autoCreateDeadLetterResources == null) {
+ if (other.autoCreateDeadLetterResources != null)
+ return false;
+ } else if (!autoCreateDeadLetterResources.equals(other.autoCreateDeadLetterResources))
+ return false;
+
+ if (deadLetterQueuePrefix == null) {
+ if (other.deadLetterQueuePrefix != null)
+ return false;
+ } else if (!deadLetterQueuePrefix.equals(other.deadLetterQueuePrefix))
+ return false;
+
+ if (deadLetterQueueSuffix == null) {
+ if (other.deadLetterQueueSuffix != null)
+ return false;
+ } else if (!deadLetterQueueSuffix.equals(other.deadLetterQueueSuffix))
+ return false;
+
return true;
}
@@ -1722,6 +1816,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
defaultRingSize +
", retroactiveMessageCount=" +
retroactiveMessageCount +
+ ", autoCreateDeadLetterResources=" +
+ autoCreateDeadLetterResources +
+ ", deadLetterQueuePrefix=" +
+ deadLetterQueuePrefix +
+ ", deadLetterQueueSuffix=" +
+ deadLetterQueueSuffix +
"]";
}
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 1177f4b..867e0dd 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3061,7 +3061,7 @@
</xsd:documentation>
</xsd:annotation>
<xsd:all>
- <xsd:element maxOccurs="1" minOccurs="0" name="dead-letter-address" type="xsd:string">
+ <xsd:element name="dead-letter-address" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the address to send dead messages to
@@ -3069,6 +3069,31 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="auto-create-dead-letter-resources" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ whether or not to automatically create the dead-letter-address and/or a corresponding queue
+ on that address when a message found to be undeliverable
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="dead-letter-queue-prefix" type="xsd:string" default="DLQ." maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ the prefix to use for auto-created dead letter queues
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="dead-letter-queue-suffix" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ the suffix to use for auto-created dead letter queues
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="expiry-address" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 2bac659..d90c0fb 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -58,6 +58,7 @@ import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
@@ -341,6 +342,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertTrue(conf.getAddressesSettings().get("a2") != null);
assertEquals("a1.1", conf.getAddressesSettings().get("a1").getDeadLetterAddress().toString());
+ assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_DEAD_LETTER_RESOURCES, conf.getAddressesSettings().get("a1").isAutoCreateDeadLetterResources());
+ assertEquals(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX, conf.getAddressesSettings().get("a1").getDeadLetterQueuePrefix());
+ assertEquals(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX, conf.getAddressesSettings().get("a1").getDeadLetterQueueSuffix());
assertEquals("a1.2", conf.getAddressesSettings().get("a1").getExpiryAddress().toString());
assertEquals(1, conf.getAddressesSettings().get("a1").getRedeliveryDelay());
assertEquals(0.5, conf.getAddressesSettings().get("a1").getRedeliveryCollisionAvoidanceFactor(), 0);
@@ -365,6 +369,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount());
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
+ assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources());
+ assertEquals("", conf.getAddressesSettings().get("a2").getDeadLetterQueuePrefix().toString());
+ assertEquals(".DLQ", conf.getAddressesSettings().get("a2").getDeadLetterQueueSuffix().toString());
assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
assertEquals(5, conf.getAddressesSettings().get("a2").getRedeliveryDelay());
assertEquals(0.0, conf.getAddressesSettings().get("a2").getRedeliveryCollisionAvoidanceFactor(), 0);
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 1fd0f7c..8ecebc1 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -438,6 +438,9 @@
</address-setting>
<address-setting match="a2">
<dead-letter-address>a2.1</dead-letter-address>
+ <auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
+ <dead-letter-queue-prefix></dead-letter-queue-prefix>
+ <dead-letter-queue-suffix>.DLQ</dead-letter-queue-suffix>
<expiry-address>a2.2</expiry-address>
<redelivery-delay>5</redelivery-delay>
<max-size-bytes>932489234928324</max-size-bytes>
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
index 8b5e9a2..5ff60da 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
@@ -43,6 +43,9 @@
</address-setting>
<address-setting match="a2">
<dead-letter-address>a2.1</dead-letter-address>
+ <auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
+ <dead-letter-queue-prefix></dead-letter-queue-prefix>
+ <dead-letter-queue-suffix>.DLQ</dead-letter-queue-suffix>
<expiry-address>a2.2</expiry-address>
<redelivery-delay>5</redelivery-delay>
<max-size-bytes>932489234928324</max-size-bytes>
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index b06c527..dce2fbc 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -2917,7 +2917,7 @@
</xsd:documentation>
</xsd:annotation>
<xsd:all>
- <xsd:element maxOccurs="1" minOccurs="0" name="dead-letter-address" type="xsd:string">
+ <xsd:element name="dead-letter-address" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the address to send dead messages to
@@ -2925,6 +2925,31 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="auto-create-dead-letter-resources" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ whether or not to automatically create the dead-letter-address and/or a corresponding queue
+ on that address when any matching queue is created
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="dead-letter-queue-prefix" type="xsd:string" default="DLQ." maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ the prefix to use for auto-created dead letter queues
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="dead-letter-queue-suffix" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ the suffix to use for auto-created dead letter queues
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="expiry-address" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md
index 0a6b50a..33f4d1f 100644
--- a/docs/user-manual/en/address-model.md
+++ b/docs/user-manual/en/address-model.md
@@ -569,6 +569,9 @@ that would be found in the `broker.xml` file.
<address-settings>
<address-setting match="order.foo">
<dead-letter-address>DLA</dead-letter-address>
+ <auto-create-dead-letter-resources>false</auto-create-dead-letter-resources>
+ <dead-letter-queue-prefix>DLQ.</dead-letter-queue-prefix>
+ <dead-letter-queue-suffix></dead-letter-queue-suffix>
<expiry-address>ExpiryQueue</expiry-address>
<expiry-delay>123</expiry-delay>
<redelivery-delay>5000</redelivery-delay>
@@ -636,6 +639,19 @@ exceed `max-delivery-attempts`. If no address is defined here then such
messages will simply be discarded. Read more about [undelivered
messages](undelivered-messages.md#configuring-dead-letter-addresses).
+`auto-create-dead-letter-resources` determines whether or not the broker will
+automatically create the defined `dead-letter-address` and a corresponding
+dead-letter queue when a message is undeliverable. Read more in the chapter
+about [undelivered messages](undelivered-messages.md).
+
+`dead-letter-queue-prefix` defines the prefix used for automatically created
+dead-letter queues. Read more in the chapter about
+[undelivered messages](undelivered-messages.md).
+
+`dead-letter-queue-suffix` defines the suffix used for automatically created
+dead-letter queues. Read more in the chapter about
+[undelivered messages](undelivered-messages.md).
+
`expiry-address` defines where to send a message that has expired. If no
address is defined here then such messages will simply be discarded. Read more
about [message expiry](message-expiry.md#configuring-expiry-addresses).
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index 389d619..2186bf2 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -204,6 +204,9 @@ Name | Description | Default
---|---|---
[match](address-model.md) | The filter to apply to the setting | n/a
[dead-letter-address](undelivered-messages.md) | Dead letter address | n/a
+[auto-create-dead-letter-resources](undelivered-messages.md) | Whether or not to auto-create dead-letter address and/or queue | `false`
+[dead-letter-queue-prefix](undelivered-messages.md) | Prefix to use for auto-created dead-letter queues | `DLQ.`
+[dead-letter-queue-suffix](undelivered-messages.md) | Suffix to use for auto-created dead-letter queues | `` (empty)
[expiry-address](message-expiry.md) | Expired messages address | n/a
[expiry-delay](message-expiry.md) | Expiration time override; -1 don't override | -1
[redelivery-delay](undelivered-messages.md) | Time to wait before redelivering a message | 0
diff --git a/docs/user-manual/en/undelivered-messages.md b/docs/user-manual/en/undelivered-messages.md
index a6ca2e2..97f85f7 100644
--- a/docs/user-manual/en/undelivered-messages.md
+++ b/docs/user-manual/en/undelivered-messages.md
@@ -100,11 +100,11 @@ must be between 0.0 and 1.0.
`java.util.Random`)
1. Delivery Attempt 1. (Unsuccessful)
-2. Wait Delay Period: 875 // 1000 + (1000 * ((0.5 * __-1__) * __.25__)
+2. Wait Delay Period: 875 // 1000 + (1000 * ((0.5 \* __-1__) * __.25__)
3. Delivery Attempt 2. (Unsuccessful)
-4. Wait Delay Period: 1375 // 1000 + (1000 * ((0.5 * __1__) * __.75__)
+4. Wait Delay Period: 1375 // 1000 + (1000 * ((0.5 \* __1__) * __.75__)
5. Delivery Attempt 3: (Unsuccessful)
-6. Wait Delay Period: 975 // 1000 + (1000 * ((0.5 * __-1__) * __.05__)
+6. Wait Delay Period: 975 // 1000 + (1000 * ((0.5 \* __-1__) * __.05__)
This feature can be particularly useful in environments where there are
multiple consumers on the same queue all interacting transactionally
@@ -117,8 +117,8 @@ small, configurable amount these redelivery "collisions" can be avoided.
### Example
-See [the examples chapter](examples.md) for an example which shows how delayed redelivery is configured
-and used with JMS.
+See [the examples chapter](examples.md) for an example which shows how
+delayed redelivery is configured and used with JMS.
## Dead Letter Addresses
@@ -145,7 +145,7 @@ Dead letter address is defined in the address-setting configuration:
<!-- undelivered messages in exampleQueue will be sent to the dead letter address
deadLetterQueue after 3 unsuccessful delivery attempts -->
<address-setting match="exampleQueue">
- <dead-letter-address>deadLetterQueue</dead-letter-address>
+ <dead-letter-address>deadLetterAddress</dead-letter-address>
<max-delivery-attempts>3</max-delivery-attempts>
</address-setting>
```
@@ -178,10 +178,63 @@ the following properties:
a String property containing the *original queue* of the dead letter
message
+### Automatically Creating Dead Letter Resources
+
+It's common to segregate undelivered messages by their original address.
+For example, a message sent to the `stocks` address that couldn't be
+delivered for some reason might be ultimately routed to the `DLQ.stocks`
+queue, and likewise a message sent to the `orders` address that couldn't
+be delivered might be routed to the `DLQ.orders` queue.
+
+Using this pattern can make it easy to track and administrate
+undelivered messages. However, it can pose a challenge in environments
+which predominantly use auto-created addresses and queues. Typically
+administrators in those environments don't want to manually create
+an `address-setting` to configure the `dead-letter-address` much less
+the actual `address` and `queue` to hold the undelivered messages.
+
+The solution to this problem is to set the `auto-create-dead-letter-resources`
+`address-setting` to `true` (it's `false` by default) so that the
+broker will create the `address` and `queue` to deal with the
+undelivered messages automatically. The `address` created will be the
+one defined by the `dead-letter-address`. A `MULTICAST` `queue` will be
+created on that `address`. It will be named by the `address` to which
+the message was originally sent, and it will have a filter defined using
+the aforementioned `_AMQ_ORIG_ADDRESS` property so that it will only
+receive messages sent to the relevant `address`. The `queue` name can be
+configured with a prefix and suffix. See the relevant settings in the
+table below:
+
+`address-setting`|default
+---|---
+`dead-letter-queue-prefix`|`DLQ.`
+`dead-letter-queue-suffix`|`` (empty string)
+
+Here is an example configuration:
+
+```xml
+<address-setting match="#">
+ <dead-letter-address>DLA</dead-letter-address>
+ <max-delivery-attempts>3</max-delivery-attempts>
+ <auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
+ <dead-letter-queue-prefix></dead-letter-queue-prefix> <!-- override the default -->
+ <dead-letter-queue-suffix>.DLQ</dead-letter-queue-suffix>
+</address-setting>
+```
+
+The queue holding the undeliverable messages can be accessed directly
+either by using the queue's name by itself (e.g. when using the core
+client) or by using the fully qualified queue name (e.g. when using
+a JMS client) just like any other queue. Also, note that the queue is
+auto-created which means it will be auto-deleted as per the relevant
+`address-settings`.
+
+
### Example
See: Dead Letter section of the [Examples](examples.md) for an example
-that shows how dead letter is configured and used with JMS.
+that shows how dead letter resources can be statically configured and
+used with JMS.
## Delivery Count Persistence
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index 53756ef..74c94e8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -754,6 +754,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
long autoDeleteAddressesDelay = RandomUtil.randomPositiveLong();
double redeliveryCollisionAvoidanceFactor = RandomUtil.randomDouble();
long retroactiveMessageCount = RandomUtil.randomPositiveLong();
+ boolean autoCreateDeadLetterResources = RandomUtil.randomBoolean();
+ String deadLetterQueuePrefix = RandomUtil.randomString();
+ String deadLetterQueueSuffix = RandomUtil.randomString();
serverControl.addAddressSettings(addressMatch,
DLA,
@@ -803,7 +806,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor,
- retroactiveMessageCount);
+ retroactiveMessageCount,
+ autoCreateDeadLetterResources,
+ deadLetterQueuePrefix,
+ deadLetterQueueSuffix);
boolean ex = false;
try {
@@ -855,7 +861,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor,
- retroactiveMessageCount);
+ retroactiveMessageCount,
+ autoCreateDeadLetterResources,
+ deadLetterQueuePrefix,
+ deadLetterQueueSuffix);
} catch (Exception expected) {
ex = true;
}
@@ -914,6 +923,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
assertEquals(autoDeleteAddressesDelay, info.getAutoDeleteAddressesDelay());
assertEquals(redeliveryCollisionAvoidanceFactor, info.getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(retroactiveMessageCount, info.getRetroactiveMessageCount());
+ assertEquals(autoCreateDeadLetterResources, info.isAutoCreateDeadLetterResources());
+ assertEquals(deadLetterQueuePrefix, info.getDeadLetterQueuePrefix());
+ assertEquals(deadLetterQueueSuffix, info.getDeadLetterQueueSuffix());
serverControl.addAddressSettings(addressMatch,
DLA,
@@ -963,7 +975,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor,
- retroactiveMessageCount);
+ retroactiveMessageCount,
+ autoCreateDeadLetterResources,
+ deadLetterQueuePrefix,
+ deadLetterQueueSuffix);
jsonString = serverControl.getAddressSettingsAsJSON(exactAddress);
info = AddressSettingsInfo.from(jsonString);
@@ -1015,6 +1030,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
assertEquals(autoDeleteAddressesDelay, info.getAutoDeleteAddressesDelay());
assertEquals(redeliveryCollisionAvoidanceFactor, info.getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(retroactiveMessageCount, info.getRetroactiveMessageCount());
+ assertEquals(autoCreateDeadLetterResources, info.isAutoCreateDeadLetterResources());
+ assertEquals(deadLetterQueuePrefix, info.getDeadLetterQueuePrefix());
+ assertEquals(deadLetterQueueSuffix, info.getDeadLetterQueueSuffix());
ex = false;
try {
@@ -1066,7 +1084,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor,
- retroactiveMessageCount);
+ retroactiveMessageCount,
+ autoCreateDeadLetterResources,
+ deadLetterQueuePrefix,
+ deadLetterQueueSuffix);
} catch (Exception e) {
ex = true;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index d340264..e18acf6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -1009,6 +1009,114 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
+ public void addAddressSettings(@Parameter(desc = "an address match", name = "addressMatch") String addressMatch,
+ @Parameter(desc = "the dead letter address setting", name = "DLA") String DLA,
+ @Parameter(desc = "the expiry address setting", name = "expiryAddress") String expiryAddress,
+ @Parameter(desc = "the expiry delay setting", name = "expiryDelay") long expiryDelay,
+ @Parameter(desc = "are any queues created for this address a last value queue", name = "lastValueQueue") boolean lastValueQueue,
+ @Parameter(desc = "the delivery attempts", name = "deliveryAttempts") int deliveryAttempts,
+ @Parameter(desc = "the max size in bytes", name = "maxSizeBytes") long maxSizeBytes,
+ @Parameter(desc = "the page size in bytes", name = "pageSizeBytes") int pageSizeBytes,
+ @Parameter(desc = "the max number of pages in the soft memory cache", name = "pageMaxCacheSize") int pageMaxCacheSize,
+ @Parameter(desc = "the redelivery delay", name = "redeliveryDelay") long redeliveryDelay,
+ @Parameter(desc = "the redelivery delay multiplier", name = "redeliveryMultiplier") double redeliveryMultiplier,
+ @Parameter(desc = "the maximum redelivery delay", name = "maxRedeliveryDelay") long maxRedeliveryDelay,
+ @Parameter(desc = "the redistribution delay", name = "redistributionDelay") long redistributionDelay,
+ @Parameter(desc = "do we send to the DLA when there is no where to route the message", name = "sendToDLAOnNoRoute") boolean sendToDLAOnNoRoute,
+ @Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy,
+ @Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold,
+ @Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
+ @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
+ @Parameter(desc = "allow jms queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
+ @Parameter(desc = "allow auto-created jms queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
+ @Parameter(desc = "allow jms topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
+ @Parameter(desc = "allow auto-created jms topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics,
+ @Parameter(desc = "allow queues to be created automatically", name = "autoCreateQueues") boolean autoCreateQueues,
+ @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteQueues") boolean autoDeleteQueues,
+ @Parameter(desc = "allow topics to be created automatically", name = "autoCreateAddresses") boolean autoCreateAddresses,
+ @Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteAddresses") boolean autoDeleteAddresses,
+ @Parameter(desc = "how to deal with queues deleted from XML at runtime", name = "configDeleteQueues") String configDeleteQueues,
+ @Parameter(desc = "how to deal with addresses deleted from XML at runtime", name = "configDeleteAddresses") String configDeleteAddresses,
+ @Parameter(desc = "used with `BLOCK`, the max size an address can reach before messages are rejected; works in combination with `max-size-bytes` for AMQP clients only", name = "maxSizeBytesRejectThreshold") long maxSizeBytesRejectThreshold,
+ @Parameter(desc = "last-value-key value if none is set on the queue", name = "defaultLastValueKey") String defaultLastValueKey,
+ @Parameter(desc = "non-destructive value if none is set on the queue", name = "defaultNonDestructive") boolean defaultNonDestructive,
+ @Parameter(desc = "exclusive value if none is set on the queue", name = "defaultExclusiveQueue") boolean defaultExclusiveQueue,
+ @Parameter(desc = "group-rebalance value if none is set on the queue", name = "defaultGroupRebalance") boolean defaultGroupRebalance,
+ @Parameter(desc = "group-buckets value if none is set on the queue", name = "defaultGroupBuckets") int defaultGroupBuckets,
+ @Parameter(desc = "group-first-key value if none is set on the queue", name = "defaultGroupFirstKey") String defaultGroupFirstKey,
+ @Parameter(desc = "max-consumers value if none is set on the queue", name = "defaultMaxConsumers") int defaultMaxConsumers,
+ @Parameter(desc = "purge-on-no-consumers value if none is set on the queue", name = "defaultPurgeOnNoConsumers") boolean defaultPurgeOnNoConsumers,
+ @Parameter(desc = "consumers-before-dispatch value if none is set on the queue", name = "defaultConsumersBeforeDispatch") int defaultConsumersBeforeDispatch,
+ @Parameter(desc = "delay-before-dispatch value if none is set on the queue", name = "defaultDelayBeforeDispatch") long defaultDelayBeforeDispatch,
+ @Parameter(desc = "routing-type value if none is set on the queue", name = "defaultQueueRoutingType") String defaultQueueRoutingType,
+ @Parameter(desc = "routing-type value if none is set on the address", name = "defaultAddressRoutingType") String defaultAddressRoutingType,
+ @Parameter(desc = "consumer-window-size value if none is set on the queue", name = "defaultConsumerWindowSize") int defaultConsumerWindowSize,
+ @Parameter(desc = "ring-size value if none is set on the queue", name = "defaultRingSize") long defaultRingSize,
+ @Parameter(desc = "allow created queues to be deleted automatically", name = "autoDeleteCreatedQueues") boolean autoDeleteCreatedQueues,
+ @Parameter(desc = "delay for deleting auto-created queues", name = "autoDeleteQueuesDelay") long autoDeleteQueuesDelay,
+ @Parameter(desc = "the message count the queue must be at or below before it can be auto deleted", name = "autoDeleteQueuesMessageCount") long autoDeleteQueuesMessageCount,
+ @Parameter(desc = "delay for deleting auto-created addresses", name = "autoDeleteAddressesDelay") long autoDeleteAddressesDelay,
+ @Parameter(desc = "factor by which to modify the redelivery delay slightly to avoid collisions", name = "redeliveryCollisionAvoidanceFactor") double redeliveryCollisionAvoidanceFactor,
+ @Parameter(desc = "the number of messages to preserve for future queues created on the matching address", name = "retroactiveMessageCount") long retroactiveMessageCount,
+ @Parameter(desc = "allow dead-letter address & queue to be created automatically", name = "autoCreateDeadLetterResources") boolean autoCreateDeadLetterResources,
+ @Parameter(desc = "prefix to use on auto-create dead-letter queue", name = "deadLetterQueuePrefix") String deadLetterQueuePrefix,
+ @Parameter(desc = "suffix to use on auto-create dead-letter queue", name = "deadLetterQueueSuffix") String deadLetterQueueSuffix) throws Exception {
+ proxy.invokeOperation("addAddressSettings",
+ addressMatch,
+ DLA,
+ expiryAddress,
+ expiryDelay,
+ lastValueQueue,
+ deliveryAttempts,
+ maxSizeBytes,
+ pageSizeBytes,
+ pageMaxCacheSize,
+ redeliveryDelay,
+ redeliveryMultiplier,
+ maxRedeliveryDelay,
+ redistributionDelay,
+ sendToDLAOnNoRoute,
+ addressFullMessagePolicy,
+ slowConsumerThreshold,
+ slowConsumerCheckPeriod,
+ slowConsumerPolicy,
+ autoCreateJmsQueues,
+ autoDeleteJmsQueues,
+ autoCreateJmsTopics,
+ autoDeleteJmsTopics,
+ autoCreateQueues,
+ autoDeleteQueues,
+ autoCreateAddresses,
+ autoDeleteAddresses,
+ configDeleteQueues,
+ configDeleteAddresses,
+ maxSizeBytesRejectThreshold,
+ defaultLastValueKey,
+ defaultNonDestructive,
+ defaultExclusiveQueue,
+ defaultGroupRebalance,
+ defaultGroupBuckets,
+ defaultGroupFirstKey,
+ defaultMaxConsumers,
+ defaultPurgeOnNoConsumers,
+ defaultConsumersBeforeDispatch,
+ defaultDelayBeforeDispatch,
+ defaultQueueRoutingType,
+ defaultAddressRoutingType,
+ defaultConsumerWindowSize,
+ defaultRingSize,
+ autoDeleteCreatedQueues,
+ autoDeleteQueuesDelay,
+ autoDeleteQueuesMessageCount,
+ autoDeleteAddressesDelay,
+ redeliveryCollisionAvoidanceFactor,
+ retroactiveMessageCount,
+ autoCreateDeadLetterResources,
+ deadLetterQueuePrefix,
+ deadLetterQueueSuffix);
+ }
+
+ @Override
public String listNetworkTopology() throws Exception {
return (String) proxy.invokeOperation("listNetworkTopology");
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index d8c4746..49bb4bb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -1073,6 +1073,61 @@ public class QueueControlTest extends ManagementTestBase {
clientConsumer.close();
}
+ /**
+ * Test retry - get a message from auto-created DLA/DLQ and put on original queue.
+ */
+ @Test
+ public void testRetryMessageWithAutoCreatedResources() throws Exception {
+ final SimpleString dla = new SimpleString("DLA");
+ final SimpleString qName = new SimpleString("q1");
+ final SimpleString adName = new SimpleString("ad1");
+ final String sampleText = "Put me on DLQ";
+
+ AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(adName.toString());
+ final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(adName).concat(addressSettings.getDeadLetterQueueSuffix());
+
+ server.getAddressSettingsRepository().addMatch(adName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
+
+ session.createQueue(adName, RoutingType.MULTICAST, qName, null, durable);
+
+ // Send message to queue.
+ ClientProducer producer = session.createProducer(adName);
+ producer.send(createTextMessage(session, sampleText));
+ session.start();
+
+ ClientConsumer clientConsumer = session.createConsumer(qName);
+ ClientMessage clientMessage = clientConsumer.receive(500);
+ clientMessage.acknowledge();
+ Assert.assertNotNull(clientMessage);
+
+ Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
+
+ // force a rollback to DLQ
+ session.rollback();
+ clientMessage = clientConsumer.receiveImmediate();
+ Assert.assertNull(clientMessage);
+
+ QueueControl queueControl = createManagementControl(dla, dlq);
+ assertMessageMetrics(queueControl, 1, true);
+ final long messageID = getFirstMessageId(queueControl);
+
+ // Retry the message - i.e. it should go from DLQ to original Queue.
+ Assert.assertTrue(queueControl.retryMessage(messageID));
+
+ // Assert DLQ is empty...
+ Assert.assertEquals(0, getMessageCount(queueControl));
+ assertMessageMetrics(queueControl, 0, durable);
+
+ // .. and that the message is now on the original queue once more.
+ clientMessage = clientConsumer.receive(500);
+ clientMessage.acknowledge();
+ Assert.assertNotNull(clientMessage);
+
+ Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
+
+ clientConsumer.close();
+ }
+
@Test
public void testRetryMessageWithoutDLQ() throws Exception {
final SimpleString qName = new SimpleString("q1");
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java
new file mode 100644
index 0000000..d4009d4
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.server;
+
+import javax.jms.JMSContext;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.CompositeAddress;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AutoCreateDeadLetterResourcesTest extends ActiveMQTestBase {
+ public final SimpleString addressA = new SimpleString("addressA");
+ public final SimpleString queueA = new SimpleString("queueA");
+ public final SimpleString dla = new SimpleString("myDLA");
+
+ private ActiveMQServer server;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(false);
+
+ // set common address settings needed for all tests; make sure to use getMatch instead of addMatch in invidual tests or these will be overwritten
+ server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateDeadLetterResources(true).setDeadLetterAddress(dla).setMaxDeliveryAttempts(1));
+
+ server.start();
+ }
+
+ @Test
+ public void testAutoCreationOfDeadLetterResources() throws Exception {
+ int before = server.getActiveMQServerControl().getQueueNames().length;
+ triggerDlaDelivery();
+ assertNotNull(server.getAddressInfo(dla));
+ assertNotNull(server.locateQueue(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX)));
+ assertEquals(2, server.getActiveMQServerControl().getQueueNames().length - before);
+ }
+
+ @Test
+ public void testAutoCreationOfDeadLetterResourcesWithNullDLA() throws Exception {
+ testAutoCreationOfDeadLetterResourcesWithNoDLA(null);
+ }
+
+ @Test
+ public void testAutoCreationOfDeadLetterResourcesWithEmptyDLA() throws Exception {
+ testAutoCreationOfDeadLetterResourcesWithNoDLA(SimpleString.toSimpleString(""));
+ }
+
+ private void testAutoCreationOfDeadLetterResourcesWithNoDLA(SimpleString dla) throws Exception {
+ server.getAddressSettingsRepository().getMatch("#").setDeadLetterAddress(dla);
+ int before = server.getActiveMQServerControl().getQueueNames().length;
+ triggerDlaDelivery();
+ if (dla != null) {
+ assertNull(server.getAddressInfo(dla));
+ }
+ assertNull(server.locateQueue(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX)));
+ assertEquals(1, server.getActiveMQServerControl().getQueueNames().length - before);
+ }
+
+ @Test
+ public void testAutoCreateDeadLetterQueuePrefix() throws Exception {
+ SimpleString prefix = RandomUtil.randomSimpleString();
+ server.getAddressSettingsRepository().getMatch("#").setDeadLetterQueuePrefix(prefix);
+ triggerDlaDelivery();
+ assertNotNull(server.locateQueue(prefix.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX)));
+ }
+
+ @Test
+ public void testAutoCreateDeadLetterQueueSuffix() throws Exception {
+ SimpleString suffix = RandomUtil.randomSimpleString();
+ server.getAddressSettingsRepository().getMatch("#").setDeadLetterQueueSuffix(suffix);
+ triggerDlaDelivery();
+ assertNotNull(server.locateQueue(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(suffix)));
+ }
+
+ @Test
+ public void testAutoCreateDeadLetterQueuePrefixAndSuffix() throws Exception {
+ SimpleString prefix = RandomUtil.randomSimpleString();
+ SimpleString suffix = RandomUtil.randomSimpleString();
+ server.getAddressSettingsRepository().getMatch("#").setDeadLetterQueuePrefix(prefix).setDeadLetterQueueSuffix(suffix);
+ triggerDlaDelivery();
+ assertNotNull(server.locateQueue(prefix.concat(addressA).concat(suffix)));
+ }
+
+ @Test
+ public void testAutoCreatedDeadLetterFilterAnycast() throws Exception {
+ testAutoCreatedDeadLetterFilter(RoutingType.ANYCAST);
+ }
+
+ @Test
+ public void testAutoCreatedDeadLetterFilterMulticast() throws Exception {
+ testAutoCreatedDeadLetterFilter(RoutingType.MULTICAST);
+ }
+
+ private void testAutoCreatedDeadLetterFilter(RoutingType routingType) throws Exception {
+ final int ITERATIONS = 100;
+ final int MESSAGE_COUNT = 10;
+
+ for (int i = 0; i < ITERATIONS; i++) {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+ server.createQueue(address, routingType, queue, null, true, false);
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory cf = createSessionFactory(locator);
+ ClientSession s = addClientSession(cf.createSession(true, false));
+ ClientProducer p = s.createProducer(address);
+ for (int j = 0; j < MESSAGE_COUNT; j++) {
+ p.send(s.createMessage(true).setRoutingType(routingType));
+ }
+ p.close();
+ ClientConsumer consumer = s.createConsumer(queue);
+ s.start();
+ for (int j = 0; j < MESSAGE_COUNT; j++) {
+ ClientMessage message = consumer.receive();
+ assertNotNull(message);
+ message.acknowledge();
+ }
+ s.rollback();
+ Queue dlq = server.locateQueue(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(address).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX));
+ assertNotNull(dlq);
+ Wait.assertEquals(MESSAGE_COUNT, dlq::getMessageCount);
+ }
+
+ assertEquals(ITERATIONS, server.getPostOffice().getBindingsForAddress(dla).getBindings().size());
+ }
+
+ @Test
+ public void testAutoDeletionAndRecreationOfDeadLetterResources() throws Exception {
+ SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
+
+ triggerDlaDelivery();
+
+ // consume the message from the DLQ so it will be auto-deleted
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory sessionFactory = createSessionFactory(locator);
+ ClientSession session = addClientSession(sessionFactory.createSession(true, true));
+ ClientConsumer consumer = session.createConsumer(dlqName);
+ session.start();
+ ClientMessage message = consumer.receive();
+ assertNotNull(message);
+ message.acknowledge();
+ consumer.close();
+ session.close();
+ sessionFactory.close();
+ locator.close();
+
+ Wait.assertTrue(() -> server.locateQueue(dlqName) == null, 2000, 100);
+
+ server.destroyQueue(queueA);
+
+ triggerDlaDelivery();
+ assertNotNull(server.getAddressInfo(dla));
+ assertNotNull(server.locateQueue(dlqName));
+ }
+
+ @Test
+ public void testWithJMSFQQN() throws Exception {
+ SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
+ String fqqn = CompositeAddress.toFullyQualified(dla, dlqName).toString();
+
+ triggerDlaDelivery();
+
+ JMSContext context = new ActiveMQConnectionFactory("vm://0").createContext();
+ context.start();
+ assertNotNull(context.createConsumer(context.createQueue(fqqn)).receive(2000));
+ }
+
+ private void triggerDlaDelivery() throws Exception {
+ server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, true, false);
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory sessionFactory = createSessionFactory(locator);
+ ClientSession session = addClientSession(sessionFactory.createSession(true, false));
+ ClientProducer producer = addClientProducer(session.createProducer(addressA));
+ producer.send(session.createMessage(true));
+ producer.close();
+ ClientConsumer consumer = addClientConsumer(session.createConsumer(queueA));
+ session.start();
+ ClientMessage message = consumer.receive();
+ assertNotNull(message);
+ message.acknowledge();
+ session.rollback();
+ session.close();
+ sessionFactory.close();
+ locator.close();
+ }
+}