You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/16 05:40:30 UTC
[pulsar] branch master updated: [broker] Add config to allow deliverAt time to be strictly honored (#16068)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b8835bb0c34 [broker] Add config to allow deliverAt time to be strictly honored (#16068)
b8835bb0c34 is described below
commit b8835bb0c348c676c92959abe1f4a61b3751d40c
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Jun 16 00:40:24 2022 -0500
[broker] Add config to allow deliverAt time to be strictly honored (#16068)
* [broker] Add config to allow deliverAt time to be strictly honored
* Fix checkstyle error (this is what happens why you change names last minute)
* Improve documentation; add private final modifiers
### Motivation
The current implementation for `InMemoryDelayedDeliveryTracker` allows messages to deliver early when their `deliverAt` time is within `tickTimeMillis` from now. This is an optimization that ensures messages deliver around the `deliverAt` time. However, some use cases require that messages do not deliver before the `deliverAt` time. (Note that the client api includes a `deliverAfter` method that implies messages won't deliver before some duration of time.)
In order to support this alternative implementation, this PR adds a broker configuration named `isDelayedDeliveryDeliverAtTimeStrict`. When true, messages will only deliver when the `deliverAt` time is greater than or equal to `now`. Note that a tradeoff here is that messages will be later than the `deliverAt` time.
There are two factors that will determine how late messages will get to consumers. The first is the topic's `DelayedDeliveryTickTimeMillis` and the second is the broker's `delayedDeliveryTickTimeMillis`. The first will determine how frequently a timer will be scheduled to deliver delayed messages. The second is used to determine the tick time of the `HashedWheelTimer`, and as a result, can compound with the topic's delay to make a message deliver even later.
### Modifications
* Add broker config named `isDelayedDeliveryDeliverAtTimeStrict`. This config defaults to `false` to maintain the original behavior.
* Update the `InMemoryDelayedDeliveryTracker#addMessage` method so that it will return false when `deliverAt <= getCutoffTime()` instead of just `deliverAt <= getCutoffTime()`.
* Update documentation in several places.
* Implement `InMemoryDelayedDeliveryTracker#getCutoffTime` method that returns the right cutoff time based on the value of `isDelayedDeliveryDeliverAtTimeStrict`. This is the core logical change.
* Update `InMemoryDelayedDeliveryTracker#updateTimer` so that it will not schedule a tick to run sooner that the most recent tick run plus the `tickTimeMillis`. This will ensure the timer is not run too frequently. It is also backwards compatible since the existing feature will deliver any messages that were within now plus the `tickTimeMillis`.
* Add new tests to cover the new configuration.
### Verifying this change
New tests are added as part of this change.
### Does this pull request potentially affect one of the following parts:
This is a new feature that maintains backwards compatibility.
---
conf/broker.conf | 10 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 13 +-
.../delayed/InMemoryDelayedDeliveryTracker.java | 83 ++++++++----
.../InMemoryDelayedDeliveryTrackerFactory.java | 6 +-
.../delayed/InMemoryDeliveryTrackerTest.java | 149 +++++++++++++++++++--
site2/docs/concepts-messaging.md | 10 ++
6 files changed, 232 insertions(+), 39 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 2cde2073678..7c7b2c71401 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -514,9 +514,19 @@ delayedDeliveryEnabled=true
# Control the tick time for when retrying on delayed delivery,
# affecting the accuracy of the delivery time compared to the scheduled time.
+# Note that this time is used to configure the HashedWheelTimer's tick time for the
+# InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory).
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000
+# When using the InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory), whether
+# the deliverAt time is strictly followed. When false (default), messages may be sent to consumers before the deliverAt
+# time by as much as the tickTimeMillis. This can reduce the overhead on the broker of maintaining the delayed index
+# for a potentially very short time period. When true, messages will not be sent to consumer until the deliverAt time
+# has passed, and they may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the
+# delayedDeliveryTickTimeMillis.
+isDelayedDeliveryDeliverAtTimeStrict=false
+
# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b6a8ae02cfa..2a0ee8356e1 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -332,9 +332,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ ".InMemoryDelayedDeliveryTrackerFactory";
@FieldContext(category = CATEGORY_SERVER, doc = "Control the tick time for when retrying on delayed delivery, "
- + " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.")
+ + "affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second. "
+ + "Note that this time is used to configure the HashedWheelTimer's tick time for the "
+ + "InMemoryDelayedDeliveryTrackerFactory.")
private long delayedDeliveryTickTimeMillis = 1000;
+ @FieldContext(category = CATEGORY_SERVER, doc = "When using the InMemoryDelayedDeliveryTrackerFactory (the default "
+ + "DelayedDeliverTrackerFactory), whether the deliverAt time is strictly followed. When false (default), "
+ + "messages may be sent to consumers before the deliverAt time by as much as the tickTimeMillis. This can "
+ + "reduce the overhead on the broker of maintaining the delayed index for a potentially very short time "
+ + "period. When true, messages will not be sent to consumer until the deliverAt time has passed, and they "
+ + "may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the "
+ + "delayedDeliveryTickTimeMillis.")
+ private boolean isDelayedDeliveryDeliverAtTimeStrict = false;
+
@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index d0eec0098b7..92df563dad4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -46,39 +46,55 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
// Timestamp at which the timeout is currently set
private long currentTimeoutTarget;
+ // Last time the TimerTask was triggered for this class
+ private long lastTickRun;
+
private long tickTimeMillis;
private final Clock clock;
- InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis) {
- this(dispatcher, timer, tickTimeMillis, Clock.systemUTC());
+ private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+
+ InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
+ boolean isDelayedDeliveryDeliverAtTimeStrict) {
+ this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
}
InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
- long tickTimeMillis, Clock clock) {
+ long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict) {
this.dispatcher = dispatcher;
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
this.clock = clock;
+ this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
+ }
+
+ /**
+ * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow for early delivery by as much as the
+ * {@link #tickTimeMillis} because it is a slight optimization to let messages skip going back into the delay
+ * tracker for a brief amount of time when we're already trying to dispatch to the consumer.
+ *
+ * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the current time to determine when messages
+ * can be delivered. As a consequence, there are two delays that will affect delivery. The first is the
+ * {@link #tickTimeMillis} and the second is the {@link Timer}'s granularity.
+ *
+ * @return the cutoff time to determine whether a message is ready to deliver to the consumer
+ */
+ private long getCutoffTime() {
+ return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : clock.millis() + tickTimeMillis;
}
@Override
- public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
- long now = clock.millis();
+ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
if (log.isDebugEnabled()) {
log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId,
- deliveryAt - now);
+ deliverAt - clock.millis());
}
- if (deliveryAt < (now + tickTimeMillis)) {
- // It's already about time to deliver this message. We add the buffer of
- // `tickTimeMillis` because messages can be extracted from the tracker
- // slightly before the expiration time. We don't want the messages to
- // go back into the delay tracker (for a brief amount of time) when we're
- // trying to dispatch to the consumer.
+ if (deliverAt <= getCutoffTime()) {
return false;
}
- priorityQueue.add(deliveryAt, ledgerId, entryId);
+ priorityQueue.add(deliverAt, ledgerId, entryId);
updateTimer();
return true;
}
@@ -88,11 +104,8 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
*/
@Override
public boolean hasMessageAvailable() {
- // Avoid the TimerTask run before reach the timeout.
- long cutOffTime = clock.millis() + tickTimeMillis;
- boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= cutOffTime;
+ boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= getCutoffTime();
if (!hasMessageAvailable) {
- // prevent the first delay message later than cutoffTime
updateTimer();
}
return hasMessageAvailable;
@@ -105,11 +118,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
public Set<PositionImpl> getScheduledMessages(int maxMessages) {
int n = maxMessages;
Set<PositionImpl> positions = new TreeSet<>();
- long now = clock.millis();
- // Pick all the messages that will be ready within the tick time period.
- // This is to avoid keeping rescheduling the timer for each message at
- // very short delay
- long cutoffTime = now + tickTimeMillis;
+ long cutoffTime = getCutoffTime();
while (n > 0 && !priorityQueue.isEmpty()) {
long timestamp = priorityQueue.peekN1();
@@ -150,6 +159,17 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
return priorityQueue.size();
}
+ /**
+ * Update the scheduled timer task such that:
+ * 1. If there are no delayed messages, return and do not schedule a timer task.
+ * 2. If the next message in the queue has the same deliverAt time as the timer task, return and leave existing
+ * timer task in place.
+ * 3. If the deliverAt time for the next delayed message has already passed (i.e. the delay is negative), return
+ * without scheduling a timer task since the subscription is backlogged.
+ * 4. Else, schedule a timer task where the delay is the greater of these two: the next message's deliverAt time or
+ * the last tick time plus the tickTimeMillis (to ensure we do not schedule the task more frequently than the
+ * tickTimeMillis).
+ */
private void updateTimer() {
if (priorityQueue.isEmpty()) {
if (timeout != null) {
@@ -170,10 +190,8 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
timeout.cancel();
}
- long delayMillis = timestamp - clock.millis();
- if (log.isDebugEnabled()) {
- log.debug("[{}] Start timer in {} millis", dispatcher.getName(), delayMillis);
- }
+ long now = clock.millis();
+ long delayMillis = timestamp - now;
if (delayMillis < 0) {
// There are messages that are already ready to be delivered. If
@@ -185,8 +203,18 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
return;
}
+ // Compute the earliest time that we schedule the timer to run.
+ long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
+ long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Start timer in {} millis", dispatcher.getName(), calculatedDelayMillis);
+ }
+
+ // Even though we may delay longer than this timestamp because of the tick delay, we still track the
+ // current timeout with reference to the next message's timestamp.
currentTimeoutTarget = timestamp;
- timeout = timer.newTimeout(this, delayMillis, TimeUnit.MILLISECONDS);
+ timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
}
@Override
@@ -199,6 +227,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
}
synchronized (dispatcher) {
+ lastTickRun = clock.millis();
currentTimeoutTarget = -1;
this.timeout = null;
dispatcher.readMoreEntries();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
index b1a9b263369..5c04a6d53b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
@@ -31,16 +31,20 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra
private long tickTimeMillis;
+ private boolean isDelayedDeliveryDeliverAtTimeStrict;
+
@Override
public void initialize(ServiceConfiguration config) {
this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS);
this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
+ this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict();
}
@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
- return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis);
+ return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
+ isDelayedDeliveryDeliverAtTimeStrict);
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index d7b304d8a0c..f44f61a67f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -21,13 +21,16 @@ package org.apache.pulsar.broker.delayed;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
@@ -40,27 +43,38 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class InMemoryDeliveryTrackerTest {
+ // Create a single shared timer for the test.
+ private final Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
+ 500, TimeUnit.MILLISECONDS);
+
+ @AfterClass(alwaysRun = true)
+ public void cleanup() {
+ timer.stop();
+ }
+
@Test
public void test() throws Exception {
PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
- Timer timer = mock(Timer.class);
-
AtomicLong clockTime = new AtomicLong();
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());
@Cleanup
- InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock);
+ InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+ false);
assertFalse(tracker.hasMessageAvailable());
@@ -131,7 +145,8 @@ public class InMemoryDeliveryTrackerTest {
});
@Cleanup
- InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock);
+ InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+ false);
assertTrue(tasks.isEmpty());
assertTrue(tracker.addMessage(2, 2, 20));
@@ -160,29 +175,143 @@ public class InMemoryDeliveryTrackerTest {
/**
* Adding a message that is about to expire within the tick time should lead
- * to a rejection from the tracker.
+ * to a rejection from the tracker when isDelayedDeliveryDeliverAtTimeStrict is false.
*/
@Test
public void testAddWithinTickTime() {
PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
- Timer timer = mock(Timer.class);
-
AtomicLong clockTime = new AtomicLong();
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());
@Cleanup
- InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock);
+ InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
+ false);
clockTime.set(0);
assertFalse(tracker.addMessage(1, 1, 10));
assertFalse(tracker.addMessage(2, 2, 99));
- assertTrue(tracker.addMessage(3, 3, 100));
- assertTrue(tracker.addMessage(4, 4, 200));
+ assertFalse(tracker.addMessage(3, 3, 100));
+ assertTrue(tracker.addMessage(4, 4, 101));
+ assertTrue(tracker.addMessage(5, 5, 200));
assertEquals(tracker.getNumberOfDelayedMessages(), 2);
}
+ public void testAddMessageWithStrictDelay() {
+ PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+ AtomicLong clockTime = new AtomicLong();
+ Clock clock = mock(Clock.class);
+ when(clock.millis()).then(x -> clockTime.get());
+
+ @Cleanup
+ InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
+ true);
+
+ clockTime.set(10);
+
+ // Verify behavior for the less than, equal to, and greater than deliverAt times.
+ assertFalse(tracker.addMessage(1, 1, 9));
+ assertFalse(tracker.addMessage(4, 4, 10));
+ assertTrue(tracker.addMessage(1, 1, 11));
+
+ assertEquals(tracker.getNumberOfDelayedMessages(), 1);
+ assertFalse(tracker.hasMessageAvailable());
+ }
+
+ /**
+ * In this test, the deliverAt time is after now, but the deliverAt time is too early to run another tick, so the
+ * tickTimeMillis determines the delay.
+ */
+ public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict() throws Exception {
+ PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+ AtomicLong clockTime = new AtomicLong();
+ Clock clock = mock(Clock.class);
+ when(clock.millis()).then(x -> clockTime.get());
+
+ // Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario.
+ @Cleanup
+ InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
+ 1000, clock, true);
+
+ // Set clock time, then run tracker to inherit clock time as the last tick time.
+ clockTime.set(10000);
+ Timeout timeout = mock(Timeout.class);
+ when(timeout.isCancelled()).then(x -> false);
+ tracker.run(timeout);
+ verify(dispatcher, times(1)).readMoreEntries();
+
+ // Add a message that has a delivery time just after the previous run. It will get delivered based on the
+ // tick delay plus the last tick run.
+ assertTrue(tracker.addMessage(1, 1, 10001));
+
+ // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has
+ // passed where it would have been triggered if the tick time was doing the triggering.
+ Thread.sleep(600);
+ verify(dispatcher, times(1)).readMoreEntries();
+
+ // Not wait for the message delivery to get triggered.
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+ }
+
+ /**
+ * In this test, the deliverAt time is after now, but before the (tickTimeMillis + now). Because there wasn't a
+ * recent tick run, the deliverAt time determines the delay.
+ */
+ public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict() {
+ PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+ AtomicLong clockTime = new AtomicLong();
+ Clock clock = mock(Clock.class);
+ when(clock.millis()).then(x -> clockTime.get());
+
+ // Use a large tick time to show that the message will get delivered earlier because there wasn't
+ // a previous tick run.
+ @Cleanup
+ InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
+ 100000, clock, true);
+
+ clockTime.set(500000);
+
+ assertTrue(tracker.addMessage(1, 1, 500005));
+
+ // Wait long enough for the runnable to run, but not longer than the tick time. The point is that the delivery
+ // should get scheduled early when the tick duration has passed since the last tick.
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+ }
+
+ /**
+ * In this test, the deliverAt time is after now plus tickTimeMillis, so the tickTimeMillis determines the delay.
+ */
+ public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() throws Exception {
+ PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+ AtomicLong clockTime = new AtomicLong();
+ Clock clock = mock(Clock.class);
+ when(clock.millis()).then(x -> clockTime.get());
+
+ // Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario.
+ @Cleanup
+ InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
+ 500, clock, true);
+
+ clockTime.set(0);
+
+ assertTrue(tracker.addMessage(1, 1, 2000));
+
+ // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has
+ // passed where it would have been triggered if the tick time was doing the triggering.
+ Thread.sleep(1000);
+ verifyNoInteractions(dispatcher);
+
+ // Not wait for the message delivery to get triggered.
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+ }
}
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index 4c5cbad6937..af34e087aef 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -975,9 +975,19 @@ delayedDeliveryEnabled=true
# Control the ticking time for the retry of delayed message delivery,
# affecting the accuracy of the delivery time compared to the scheduled time.
+# Note that this time is used to configure the HashedWheelTimer's tick time for the
+# InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory).
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000
+# When using the InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory), whether
+# the deliverAt time is strictly followed. When false (default), messages may be sent to consumers before the deliverAt
+# time by as much as the tickTimeMillis. This can reduce the overhead on the broker of maintaining the delayed index
+# for a potentially very short time period. When true, messages will not be sent to consumer until the deliverAt time
+# has passed, and they may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the
+# delayedDeliveryTickTimeMillis.
+isDelayedDeliveryDeliverAtTimeStrict=false
+
```
### Producer