Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/16 05:40:30 UTC

[pulsar] branch master updated: [broker] Add config to allow deliverAt time to be strictly honored (#16068)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b8835bb0c34 [broker] Add config to allow deliverAt time to be strictly honored (#16068)
b8835bb0c34 is described below

commit b8835bb0c348c676c92959abe1f4a61b3751d40c
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Jun 16 00:40:24 2022 -0500

    [broker] Add config to allow deliverAt time to be strictly honored (#16068)
    
    * [broker] Add config to allow deliverAt time to be strictly honored
    
    * Fix checkstyle error (this is what happens when you change names last minute)
    
    * Improve documentation; add private final modifiers
    
    ### Motivation
    
    The current implementation of `InMemoryDelayedDeliveryTracker` allows messages to be delivered early when their `deliverAt` time is within `tickTimeMillis` of now. This is an optimization that ensures messages are delivered around the `deliverAt` time. However, some use cases require that messages are not delivered before the `deliverAt` time. (Note that the client API includes a `deliverAfter` method, which implies that messages won't be delivered before some duration has elapsed.)
    
    To support this alternative behavior, this PR adds a broker configuration named `isDelayedDeliveryDeliverAtTimeStrict`. When true, messages are only delivered once the `deliverAt` time is less than or equal to `now`, that is, once the `deliverAt` time has been reached. The tradeoff is that messages may be delivered later than the `deliverAt` time.
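
The two cutoff behaviors can be sketched as follows. This is a minimal standalone illustration, not the code from this commit: the class `CutoffSketch` and its method names are hypothetical, and only the comparison `deliverAt <= cutoff` mirrors what `InMemoryDelayedDeliveryTracker` does.

```java
// Hypothetical standalone sketch; class and method names are illustrative only.
final class CutoffSketch {

    // Latest deliverAt timestamp that counts as "ready" at the given time.
    // strict == true  -> only messages whose deliverAt has been reached.
    // strict == false -> messages up to tickTimeMillis in the future are also ready.
    static long cutoffTime(long nowMillis, long tickTimeMillis, boolean strict) {
        return strict ? nowMillis : nowMillis + tickTimeMillis;
    }

    // A message is dispatched right away (and not tracked) when deliverAt <= cutoff.
    static boolean isReady(long deliverAt, long nowMillis, long tickTimeMillis, boolean strict) {
        return deliverAt <= cutoffTime(nowMillis, tickTimeMillis, strict);
    }

    public static void main(String[] args) {
        long now = 1_000;
        long tick = 100;
        System.out.println(isReady(1_050, now, tick, false)); // true: within the tick window
        System.out.println(isReady(1_050, now, tick, true));  // false: deliverAt not reached yet
        System.out.println(isReady(1_000, now, tick, true));  // true: deliverAt equals now
    }
}
```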
    
    There are two factors that will determine how late messages will get to consumers. The first is the topic's `DelayedDeliveryTickTimeMillis` and the second is the broker's `delayedDeliveryTickTimeMillis`. The first will determine how frequently a timer will be scheduled to deliver delayed messages. The second is used to determine the tick time of the `HashedWheelTimer`, and as a result, can compound with the topic's delay to make a message deliver even later.
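
As a rough worked example of how the two tick times compound, the sketch below computes the latest expected delivery time using the bound stated in the configuration documentation added by this commit (`deliverAt` plus the topic's tick time plus the broker's `delayedDeliveryTickTimeMillis`). The class and parameter names are hypothetical, and the bound assumes nothing else delays dispatch (for example, a backlogged subscription or network latency).

```java
// Hypothetical sketch of the documented upper bound on lateness in strict mode.
final class LatenessSketch {

    // Latest time at which a message is expected to be handed to the dispatcher,
    // per the documented bound: deliverAt + topic tick time + broker (timer) tick time.
    static long latestExpectedDelivery(long deliverAt, long topicTickTimeMillis, long brokerTickTimeMillis) {
        return deliverAt + topicTickTimeMillis + brokerTickTimeMillis;
    }

    public static void main(String[] args) {
        // With both tick times at the default of 1000 ms, a message with
        // deliverAt = 10000 may be delivered as late as 12000.
        System.out.println(latestExpectedDelivery(10_000, 1_000, 1_000)); // 12000
    }
}
```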
    
    ### Modifications
    
    * Add broker config named `isDelayedDeliveryDeliverAtTimeStrict`. This config defaults to `false` to maintain the original behavior.
    * Update the `InMemoryDelayedDeliveryTracker#addMessage` method so that it returns false when `deliverAt <= getCutoffTime()` instead of when `deliveryAt < (now + tickTimeMillis)`.
    * Update documentation in several places.
    * Implement `InMemoryDelayedDeliveryTracker#getCutoffTime` method that returns the right cutoff time based on the value of `isDelayedDeliveryDeliverAtTimeStrict`. This is the core logical change.
    * Update `InMemoryDelayedDeliveryTracker#updateTimer` so that it will not schedule a tick to run sooner than the most recent tick run plus the `tickTimeMillis`. This ensures the timer does not run too frequently. It is also backwards compatible, since the existing behavior already delivers any messages whose `deliverAt` time is within now plus the `tickTimeMillis`.
    * Add new tests to cover the new configuration.
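
The delay calculation in the `updateTimer` change can be sketched in isolation as below. This is an illustrative standalone version with a hypothetical class and method name; it assumes the caller has already handled the empty-queue case and the case where the next message's `deliverAt` time has already passed, as the real method does before scheduling.

```java
// Hypothetical sketch of the scheduling delay; not the actual tracker class.
final class TimerDelaySketch {

    // Delay before the next timer run: the greater of the time until the next
    // message's deliverAt and the time remaining until lastTickRun + tickTimeMillis.
    static long scheduledDelayMillis(long nextDeliverAt, long lastTickRun, long tickTimeMillis, long now) {
        long delayMillis = nextDeliverAt - now;
        long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
        return Math.max(delayMillis, remainingTickDelayMillis);
    }

    public static void main(String[] args) {
        // A tick just ran at 10000, so the tick time (1000 ms) dominates the 1 ms delay.
        System.out.println(scheduledDelayMillis(10_001, 10_000, 1_000, 10_000)); // 1000
        // No recent tick, so the message's own 5 ms delay is used.
        System.out.println(scheduledDelayMillis(500_005, 0, 100_000, 500_000)); // 5
        // deliverAt is further away than one tick, so deliverAt dominates.
        System.out.println(scheduledDelayMillis(2_000, 0, 500, 0)); // 2000
    }
}
```

The three calls use the same values as the strict-mode tests added in this commit.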
    
    ### Verifying this change
    
    New tests are added as part of this change.
    
    ### Does this pull request potentially affect one of the following parts:
    
    This is a new feature that maintains backwards compatibility.
---
 conf/broker.conf                                   |  10 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  13 +-
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  83 ++++++++----
 .../InMemoryDelayedDeliveryTrackerFactory.java     |   6 +-
 .../delayed/InMemoryDeliveryTrackerTest.java       | 149 +++++++++++++++++++--
 site2/docs/concepts-messaging.md                   |  10 ++
 6 files changed, 232 insertions(+), 39 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2cde2073678..7c7b2c71401 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -514,9 +514,19 @@ delayedDeliveryEnabled=true
 
 # Control the tick time for when retrying on delayed delivery,
 # affecting the accuracy of the delivery time compared to the scheduled time.
+# Note that this time is used to configure the HashedWheelTimer's tick time for the
+# InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliveryTrackerFactory).
 # Default is 1 second.
 delayedDeliveryTickTimeMillis=1000
 
+# When using the InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliveryTrackerFactory), whether
+# the deliverAt time is strictly followed. When false (default), messages may be sent to consumers before the deliverAt
+# time by as much as the tickTimeMillis. This can reduce the overhead on the broker of maintaining the delayed index
+# for a potentially very short time period. When true, messages will not be sent to consumer until the deliverAt time
+# has passed, and they may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the
+# delayedDeliveryTickTimeMillis.
+isDelayedDeliveryDeliverAtTimeStrict=false
+
 # Whether to enable acknowledge of batch local index.
 acknowledgmentAtBatchIndexLevelEnabled=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b6a8ae02cfa..2a0ee8356e1 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -332,9 +332,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
             + ".InMemoryDelayedDeliveryTrackerFactory";
 
     @FieldContext(category = CATEGORY_SERVER, doc = "Control the tick time for when retrying on delayed delivery, "
-            + " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.")
+            + "affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second. "
+            + "Note that this time is used to configure the HashedWheelTimer's tick time for the "
+            + "InMemoryDelayedDeliveryTrackerFactory.")
     private long delayedDeliveryTickTimeMillis = 1000;
 
+    @FieldContext(category = CATEGORY_SERVER, doc = "When using the InMemoryDelayedDeliveryTrackerFactory (the default "
+            + "DelayedDeliveryTrackerFactory), whether the deliverAt time is strictly followed. When false (default), "
+            + "messages may be sent to consumers before the deliverAt time by as much as the tickTimeMillis. This can "
+            + "reduce the overhead on the broker of maintaining the delayed index for a potentially very short time "
+            + "period. When true, messages will not be sent to consumer until the deliverAt time has passed, and they "
+            + "may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the "
+            + "delayedDeliveryTickTimeMillis.")
+    private boolean isDelayedDeliveryDeliverAtTimeStrict = false;
+
     @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
     private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index d0eec0098b7..92df563dad4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -46,39 +46,55 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
     // Timestamp at which the timeout is currently set
     private long currentTimeoutTarget;
 
+    // Last time the TimerTask was triggered for this class
+    private long lastTickRun;
+
     private long tickTimeMillis;
 
     private final Clock clock;
 
-    InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis) {
-        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC());
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+
+    InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
+                                   boolean isDelayedDeliveryDeliverAtTimeStrict) {
+        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
     }
 
     InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
-                                   long tickTimeMillis, Clock clock) {
+                                   long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict) {
         this.dispatcher = dispatcher;
         this.timer = timer;
         this.tickTimeMillis = tickTimeMillis;
         this.clock = clock;
+        this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
+    }
+
+    /**
+     * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow for early delivery by as much as the
+     * {@link #tickTimeMillis} because it is a slight optimization to let messages skip going back into the delay
+     * tracker for a brief amount of time when we're already trying to dispatch to the consumer.
+     *
+     * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the current time to determine when messages
+     * can be delivered. As a consequence, there are two delays that will affect delivery. The first is the
+     * {@link #tickTimeMillis} and the second is the {@link Timer}'s granularity.
+     *
+     * @return the cutoff time to determine whether a message is ready to deliver to the consumer
+     */
+    private long getCutoffTime() {
+        return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : clock.millis() + tickTimeMillis;
     }
 
     @Override
-    public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
-        long now = clock.millis();
+    public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId,
-                    deliveryAt - now);
+                    deliverAt - clock.millis());
         }
-        if (deliveryAt < (now + tickTimeMillis)) {
-            // It's already about time to deliver this message. We add the buffer of
-            // `tickTimeMillis` because messages can be extracted from the tracker
-            // slightly before the expiration time. We don't want the messages to
-            // go back into the delay tracker (for a brief amount of time) when we're
-            // trying to dispatch to the consumer.
+        if (deliverAt <= getCutoffTime()) {
             return false;
         }
 
-        priorityQueue.add(deliveryAt, ledgerId, entryId);
+        priorityQueue.add(deliverAt, ledgerId, entryId);
         updateTimer();
         return true;
     }
@@ -88,11 +104,8 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
      */
     @Override
     public boolean hasMessageAvailable() {
-        // Avoid the TimerTask run before reach the timeout.
-        long cutOffTime = clock.millis() + tickTimeMillis;
-        boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= cutOffTime;
+        boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= getCutoffTime();
         if (!hasMessageAvailable) {
-            // prevent the first delay message later than cutoffTime
             updateTimer();
         }
         return hasMessageAvailable;
@@ -105,11 +118,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
     public Set<PositionImpl> getScheduledMessages(int maxMessages) {
         int n = maxMessages;
         Set<PositionImpl> positions = new TreeSet<>();
-        long now = clock.millis();
-        // Pick all the messages that will be ready within the tick time period.
-        // This is to avoid keeping rescheduling the timer for each message at
-        // very short delay
-        long cutoffTime = now + tickTimeMillis;
+        long cutoffTime = getCutoffTime();
 
         while (n > 0 && !priorityQueue.isEmpty()) {
             long timestamp = priorityQueue.peekN1();
@@ -150,6 +159,17 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
         return priorityQueue.size();
     }
 
+    /**
+     * Update the scheduled timer task such that:
+     * 1. If there are no delayed messages, return and do not schedule a timer task.
+     * 2. If the next message in the queue has the same deliverAt time as the timer task, return and leave existing
+     *    timer task in place.
+     * 3. If the deliverAt time for the next delayed message has already passed (i.e. the delay is negative), return
+     *    without scheduling a timer task since the subscription is backlogged.
+     * 4. Else, schedule a timer task where the delay is the greater of these two: the next message's deliverAt time or
+     *    the last tick time plus the tickTimeMillis (to ensure we do not schedule the task more frequently than the
+     *    tickTimeMillis).
+     */
     private void updateTimer() {
         if (priorityQueue.isEmpty()) {
             if (timeout != null) {
@@ -170,10 +190,8 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
             timeout.cancel();
         }
 
-        long delayMillis = timestamp - clock.millis();
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Start timer in {} millis", dispatcher.getName(), delayMillis);
-        }
+        long now = clock.millis();
+        long delayMillis = timestamp - now;
 
         if (delayMillis < 0) {
             // There are messages that are already ready to be delivered. If
@@ -185,8 +203,18 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
             return;
         }
 
+        // Compute the earliest time that we schedule the timer to run.
+        long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
+        long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Start timer in {} millis", dispatcher.getName(), calculatedDelayMillis);
+        }
+
+        // Even though we may delay longer than this timestamp because of the tick delay, we still track the
+        // current timeout with reference to the next message's timestamp.
         currentTimeoutTarget = timestamp;
-        timeout = timer.newTimeout(this, delayMillis, TimeUnit.MILLISECONDS);
+        timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -199,6 +227,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
         }
 
         synchronized (dispatcher) {
+            lastTickRun = clock.millis();
             currentTimeoutTarget = -1;
             this.timeout = null;
             dispatcher.readMoreEntries();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
index b1a9b263369..5c04a6d53b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
@@ -31,16 +31,20 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra
 
     private long tickTimeMillis;
 
+    private boolean isDelayedDeliveryDeliverAtTimeStrict;
+
     @Override
     public void initialize(ServiceConfiguration config) {
         this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
                 config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS);
         this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
+        this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict();
     }
 
     @Override
     public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
-        return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis);
+        return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
+                isDelayedDeliveryDeliverAtTimeStrict);
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index d7b304d8a0c..f44f61a67f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -21,13 +21,16 @@ package org.apache.pulsar.broker.delayed;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
+import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import io.netty.util.TimerTask;
@@ -40,27 +43,38 @@ import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.Cleanup;
 
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class InMemoryDeliveryTrackerTest {
 
+    // Create a single shared timer for the test.
+    private final Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
+            500, TimeUnit.MILLISECONDS);
+
+    @AfterClass(alwaysRun = true)
+    public void cleanup() {
+        timer.stop();
+    }
+
     @Test
     public void test() throws Exception {
         PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
 
-        Timer timer = mock(Timer.class);
-
         AtomicLong clockTime = new AtomicLong();
         Clock clock = mock(Clock.class);
         when(clock.millis()).then(x -> clockTime.get());
 
         @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock);
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                false);
 
         assertFalse(tracker.hasMessageAvailable());
 
@@ -131,7 +145,8 @@ public class InMemoryDeliveryTrackerTest {
         });
 
         @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock);
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                false);
 
         assertTrue(tasks.isEmpty());
         assertTrue(tracker.addMessage(2, 2, 20));
@@ -160,29 +175,143 @@ public class InMemoryDeliveryTrackerTest {
 
     /**
      * Adding a message that is about to expire within the tick time should lead
-     * to a rejection from the tracker.
+     * to a rejection from the tracker when isDelayedDeliveryDeliverAtTimeStrict is false.
      */
     @Test
     public void testAddWithinTickTime() {
         PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
 
-        Timer timer = mock(Timer.class);
-
         AtomicLong clockTime = new AtomicLong();
         Clock clock = mock(Clock.class);
         when(clock.millis()).then(x -> clockTime.get());
 
         @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock);
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
+                false);
 
         clockTime.set(0);
 
         assertFalse(tracker.addMessage(1, 1, 10));
         assertFalse(tracker.addMessage(2, 2, 99));
-        assertTrue(tracker.addMessage(3, 3, 100));
-        assertTrue(tracker.addMessage(4, 4, 200));
+        assertFalse(tracker.addMessage(3, 3, 100));
+        assertTrue(tracker.addMessage(4, 4, 101));
+        assertTrue(tracker.addMessage(5, 5, 200));
 
         assertEquals(tracker.getNumberOfDelayedMessages(), 2);
     }
 
+    public void testAddMessageWithStrictDelay() {
+        PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
+                true);
+
+        clockTime.set(10);
+
+        // Verify behavior for the less than, equal to, and greater than deliverAt times.
+        assertFalse(tracker.addMessage(1, 1, 9));
+        assertFalse(tracker.addMessage(4, 4, 10));
+        assertTrue(tracker.addMessage(1, 1, 11));
+
+        assertEquals(tracker.getNumberOfDelayedMessages(), 1);
+        assertFalse(tracker.hasMessageAvailable());
+    }
+
+    /**
+     * In this test, the deliverAt time is after now, but the deliverAt time is too early to run another tick, so the
+     * tickTimeMillis determines the delay.
+     */
+    public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict() throws Exception {
+        PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        // Use a tick time longer than the message's delay to show that the timer task is run based on the tick time.
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
+                1000, clock, true);
+
+        // Set clock time, then run tracker to inherit clock time as the last tick time.
+        clockTime.set(10000);
+        Timeout timeout = mock(Timeout.class);
+        when(timeout.isCancelled()).then(x -> false);
+        tracker.run(timeout);
+        verify(dispatcher, times(1)).readMoreEntries();
+
+        // Add a message that has a delivery time just after the previous run. It will get delivered based on the
+        // tick delay plus the last tick run.
+        assertTrue(tracker.addMessage(1, 1, 10001));
+
+        // Wait longer than the message's delay plus the HashedWheelTimer's tick time to ensure that enough time has
+        // passed where it would have been triggered if the deliverAt time was doing the triggering.
+        Thread.sleep(600);
+        verify(dispatcher, times(1)).readMoreEntries();
+
+        // Now wait for the message delivery to get triggered.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+    }
+
+    /**
+     * In this test, the deliverAt time is after now, but before the (tickTimeMillis + now). Because there wasn't a
+     * recent tick run, the deliverAt time determines the delay.
+     */
+    public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict() {
+        PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        // Use a large tick time to show that the message will get delivered earlier because there wasn't
+        // a previous tick run.
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
+                100000, clock, true);
+
+        clockTime.set(500000);
+
+        assertTrue(tracker.addMessage(1, 1, 500005));
+
+        // Wait long enough for the runnable to run, but not longer than the tick time. The point is that the delivery
+        // should get scheduled early when the tick duration has passed since the last tick.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+    }
+
+    /**
+     * In this test, the deliverAt time is after now plus tickTimeMillis, so the deliverAt time determines the delay.
+     */
+    public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() throws Exception {
+        PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        // Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario.
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
+                500, clock, true);
+
+        clockTime.set(0);
+
+        assertTrue(tracker.addMessage(1, 1, 2000));
+
+        // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has
+        // passed where it would have been triggered if the tick time was doing the triggering.
+        Thread.sleep(1000);
+        verifyNoInteractions(dispatcher);
+
+        // Now wait for the message delivery to get triggered.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+    }
 }
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index 4c5cbad6937..af34e087aef 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -975,9 +975,19 @@ delayedDeliveryEnabled=true
 
 # Control the ticking time for the retry of delayed message delivery,
 # affecting the accuracy of the delivery time compared to the scheduled time.
+# Note that this time is used to configure the HashedWheelTimer's tick time for the
+# InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliveryTrackerFactory).
 # Default is 1 second.
 delayedDeliveryTickTimeMillis=1000
 
+# When using the InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliveryTrackerFactory), whether
+# the deliverAt time is strictly followed. When false (default), messages may be sent to consumers before the deliverAt
+# time by as much as the tickTimeMillis. This can reduce the overhead on the broker of maintaining the delayed index
+# for a potentially very short time period. When true, messages will not be sent to consumer until the deliverAt time
+# has passed, and they may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the
+# delayedDeliveryTickTimeMillis.
+isDelayedDeliveryDeliverAtTimeStrict=false
+
 ```
 
 ### Producer