You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/26 03:41:19 UTC
[activemq-artemis] branch master updated: ARTEMIS-2364 collision
avoidance for redelivery
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 449f032 ARTEMIS-2364 collision avoidance for redelivery
new a848572 This closes #2691
449f032 is described below
commit 449f0323ecf0bb540e1303d31c77ff499a6ef5b1
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Fri May 31 15:20:29 2019 -0500
ARTEMIS-2364 collision avoidance for redelivery
This is a feature from 5.x implemented via
https://issues.apache.org/jira/browse/AMQ-747.
---
.../artemis/core/config/impl/Validators.java | 12 +++
.../deployers/impl/FileConfigurationParser.java | 7 ++
.../artemis/core/server/ActiveMQMessageBundle.java | 3 +
.../artemis/core/server/impl/QueueImpl.java | 8 ++
.../core/settings/impl/AddressSettings.java | 32 +++++++
.../resources/schema/artemis-configuration.xsd | 8 ++
.../core/config/impl/FileConfigurationTest.java | 2 +
.../resources/ConfigurationTest-full-config.xml | 1 +
...rationTest-xinclude-config-address-settings.xml | 1 +
.../src/test/resources/artemis-configuration.xsd | 8 ++
docs/user-manual/en/address-model.md | 6 ++
docs/user-manual/en/configuration-index.md | 7 +-
docs/user-manual/en/undelivered-messages.md | 58 ++++++++++---
.../integration/client/RedeliveryConsumerTest.java | 98 ++++++++++++++++++++++
14 files changed, 238 insertions(+), 13 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
index a5d41ee..52ec5db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
@@ -98,6 +98,18 @@ public final class Validators {
}
};
+ public static final Validator LE_ONE = new Validator() {
+ @Override
+ public void validate(final String name, final Object value) {
+ Number val = (Number) value;
+ if (val.doubleValue() <= 1) {
+ // OK
+ } else {
+ throw ActiveMQMessageBundle.BUNDLE.lessThanOrEqualToOne(name, val);
+ }
+ }
+ };
+
public static final Validator MINUS_ONE_OR_GT_ZERO = new Validator() {
@Override
public void validate(final String name, final Object value) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 3b9062d..9b23c29 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -166,6 +166,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String REDELIVERY_DELAY_MULTIPLIER_NODE_NAME = "redelivery-delay-multiplier";
+ private static final String REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME = "redelivery-collision-avoidance-factor";
+
private static final String MAX_REDELIVERY_DELAY_NODE_NAME = "max-redelivery-delay";
private static final String MAX_DELIVERY_ATTEMPTS = "max-delivery-attempts";
@@ -1046,6 +1048,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
addressSettings.setRedeliveryDelay(XMLUtil.parseLong(child));
} else if (REDELIVERY_DELAY_MULTIPLIER_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setRedeliveryMultiplier(XMLUtil.parseDouble(child));
+ } else if (REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME.equalsIgnoreCase(name)) {
+ double redeliveryCollisionAvoidanceFactor = XMLUtil.parseDouble(child);
+ Validators.GE_ZERO.validate(REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME, redeliveryCollisionAvoidanceFactor);
+ Validators.LE_ONE.validate(REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME, redeliveryCollisionAvoidanceFactor);
+ addressSettings.setRedeliveryCollisionAvoidanceFactor(redeliveryCollisionAvoidanceFactor);
} else if (MAX_REDELIVERY_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMaxRedeliveryDelay(XMLUtil.parseLong(child));
} else if (MAX_SIZE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index c8eca81..8f40777 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -479,4 +479,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229227, value = "{0} must be equals to -1 or greater than 0 and less than or equal to Integer.MAX_VALUE (actual value: {1})", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException inRangeOfPositiveIntThanMinusOne(String name, Number val);
+
+ @Message(id = 229228, value = "{0} must be less than or equal to 1 (actual value: {1})", format = Message.Format.MESSAGE_FORMAT)
+ IllegalArgumentException lessThanOrEqualToOne(String name, Number val);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7e8d82e..ea2ce33 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -28,10 +28,12 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -3617,9 +3619,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
long redeliveryDelay = addressSettings.getRedeliveryDelay();
long maxRedeliveryDelay = addressSettings.getMaxRedeliveryDelay();
double redeliveryMultiplier = addressSettings.getRedeliveryMultiplier();
+ double collisionAvoidanceFactor = addressSettings.getRedeliveryCollisionAvoidanceFactor();
int tmpDeliveryCount = deliveryCount > 0 ? deliveryCount - 1 : 0;
long delay = (long) (redeliveryDelay * (Math.pow(redeliveryMultiplier, tmpDeliveryCount)));
+ if (collisionAvoidanceFactor > 0) {
+ Random random = ThreadLocalRandom.current();
+ double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
+ delay += (delay * variance);
+ }
if (delay > maxRedeliveryDelay) {
delay = maxRedeliveryDelay;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 625d652..0129ec7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -54,6 +54,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final double DEFAULT_REDELIVER_MULTIPLIER = 1.0;
+ public static final double DEFAULT_REDELIVER_COLLISION_AVOIDANCE_FACTOR = 0.0;
+
public static final boolean DEFAULT_LAST_VALUE_QUEUE = false;
@Deprecated
@@ -125,6 +127,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Double redeliveryMultiplier = null;
+ private Double redeliveryCollisionAvoidanceFactor = null;
+
private Long maxRedeliveryDelay = null;
private SimpleString deadLetterAddress = null;
@@ -223,6 +227,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.messageCounterHistoryDayLimit = other.messageCounterHistoryDayLimit;
this.redeliveryDelay = other.redeliveryDelay;
this.redeliveryMultiplier = other.redeliveryMultiplier;
+ this.redeliveryCollisionAvoidanceFactor = other.redeliveryCollisionAvoidanceFactor;
this.maxRedeliveryDelay = other.maxRedeliveryDelay;
this.deadLetterAddress = other.deadLetterAddress;
this.expiryAddress = other.expiryAddress;
@@ -566,6 +571,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
+ public double getRedeliveryCollisionAvoidanceFactor() {
+ return redeliveryCollisionAvoidanceFactor != null ? redeliveryCollisionAvoidanceFactor : AddressSettings.DEFAULT_REDELIVER_COLLISION_AVOIDANCE_FACTOR;
+ }
+
+ public AddressSettings setRedeliveryCollisionAvoidanceFactor(final double redeliveryCollisionAvoidanceFactor) {
+ this.redeliveryCollisionAvoidanceFactor = redeliveryCollisionAvoidanceFactor;
+ return this;
+ }
+
public long getMaxRedeliveryDelay() {
// default is redelivery-delay * 10 as specified on the docs and at this JIRA:
// https://issues.jboss.org/browse/HORNETQ-1263
@@ -776,6 +790,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (redeliveryMultiplier == null) {
redeliveryMultiplier = merged.redeliveryMultiplier;
}
+ if (redeliveryCollisionAvoidanceFactor == null) {
+ redeliveryCollisionAvoidanceFactor = merged.redeliveryCollisionAvoidanceFactor;
+ }
if (maxRedeliveryDelay == null) {
maxRedeliveryDelay = merged.maxRedeliveryDelay;
}
@@ -1059,6 +1076,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) {
defaultRingSize = BufferHelper.readNullableLong(buffer);
}
+
+ if (buffer.readableBytes() > 0) {
+ redeliveryCollisionAvoidanceFactor = BufferHelper.readNullableDouble(buffer);
+ }
}
@Override
@@ -1073,6 +1094,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableInteger(messageCounterHistoryDayLimit) +
BufferHelper.sizeOfNullableLong(redeliveryDelay) +
BufferHelper.sizeOfNullableDouble(redeliveryMultiplier) +
+ BufferHelper.sizeOfNullableDouble(redeliveryCollisionAvoidanceFactor) +
BufferHelper.sizeOfNullableLong(maxRedeliveryDelay) +
SimpleString.sizeofNullableString(deadLetterAddress) +
SimpleString.sizeofNullableString(expiryAddress) +
@@ -1210,6 +1232,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableLong(buffer, defaultRingSize);
+ BufferHelper.writeNullableDouble(buffer, redeliveryCollisionAvoidanceFactor);
+
}
/* (non-Javadoc)
@@ -1235,6 +1259,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((pageMaxCache == null) ? 0 : pageMaxCache.hashCode());
result = prime * result + ((redeliveryDelay == null) ? 0 : redeliveryDelay.hashCode());
result = prime * result + ((redeliveryMultiplier == null) ? 0 : redeliveryMultiplier.hashCode());
+ result = prime * result + ((redeliveryCollisionAvoidanceFactor == null) ? 0 : redeliveryCollisionAvoidanceFactor.hashCode());
result = prime * result + ((maxRedeliveryDelay == null) ? 0 : maxRedeliveryDelay.hashCode());
result = prime * result + ((redistributionDelay == null) ? 0 : redistributionDelay.hashCode());
result = prime * result + ((sendToDLAOnNoRoute == null) ? 0 : sendToDLAOnNoRoute.hashCode());
@@ -1363,6 +1388,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return false;
} else if (!redeliveryMultiplier.equals(other.redeliveryMultiplier))
return false;
+ if (redeliveryCollisionAvoidanceFactor == null) {
+ if (other.redeliveryCollisionAvoidanceFactor != null)
+ return false;
+ } else if (!redeliveryCollisionAvoidanceFactor.equals(other.redeliveryCollisionAvoidanceFactor))
+ return false;
if (maxRedeliveryDelay == null) {
if (other.maxRedeliveryDelay != null)
return false;
@@ -1580,6 +1610,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
redeliveryDelay +
", redeliveryMultiplier=" +
redeliveryMultiplier +
+ ", redeliveryCollisionAvoidanceFactor=" +
+ redeliveryCollisionAvoidanceFactor +
", maxRedeliveryDelay=" +
maxRedeliveryDelay +
", redistributionDelay=" +
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index a9e4be6..fea5cc6 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2991,6 +2991,14 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="redelivery-collision-avoidance-factor" type="xsd:double" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ factor by which to modify the redelivery delay slightly to avoid collisions
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="max-redelivery-delay" type="xsd:long" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index a16d8c0..e0f6372 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -343,6 +343,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals("a1.1", conf.getAddressesSettings().get("a1").getDeadLetterAddress().toString());
assertEquals("a1.2", conf.getAddressesSettings().get("a1").getExpiryAddress().toString());
assertEquals(1, conf.getAddressesSettings().get("a1").getRedeliveryDelay());
+ assertEquals(0.5, conf.getAddressesSettings().get("a1").getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(856686592L, conf.getAddressesSettings().get("a1").getMaxSizeBytes());
assertEquals(817381738L, conf.getAddressesSettings().get("a1").getPageSizeBytes());
assertEquals(10, conf.getAddressesSettings().get("a1").getPageCacheMaxSize());
@@ -365,6 +366,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
assertEquals(5, conf.getAddressesSettings().get("a2").getRedeliveryDelay());
+ assertEquals(0.0, conf.getAddressesSettings().get("a2").getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(932489234928324L, conf.getAddressesSettings().get("a2").getMaxSizeBytes());
assertEquals(712671626L, conf.getAddressesSettings().get("a2").getPageSizeBytes());
assertEquals(20, conf.getAddressesSettings().get("a2").getPageCacheMaxSize());
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 9b213cc..d0843cd 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -383,6 +383,7 @@
<dead-letter-address>a1.1</dead-letter-address>
<expiry-address>a1.2</expiry-address>
<redelivery-delay>1</redelivery-delay>
+ <redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
<max-size-bytes>817M</max-size-bytes>
<page-size-bytes>817381738</page-size-bytes>
<page-max-cache-size>10</page-max-cache-size>
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
index 49b625a..92dee3f 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
@@ -19,6 +19,7 @@
<dead-letter-address>a1.1</dead-letter-address>
<expiry-address>a1.2</expiry-address>
<redelivery-delay>1</redelivery-delay>
+ <redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
<max-size-bytes>817M</max-size-bytes>
<page-size-bytes>817381738</page-size-bytes>
<page-max-cache-size>10</page-max-cache-size>
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index 64f32cd..6b3f261 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -2957,6 +2957,14 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="redelivery-collision-avoidance-factor" type="xsd:double" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ factor by which to modify the redelivery delay slightly to avoid collisions
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="max-redelivery-delay" type="xsd:long" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md
index 7512522..0cbe809 100644
--- a/docs/user-manual/en/address-model.md
+++ b/docs/user-manual/en/address-model.md
@@ -573,6 +573,7 @@ that would be found in the `broker.xml` file.
<expiry-delay>123</expiry-delay>
<redelivery-delay>5000</redelivery-delay>
<redelivery-delay-multiplier>1.0</redelivery-delay-multiplier>
+ <redelivery-collision-avoidance-factor>0.0</redelivery-collision-avoidance-factor>
<max-redelivery-delay>10000</max-redelivery-delay>
<max-delivery-attempts>3</max-delivery-attempts>
<max-size-bytes>100000</max-size-bytes>
@@ -660,6 +661,11 @@ messages](undelivered-messages.md#configuring-delayed-redelivery).
Default is `1.0`. Read more about [undelivered
messages](undelivered-messages.md#configuring-delayed-redelivery).
+`redelivery-collision-avoidance-factor` defines an additional factor used to
+calculate an adjustment to the `redelivery-delay` (up or down). Default is
+`0.0`. Valid values are between 0.0 and 1.0. Read more about [undelivered
+messages](undelivered-messages.md#configuring-delayed-redelivery).
+
`max-size-bytes`, `page-size-bytes`, & `page-max-cache-size` are used to
configure paging on an address. This is explained
[here](paging.md#configuration).
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index 4289e12..0deb104 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -204,10 +204,11 @@ Name | Description | Default
[match](address-model.md) | The filter to apply to the setting | n/a
[dead-letter-address](undelivered-messages.md) | Dead letter address | n/a
[expiry-address](message-expiry.md) | Expired messages address | n/a
-[expiry-delay](address-model.md) | Expiration time override; -1 don't override | -1
+[expiry-delay](message-expiry.md) | Expiration time override; -1 don't override | -1
[redelivery-delay](undelivered-messages.md) | Time to wait before redelivering a message | 0
-[redelivery-delay-multiplier](address-model.md) | Multiplier to apply to the `redelivery-delay` | 1.0
-[max-redelivery-delay](address-model.md) | Max value for the `redelivery-delay` | 10 \* `redelivery-delay`
+[redelivery-delay-multiplier](undelivered-messages.md) | Multiplier to apply to the `redelivery-delay` | 1.0
+[redelivery-collision-avoidance-factor](undelivered-messages.md) | an additional factor used to calculate an adjustment to the `redelivery-delay` (up or down) | 0.0
+[max-redelivery-delay](undelivered-messages.md) | Max value for the `redelivery-delay` | 10 \* `redelivery-delay`
[max-delivery-attempts](undelivered-messages.md)| Number of retries before dead letter address| 10
[max-size-bytes](paging.md)| Max size a queue can be before invoking `address-full-policy` | -1
[max-size-bytes-reject-threshold]() | Used with `BLOCK`, the max size an address can reach before messages are rejected; works in combination with `max-size-bytes` **for AMQP clients only**. | -1
diff --git a/docs/user-manual/en/undelivered-messages.md b/docs/user-manual/en/undelivered-messages.md
index 5ef5565..a6ca2e2 100644
--- a/docs/user-manual/en/undelivered-messages.md
+++ b/docs/user-manual/en/undelivered-messages.md
@@ -42,6 +42,8 @@ Delayed redelivery is defined in the address-setting configuration:
<redelivery-delay-multiplier>1.5</redelivery-delay-multiplier>
<!-- default is 0 (no delay) -->
<redelivery-delay>5000</redelivery-delay>
+ <!-- default is 0.0 (no collision avoidance) -->
+ <redelivery-collision-avoidance-factor>0.15</redelivery-collision-avoidance-factor>
<!-- default is redelivery-delay * 10 -->
<max-redelivery-delay>50000</max-redelivery-delay>
</address-setting>
@@ -59,24 +61,60 @@ message will be sent asynchronously back to the queue after the delay.
You can specify a multiplier (the `redelivery-delay-multiplier`) that will
take effect on top of the `redelivery-delay`. Each time a message is redelivered
the delay period will be equal to the previous delay * `redelivery-delay-multiplier`.
-A max-redelivery-delay can be set to prevent the delay from becoming too large.
-The max-redelivery-delay is defaulted to redelivery-delay \* 10.
+A `max-redelivery-delay` can be set to prevent the delay from becoming too large.
+The `max-redelivery-delay` is defaulted to `redelivery-delay` \* 10.
-Example:
+**Example:**
- - redelivery-delay=5000, redelivery-delay-multiplier=2, max-redelivery-delay=15000
+- redelivery-delay=5000, redelivery-delay-multiplier=2, max-redelivery-delay=15000,
+ redelivery-collision-avoidance-factor=0.0
- 1. Delivery Attempt 1. (Unsuccessful)
- 2. Wait Delay Period: 5000
- 3. Delivery Attempt 2. (Unsuccessful)
- 4. Wait Delay Period: 10000 // (5000 * 2) < max-delay-period. Use 10000
- 5. Delivery Attempt 3: (Unsuccessful)
- 6. Wait Delay Period: 15000 // (10000 * 2) > max-delay-period: Use max-delay-delivery
+1. Delivery Attempt 1. (Unsuccessful)
+2. Wait Delay Period: 5000
+3. Delivery Attempt 2. (Unsuccessful)
+4. Wait Delay Period: 10000 // (5000 * 2) < max-redelivery-delay: Use 10000
+5. Delivery Attempt 3: (Unsuccessful)
+6. Wait Delay Period: 15000 // (10000 * 2) > max-redelivery-delay: Use max-redelivery-delay
Address wildcards can be used to configure redelivery delay for a set of
addresses (see [Understanding the Wildcard Syntax](wildcard-syntax.md)), so you don't have to specify redelivery delay
individually for each address.
+The `redelivery-delay` can also be modified by configuring the
+`redelivery-collision-avoidance-factor`. This factor will be made either
+positive or negative at random to control whether the ultimate value will
+increase or decrease the `redelivery-delay`. Then it's multiplied by a random
+number between 0.0 and 1.0. This result is then multiplied by the
+`redelivery-delay` and then added to the `redelivery-delay` to arrive at the
+final value.
+
+The algorithm may sound complicated but the bottom line is quite simple: the
+larger `redelivery-collision-avoidance-factor` you choose the larger the variance
+of the `redelivery-delay` will be. The `redelivery-collision-avoidance-factor`
+must be between 0.0 and 1.0.
+
+**Example:**
+
+- redelivery-delay=1000, redelivery-delay-multiplier=1, max-redelivery-delay=15000,
+ redelivery-collision-avoidance-factor=0.5, (bold values chosen using
+ `java.util.Random`)
+
+1. Delivery Attempt 1. (Unsuccessful)
+2. Wait Delay Period: 875 // 1000 + (1000 * ((0.5 * __-1__) * __.25__))
+3. Delivery Attempt 2. (Unsuccessful)
+4. Wait Delay Period: 1375 // 1000 + (1000 * ((0.5 * __1__) * __.75__))
+5. Delivery Attempt 3: (Unsuccessful)
+6. Wait Delay Period: 975 // 1000 + (1000 * ((0.5 * __-1__) * __.05__))
+
+This feature can be particularly useful in environments where there are
+multiple consumers on the same queue all interacting transactionally
+with the same external system (e.g. a database). If there is overlapping
+data in messages which are consumed concurrently then one transaction can
+succeed while all the rest fail. If those failed messages are redelivered
+at the same time then this process where one consumer succeeds and the
+rest fail will continue. By randomly padding the redelivery-delay by a
+small, configurable amount these redelivery "collisions" can be avoided.
+
### Example
See [the examples chapter](examples.md) for an example which shows how delayed redelivery is configured
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java
index cfe6fa9..69e516e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.client;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -302,6 +303,103 @@ public class RedeliveryConsumerTest extends ActiveMQTestBase {
}
+ @Test
+ public void testRedeliveryCollisionAvoidance() throws Exception {
+ setUp(false);
+ int numberOfThreads = 10;
+ long redeliveryDelay = 1000;
+ server.getAddressSettingsRepository().getMatch(ADDRESS.toString()).setRedeliveryDelay(redeliveryDelay).setRedeliveryCollisionAvoidanceFactor(0.5);
+
+ ClientSession session = factory.createSession(false, false, false);
+ ClientProducer prod = session.createProducer(ADDRESS);
+ for (int i = 0; i < numberOfThreads; i++) {
+ prod.send(createTextMessage(session, "Hello" + i));
+ }
+ session.commit();
+ session.close();
+
+ final CountDownLatch aligned = new CountDownLatch(numberOfThreads);
+ final CountDownLatch startRollback = new CountDownLatch(1);
+
+ class ConsumerThread extends Thread {
+
+ ConsumerThread(int i) {
+ super("RedeliveryCollisionAvoidance::" + i);
+ }
+
+ long delay = 0;
+ int errors = 0;
+
+ @Override
+ public void run() {
+ try (ServerLocator locator = createInVMNonHALocator()) {
+ locator.setConsumerWindowSize(0);
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, false, false);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ aligned.countDown();
+ startRollback.await();
+ session.rollback();
+ long start = System.currentTimeMillis();
+ msg = consumer.receive(5000);
+ delay = System.currentTimeMillis() - start;
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ errors++;
+ }
+ }
+ }
+
+ ConsumerThread[] threads = new ConsumerThread[numberOfThreads];
+
+ for (int i = 0; i < numberOfThreads; i++) {
+ threads[i] = new ConsumerThread(i);
+ threads[i].start();
+ }
+
+ aligned.await();
+ startRollback.countDown();
+
+ try {
+ for (ConsumerThread t : threads) {
+ t.join(60000);
+ assertFalse(t.isAlive());
+ assertEquals("There are Errors on the test thread", 0, t.errors);
+ }
+ } finally {
+ for (ConsumerThread t : threads) {
+ if (t.isAlive()) {
+ t.interrupt();
+ }
+ t.join(1000);
+ }
+ }
+
+ long maxDelay = 0;
+ long minDelay = Long.MAX_VALUE;
+
+ for (ConsumerThread t : threads) {
+ if (t.delay < minDelay) {
+ minDelay = t.delay;
+ }
+ if (t.delay > maxDelay) {
+ maxDelay = t.delay;
+ }
+ }
+
+ // make sure the difference between the minimum redelivery delay and the maximum redelivery delay is larger than the expected nominal variance
+ assertTrue((maxDelay - minDelay) > (redeliveryDelay * .05));
+
+ factory.close();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------