You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/15 16:13:47 UTC

[pulsar] branch branch-2.10 updated (6a6b04d98ac -> 871fe368ec9)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 6a6b04d98ac [Branch-2.10] Remove redundant pulsar-zookeeper-utils module (#16258)
     new 0a553e6a4be [broker] Add config to allow deliverAt time to be strictly honored (#16068)
     new 871fe368ec9 Avoid tracking the delays of all the message when we detect that they are fixed (#16609)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/broker.conf                                   |  10 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  13 +-
 .../broker/delayed/DelayedDeliveryTracker.java     |   5 +
 .../delayed/InMemoryDelayedDeliveryTracker.java    | 128 +++++++---
 .../InMemoryDelayedDeliveryTrackerFactory.java     |   6 +-
 .../broker/service/AbstractBaseDispatcher.java     |   3 +-
 .../PersistentDispatcherMultipleConsumers.java     |  17 +-
 .../delayed/InMemoryDeliveryTrackerTest.java       | 262 ++++++++++++++++++++-
 site2/docs/concepts-messaging.md                   |   2 +
 9 files changed, 402 insertions(+), 44 deletions(-)


[pulsar] 02/02: Avoid tracking the delays of all the message when we detect that they are fixed (#16609)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 871fe368ec9b7fc5e915832d57353ca1cccb449a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jul 15 09:01:41 2022 -0700

    Avoid tracking the delays of all the message when we detect that they are fixed (#16609)
    
    * Avoid tracking the delays of all the message when we detect that they are fixed
    
    * Use tick time to avoid clock skews across different producers
---
 .../broker/delayed/DelayedDeliveryTracker.java     |   5 +
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  47 ++++++++-
 .../broker/service/AbstractBaseDispatcher.java     |   3 +-
 .../PersistentDispatcherMultipleConsumers.java     |  17 +++-
 .../delayed/InMemoryDeliveryTrackerTest.java       | 113 +++++++++++++++++++++
 5 files changed, 179 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index 2fbd9a51d4a..35853d3599b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -55,6 +55,11 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
      */
     Set<PositionImpl> getScheduledMessages(int maxMessages);
 
+    /**
+     * Tells whether the dispatcher should pause any message deliveries, until the DelayedDeliveryTracker has
+     * more messages available.
+     */
+    boolean shouldPauseAllDeliveries();
 
     /**
      *  Reset tick time use zk policies cache.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 92df563dad4..837d3d1872c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -55,6 +55,20 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
 
     private final boolean isDelayedDeliveryDeliverAtTimeStrict;
 
+    // If we detect that all messages have fixed delay time, such that the delivery is
+    // always going to be in FIFO order, then we can avoid pulling all the messages in
+    // tracker. Instead, we use the lookahead for detection and pause the read from
+    // the cursor if the delays are fixed.
+    public static final long DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES = 50_000;
+
+    // This is the timestamp of the message with the highest delivery time
+    // If new added messages are lower than this, it means the delivery is requested
+    // to be out-of-order. It gets reset to 0, once the tracker is emptied.
+    private long highestDeliveryTimeTracked = 0;
+
+    // Track whether we have seen all messages with fixed delay so far.
+    private boolean messagesHaveFixedDelay = true;
+
     InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
                                    boolean isDelayedDeliveryDeliverAtTimeStrict) {
         this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
@@ -86,16 +100,28 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
 
     @Override
     public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
+        if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
+            messagesHaveFixedDelay = false;
+            return false;
+        }
+
         if (log.isDebugEnabled()) {
             log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId,
                     deliverAt - clock.millis());
         }
-        if (deliverAt <= getCutoffTime()) {
-            return false;
-        }
+
 
         priorityQueue.add(deliverAt, ledgerId, entryId);
         updateTimer();
+
+        // Check that new delivery time comes after the current highest, or at
+        // least within a single tick time interval of 1 second.
+        if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) {
+            messagesHaveFixedDelay = false;
+        }
+
+        highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, deliverAt);
+
         return true;
     }
 
@@ -137,6 +163,13 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
         if (log.isDebugEnabled()) {
             log.debug("[{}] Get scheduled messages - found {}", dispatcher.getName(), positions.size());
         }
+
+        if (priorityQueue.isEmpty()) {
+            // Reset to initial state
+            highestDeliveryTimeTracked = 0;
+            messagesHaveFixedDelay = true;
+        }
+
         updateTimer();
         return positions;
     }
@@ -241,4 +274,12 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
             timeout.cancel();
         }
     }
+
+    @Override
+    public boolean shouldPauseAllDeliveries() {
+        // Pause deliveries if we know all delays are fixed within the lookahead window
+        return messagesHaveFixedDelay
+                && priorityQueue.size() >= DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
+                && !hasMessageAvailable();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index c9ea4a56d6c..3b79c9e27de 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -195,8 +195,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                 entry.release();
                 individualAcknowledgeMessageIfNeeded(pos, Collections.emptyMap());
                 continue;
-            } else if (msgMetadata.hasDeliverAtTime()
-                    && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
+            } else if (trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
                 // The message is marked for delayed delivery. Ignore for now.
                 entries.set(i, null);
                 entry.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index f77a55338f5..33105ac5a3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -228,6 +228,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     public synchronized void readMoreEntries() {
+        if (shouldPauseDeliveryForDelayTracker()) {
+            return;
+        }
+
         // totalAvailablePermits may be updated by other threads
         int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
         int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
@@ -866,13 +870,20 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
         synchronized (this) {
             if (!delayedDeliveryTracker.isPresent()) {
+                if (!msgMetadata.hasDeliverAtTime()) {
+                    // No need to initialize the tracker here
+                    return false;
+                }
+
                 // Initialize the tracker the first time we need to use it
                 delayedDeliveryTracker = Optional
                         .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
             }
 
             delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
-            return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());
+
+            long deliverAtTime = msgMetadata.hasDeliverAtTime() ? msgMetadata.getDeliverAtTime() : -1L;
+            return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, deliverAtTime);
         }
     }
 
@@ -887,6 +898,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         }
     }
 
+    protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
+        return delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().shouldPauseAllDeliveries();
+    }
+
     @Override
     public synchronized long getNumberOfDelayedMessages() {
         return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index f44f61a67f9..db2db6cc1db 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -314,4 +314,117 @@ public class InMemoryDeliveryTrackerTest {
         Awaitility.await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> verify(dispatcher).readMoreEntries());
     }
+
+    @Test
+    public void testWithFixedDelays() throws Exception {
+        PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                true);
+
+        assertFalse(tracker.hasMessageAvailable());
+
+        assertTrue(tracker.addMessage(1, 1, 10));
+        assertTrue(tracker.addMessage(2, 2, 20));
+        assertTrue(tracker.addMessage(3, 3, 30));
+        assertTrue(tracker.addMessage(4, 4, 40));
+        assertTrue(tracker.addMessage(5, 5, 50));
+
+        assertFalse(tracker.hasMessageAvailable());
+        assertEquals(tracker.getNumberOfDelayedMessages(), 5);
+        assertFalse(tracker.shouldPauseAllDeliveries());
+
+        for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+            assertTrue(tracker.addMessage(i, i, i * 10));
+        }
+
+        assertTrue(tracker.shouldPauseAllDeliveries());
+
+        clockTime.set(InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES * 10);
+
+        tracker.getScheduledMessages(100);
+        assertFalse(tracker.shouldPauseAllDeliveries());
+
+        // Empty the tracker
+        int removed = 0;
+        do {
+            removed = tracker.getScheduledMessages(100).size();
+        } while (removed > 0);
+
+        assertFalse(tracker.shouldPauseAllDeliveries());
+    }
+
+    @Test
+    public void testWithMixedDelays() throws Exception {
+        PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                true);
+
+        assertFalse(tracker.hasMessageAvailable());
+
+        assertTrue(tracker.addMessage(1, 1, 10));
+        assertTrue(tracker.addMessage(2, 2, 20));
+        assertTrue(tracker.addMessage(3, 3, 30));
+        assertTrue(tracker.addMessage(4, 4, 40));
+        assertTrue(tracker.addMessage(5, 5, 50));
+
+        assertFalse(tracker.shouldPauseAllDeliveries());
+
+        for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+            assertTrue(tracker.addMessage(i, i, i * 10));
+        }
+
+        assertTrue(tracker.shouldPauseAllDeliveries());
+
+        // Add message with earlier delivery time
+        assertTrue(tracker.addMessage(5, 5, 5));
+
+        assertFalse(tracker.shouldPauseAllDeliveries());
+    }
+
+    @Test
+    public void testWithNoDelays() throws Exception {
+        PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                true);
+
+        assertFalse(tracker.hasMessageAvailable());
+
+        assertTrue(tracker.addMessage(1, 1, 10));
+        assertTrue(tracker.addMessage(2, 2, 20));
+        assertTrue(tracker.addMessage(3, 3, 30));
+        assertTrue(tracker.addMessage(4, 4, 40));
+        assertTrue(tracker.addMessage(5, 5, 50));
+
+        assertFalse(tracker.shouldPauseAllDeliveries());
+
+        for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+            assertTrue(tracker.addMessage(i, i, i * 10));
+        }
+
+        assertTrue(tracker.shouldPauseAllDeliveries());
+
+        // Add message with no-delay
+        assertFalse(tracker.addMessage(5, 5, -1L));
+
+        assertFalse(tracker.shouldPauseAllDeliveries());
+    }
+
 }


[pulsar] 01/02: [broker] Add config to allow deliverAt time to be strictly honored (#16068)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0a553e6a4be3c874f63037490353f745ab1452a6
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Jun 16 00:40:24 2022 -0500

    [broker] Add config to allow deliverAt time to be strictly honored (#16068)
    
    * [broker] Add config to allow deliverAt time to be strictly honored
    
    * Fix checkstyle error (this is what happens when you change names last minute)
    
    * Improve documentation; add private final modifiers
    
    The current implementation for `InMemoryDelayedDeliveryTracker` allows messages to deliver early when their `deliverAt` time is within `tickTimeMillis` from now. This is an optimization that ensures messages deliver around the `deliverAt` time. However, some use cases require that messages do not deliver before the `deliverAt` time. (Note that the client api includes a `deliverAfter` method that implies messages won't deliver before some duration of time.)
    
    In order to support this alternative implementation, this PR adds a broker configuration named `isDelayedDeliveryDeliverAtTimeStrict`. When true, messages will only deliver once `now` is greater than or equal to the `deliverAt` time. Note that a tradeoff here is that messages may be delivered later than the `deliverAt` time.
    
    There are two factors that will determine how late messages will get to consumers. The first is the topic's `DelayedDeliveryTickTimeMillis` and the second is the broker's `delayedDeliveryTickTimeMillis`. The first will determine how frequently a timer will be scheduled to deliver delayed messages. The second is used to determine the tick time of the `HashedWheelTimer`, and as a result, can compound with the topic's delay to make a message deliver even later.
    
    * Add broker config named `isDelayedDeliveryDeliverAtTimeStrict`. This config defaults to `false` to maintain the original behavior.
    * Update the `InMemoryDelayedDeliveryTracker#addMessage` method so that it will return false when `deliverAt <= getCutoffTime()` instead of when `deliveryAt < (now + tickTimeMillis)`.
    * Update documentation in several places.
    * Implement `InMemoryDelayedDeliveryTracker#getCutoffTime` method that returns the right cutoff time based on the value of `isDelayedDeliveryDeliverAtTimeStrict`. This is the core logical change.
    * Update `InMemoryDelayedDeliveryTracker#updateTimer` so that it will not schedule a tick to run sooner than the most recent tick run plus the `tickTimeMillis`. This will ensure the timer is not run too frequently. It is also backwards compatible since the existing feature will deliver any messages that were within now plus the `tickTimeMillis`.
    * Add new tests to cover the new configuration.
    
    New tests are added as part of this change.
    
    This is a new feature that maintains backwards compatibility.
---
 conf/broker.conf                                   |  10 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  13 +-
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  83 ++++++++----
 .../InMemoryDelayedDeliveryTrackerFactory.java     |   6 +-
 .../delayed/InMemoryDeliveryTrackerTest.java       | 149 +++++++++++++++++++--
 site2/docs/concepts-messaging.md                   |   2 +
 6 files changed, 224 insertions(+), 39 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index d97daf3c9f7..d2dc7da75af 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -514,9 +514,19 @@ delayedDeliveryEnabled=true
 
 # Control the tick time for when retrying on delayed delivery,
 # affecting the accuracy of the delivery time compared to the scheduled time.
+# Note that this time is used to configure the HashedWheelTimer's tick time for the
+# InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory).
 # Default is 1 second.
 delayedDeliveryTickTimeMillis=1000
 
+# When using the InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory), whether
+# the deliverAt time is strictly followed. When false (default), messages may be sent to consumers before the deliverAt
+# time by as much as the tickTimeMillis. This can reduce the overhead on the broker of maintaining the delayed index
+# for a potentially very short time period. When true, messages will not be sent to consumer until the deliverAt time
+# has passed, and they may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the
+# delayedDeliveryTickTimeMillis.
+isDelayedDeliveryDeliverAtTimeStrict=false
+
 # Whether to enable acknowledge of batch local index.
 acknowledgmentAtBatchIndexLevelEnabled=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2eae2bac7d9..a94aaea2c92 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -311,9 +311,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
             + ".InMemoryDelayedDeliveryTrackerFactory";
 
     @FieldContext(category = CATEGORY_SERVER, doc = "Control the tick time for when retrying on delayed delivery, "
-            + " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.")
+            + "affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second. "
+            + "Note that this time is used to configure the HashedWheelTimer's tick time for the "
+            + "InMemoryDelayedDeliveryTrackerFactory.")
     private long delayedDeliveryTickTimeMillis = 1000;
 
+    @FieldContext(category = CATEGORY_SERVER, doc = "When using the InMemoryDelayedDeliveryTrackerFactory (the default "
+            + "DelayedDeliverTrackerFactory), whether the deliverAt time is strictly followed. When false (default), "
+            + "messages may be sent to consumers before the deliverAt time by as much as the tickTimeMillis. This can "
+            + "reduce the overhead on the broker of maintaining the delayed index for a potentially very short time "
+            + "period. When true, messages will not be sent to consumer until the deliverAt time has passed, and they "
+            + "may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the "
+            + "delayedDeliveryTickTimeMillis.")
+    private boolean isDelayedDeliveryDeliverAtTimeStrict = false;
+
     @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
     private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index d0eec0098b7..92df563dad4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -46,39 +46,55 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
     // Timestamp at which the timeout is currently set
     private long currentTimeoutTarget;
 
+    // Last time the TimerTask was triggered for this class
+    private long lastTickRun;
+
     private long tickTimeMillis;
 
     private final Clock clock;
 
-    InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis) {
-        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC());
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+
+    InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
+                                   boolean isDelayedDeliveryDeliverAtTimeStrict) {
+        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
     }
 
     InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
-                                   long tickTimeMillis, Clock clock) {
+                                   long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict) {
         this.dispatcher = dispatcher;
         this.timer = timer;
         this.tickTimeMillis = tickTimeMillis;
         this.clock = clock;
+        this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
+    }
+
+    /**
+     * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow for early delivery by as much as the
+     * {@link #tickTimeMillis} because it is a slight optimization to let messages skip going back into the delay
+     * tracker for a brief amount of time when we're already trying to dispatch to the consumer.
+     *
+     * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the current time to determine when messages
+     * can be delivered. As a consequence, there are two delays that will affect delivery. The first is the
+     * {@link #tickTimeMillis} and the second is the {@link Timer}'s granularity.
+     *
+     * @return the cutoff time to determine whether a message is ready to deliver to the consumer
+     */
+    private long getCutoffTime() {
+        return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : clock.millis() + tickTimeMillis;
     }
 
     @Override
-    public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
-        long now = clock.millis();
+    public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId,
-                    deliveryAt - now);
+                    deliverAt - clock.millis());
         }
-        if (deliveryAt < (now + tickTimeMillis)) {
-            // It's already about time to deliver this message. We add the buffer of
-            // `tickTimeMillis` because messages can be extracted from the tracker
-            // slightly before the expiration time. We don't want the messages to
-            // go back into the delay tracker (for a brief amount of time) when we're
-            // trying to dispatch to the consumer.
+        if (deliverAt <= getCutoffTime()) {
             return false;
         }
 
-        priorityQueue.add(deliveryAt, ledgerId, entryId);
+        priorityQueue.add(deliverAt, ledgerId, entryId);
         updateTimer();
         return true;
     }
@@ -88,11 +104,8 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
      */
     @Override
     public boolean hasMessageAvailable() {
-        // Avoid the TimerTask run before reach the timeout.
-        long cutOffTime = clock.millis() + tickTimeMillis;
-        boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= cutOffTime;
+        boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= getCutoffTime();
         if (!hasMessageAvailable) {
-            // prevent the first delay message later than cutoffTime
             updateTimer();
         }
         return hasMessageAvailable;
@@ -105,11 +118,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
     public Set<PositionImpl> getScheduledMessages(int maxMessages) {
         int n = maxMessages;
         Set<PositionImpl> positions = new TreeSet<>();
-        long now = clock.millis();
-        // Pick all the messages that will be ready within the tick time period.
-        // This is to avoid keeping rescheduling the timer for each message at
-        // very short delay
-        long cutoffTime = now + tickTimeMillis;
+        long cutoffTime = getCutoffTime();
 
         while (n > 0 && !priorityQueue.isEmpty()) {
             long timestamp = priorityQueue.peekN1();
@@ -150,6 +159,17 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
         return priorityQueue.size();
     }
 
+    /**
+     * Update the scheduled timer task such that:
+     * 1. If there are no delayed messages, return and do not schedule a timer task.
+     * 2. If the next message in the queue has the same deliverAt time as the timer task, return and leave existing
+     *    timer task in place.
+     * 3. If the deliverAt time for the next delayed message has already passed (i.e. the delay is negative), return
+     *    without scheduling a timer task since the subscription is backlogged.
+     * 4. Else, schedule a timer task where the delay is the greater of these two: the next message's deliverAt time or
+     *    the last tick time plus the tickTimeMillis (to ensure we do not schedule the task more frequently than the
+     *    tickTimeMillis).
+     */
     private void updateTimer() {
         if (priorityQueue.isEmpty()) {
             if (timeout != null) {
@@ -170,10 +190,8 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
             timeout.cancel();
         }
 
-        long delayMillis = timestamp - clock.millis();
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Start timer in {} millis", dispatcher.getName(), delayMillis);
-        }
+        long now = clock.millis();
+        long delayMillis = timestamp - now;
 
         if (delayMillis < 0) {
             // There are messages that are already ready to be delivered. If
@@ -185,8 +203,18 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
             return;
         }
 
+        // Compute the earliest time that we schedule the timer to run.
+        long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
+        long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Start timer in {} millis", dispatcher.getName(), calculatedDelayMillis);
+        }
+
+        // Even though we may delay longer than this timestamp because of the tick delay, we still track the
+        // current timeout with reference to the next message's timestamp.
         currentTimeoutTarget = timestamp;
-        timeout = timer.newTimeout(this, delayMillis, TimeUnit.MILLISECONDS);
+        timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -199,6 +227,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
         }
 
         synchronized (dispatcher) {
+            lastTickRun = clock.millis();
             currentTimeoutTarget = -1;
             this.timeout = null;
             dispatcher.readMoreEntries();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
index b1a9b263369..5c04a6d53b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
@@ -31,16 +31,20 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra
 
     private long tickTimeMillis;
 
+    private boolean isDelayedDeliveryDeliverAtTimeStrict;
+
     @Override
     public void initialize(ServiceConfiguration config) {
         this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
                 config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS);
         this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
+        this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict();
     }
 
     @Override
     public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
-        return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis);
+        return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
+                isDelayedDeliveryDeliverAtTimeStrict);
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index d7b304d8a0c..f44f61a67f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -21,13 +21,16 @@ package org.apache.pulsar.broker.delayed;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
+import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import io.netty.util.TimerTask;
@@ -40,27 +43,38 @@ import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.Cleanup;
 
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class InMemoryDeliveryTrackerTest {
 
+    // Create a single shared timer for the test.
+    private final Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
+            500, TimeUnit.MILLISECONDS);
+
+    @AfterClass(alwaysRun = true)
+    public void cleanup() {
+        timer.stop();
+    }
+
     @Test
     public void test() throws Exception {
         PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
 
-        Timer timer = mock(Timer.class);
-
         AtomicLong clockTime = new AtomicLong();
         Clock clock = mock(Clock.class);
         when(clock.millis()).then(x -> clockTime.get());
 
         @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock);
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                false);
 
         assertFalse(tracker.hasMessageAvailable());
 
@@ -131,7 +145,8 @@ public class InMemoryDeliveryTrackerTest {
         });
 
         @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock);
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                false);
 
         assertTrue(tasks.isEmpty());
         assertTrue(tracker.addMessage(2, 2, 20));
@@ -160,29 +175,143 @@ public class InMemoryDeliveryTrackerTest {
 
     /**
      * Adding a message that is about to expire within the tick time should lead
-     * to a rejection from the tracker.
+     * to a rejection from the tracker when isDelayedDeliveryDeliverAtTimeStrict is false.
      */
     @Test
     public void testAddWithinTickTime() {
         PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
 
-        Timer timer = mock(Timer.class);
-
         AtomicLong clockTime = new AtomicLong();
         Clock clock = mock(Clock.class);
         when(clock.millis()).then(x -> clockTime.get());
 
         @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock);
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
+                false);
 
         clockTime.set(0);
 
         assertFalse(tracker.addMessage(1, 1, 10));
         assertFalse(tracker.addMessage(2, 2, 99));
-        assertTrue(tracker.addMessage(3, 3, 100));
-        assertTrue(tracker.addMessage(4, 4, 200));
+        assertFalse(tracker.addMessage(3, 3, 100));
+        assertTrue(tracker.addMessage(4, 4, 101));
+        assertTrue(tracker.addMessage(5, 5, 200));
 
         assertEquals(tracker.getNumberOfDelayedMessages(), 2);
     }
 
+    public void testAddMessageWithStrictDelay() {
+        PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
+                true);
+
+        clockTime.set(10);
+
+        // Verify behavior for the less than, equal to, and greater than deliverAt times.
+        assertFalse(tracker.addMessage(1, 1, 9));
+        assertFalse(tracker.addMessage(4, 4, 10));
+        assertTrue(tracker.addMessage(1, 1, 11));
+
+        assertEquals(tracker.getNumberOfDelayedMessages(), 1);
+        assertFalse(tracker.hasMessageAvailable());
+    }
+
+    /**
+     * In this test, the deliverAt time is after now, but the deliverAt time is too early to run another tick, so the
+     * tickTimeMillis determines the delay.
+     */
+    public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict() throws Exception {
+        PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        // Use a tick time longer than the message delay so that the tick time, not deliverAt, schedules the timer.
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
+                1000, clock, true);
+
+        // Set clock time, then run tracker to inherit clock time as the last tick time.
+        clockTime.set(10000);
+        Timeout timeout = mock(Timeout.class);
+        when(timeout.isCancelled()).then(x -> false);
+        tracker.run(timeout);
+        verify(dispatcher, times(1)).readMoreEntries();
+
+        // Add a message that has a delivery time just after the previous run. It will get delivered based on the
+        // tick delay plus the last tick run.
+        assertTrue(tracker.addMessage(1, 1, 10001));
+
+        // Wait longer than the deliverAt delay plus the HashedWheelTimer's tick time to ensure that enough time has
+        // passed where it would have been triggered if the deliverAt time was doing the triggering.
+        Thread.sleep(600);
+        verify(dispatcher, times(1)).readMoreEntries();
+
+        // Now wait for the delayed run; times(2) because the manual run above already called readMoreEntries once.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> verify(dispatcher, times(2)).readMoreEntries());
+    }
+
+    /**
+     * In this test, the deliverAt time is after now, but before the (tickTimeMillis + now). Because there wasn't a
+     * recent tick run, the deliverAt time determines the delay.
+     */
+    public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict() {
+        PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        // Use a large tick time to show that the message will get delivered earlier because there wasn't
+        // a previous tick run.
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
+                100000, clock, true);
+
+        clockTime.set(500000);
+
+        assertTrue(tracker.addMessage(1, 1, 500005));
+
+        // Wait long enough for the runnable to run, but not longer than the tick time. The point is that the delivery
+        // should get scheduled early when the tick duration has passed since the last tick.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+    }
+
+    /**
+     * In this test, the deliverAt time is after now plus tickTimeMillis, so the deliverAt time determines the delay.
+     */
+    public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() throws Exception {
+        PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        // Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario.
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
+                500, clock, true);
+
+        clockTime.set(0);
+
+        assertTrue(tracker.addMessage(1, 1, 2000));
+
+        // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has
+        // passed where it would have been triggered if the tick time was doing the triggering.
+        Thread.sleep(1000);
+        verifyNoInteractions(dispatcher);
+
+        // Now wait for the message delivery to get triggered.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+    }
 }
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index 595000f3c30..eee7a67b8a8 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -790,6 +790,8 @@ delayedDeliveryEnabled=true
 
 # Control the ticking time for the retry of delayed message delivery,
 # affecting the accuracy of the delivery time compared to the scheduled time.
+# Note that this time is used to configure the HashedWheelTimer's tick time for the
+# InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliveryTrackerFactory).
 # Default is 1 second.
 delayedDeliveryTickTimeMillis=1000
 ```