You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/01/26 13:04:56 UTC

(pulsar) branch master updated: [improve][broker] PIP-315: Configurable max delay limit for delayed delivery (#21798)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d37d31f6441 [improve][broker] PIP-315: Configurable max delay limit for delayed delivery (#21798)
d37d31f6441 is described below

commit d37d31f64413b896a051e6d9fa0acf07201e02a3
Author: Kevin Lu <ke...@gmail.com>
AuthorDate: Fri Jan 26 05:04:50 2024 -0800

    [improve][broker] PIP-315: Configurable max delay limit for delayed delivery (#21798)
    
    Co-authored-by: Lari Hotari <lh...@users.noreply.github.com>
    Co-authored-by: JiangHaiting <ji...@apache.org>
---
 conf/broker.conf                                   |  5 +++
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++++
 .../broker/admin/impl/PersistentTopicsBase.java    |  5 +++
 .../pulsar/broker/service/AbstractTopic.java       |  6 ++++
 .../broker/service/persistent/PersistentTopic.java | 34 ++++++++++++++++++
 .../broker/admin/AdminApiDelayedDeliveryTest.java  |  7 ++++
 .../pulsar/broker/admin/NamespacesV2Test.java      | 40 +++++++++++++++++++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 27 +++++++++++++-
 .../service/persistent/DelayedDeliveryTest.java    | 41 ++++++++++++++++++++++
 .../pulsar/broker/transaction/TransactionTest.java | 37 +++++++++++++++++++
 .../policies/data/DelayedDeliveryPolicies.java     |  2 ++
 .../data/impl/DelayedDeliveryPoliciesImpl.java     |  9 ++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 16 ++++-----
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  6 ++++
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  |  6 ++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  6 ++++
 .../policies/data/HierarchyTopicPolicies.java      |  2 ++
 .../pulsar/common/policies/data/TopicPolicies.java |  1 +
 18 files changed, 246 insertions(+), 10 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index dc76ef07c70..ea98ad4a9b5 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -588,6 +588,11 @@ delayedDeliveryMaxNumBuckets=-1
 # fixed delays in messages in a different way.
 delayedDeliveryFixedDelayDetectionLookahead=50000
 
+# The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message whose requested
+# delivery delay exceeds this limit, it rejects the message and returns an error to the producer.
+# The default value is 0, which means there is no limit on the delivery delay.
+delayedDeliveryMaxDelayInMillis=0
+
 # Whether to enable acknowledge of batch local index.
 acknowledgmentAtBatchIndexLevelEnabled=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 547858f5b71..e088f50a05c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -384,6 +384,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
             + "logic to handle fixed delays in messages in a different way.")
     private long delayedDeliveryFixedDelayDetectionLookahead = 50_000;
 
+    @FieldContext(category = CATEGORY_SERVER, doc = """
+            The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which \
+            exceeds this max delay, then it will return an error to the producer. \
+            The default value is 0 which means there is no limit on the max delivery delay.""")
+    private long delayedDeliveryMaxDelayInMillis = 0;
+
     @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
     private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 06584344b9a..1731d4c73db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -516,6 +516,8 @@ public class PersistentTopicsBase extends AdminResource {
                 topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
                 topicPolicies.setDelayedDeliveryTickTimeMillis(
                         deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
+                topicPolicies.setDelayedDeliveryMaxDelayInMillis(
+                        deliveryPolicies == null ? null : deliveryPolicies.getMaxDeliveryDelayInMillis());
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
             });
     }
@@ -903,6 +905,7 @@ public class PersistentTopicsBase extends AdminResource {
                     delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
                             .tickTime(policies.getDelayedDeliveryTickTimeMillis())
                             .active(policies.getDelayedDeliveryEnabled())
+                            .maxDeliveryDelayInMillis(policies.getDelayedDeliveryMaxDelayInMillis())
                             .build();
                 }
                 if (delayedDeliveryPolicies == null && applied) {
@@ -911,6 +914,8 @@ public class PersistentTopicsBase extends AdminResource {
                         delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
                                 .tickTime(pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis())
                                 .active(pulsar().getConfiguration().isDelayedDeliveryEnabled())
+                                .maxDeliveryDelayInMillis(
+                                        pulsar().getConfiguration().getDelayedDeliveryMaxDelayInMillis())
                                 .build();
                     }
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 65f2bc5a402..05defa60c05 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -244,6 +244,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getReplicatorDispatchRate().updateTopicValue(
             DispatchRateImpl.normalize(data.getReplicatorDispatchRate()));
         topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
+        topicPolicies.getDelayedDeliveryMaxDelayInMillis().updateTopicValue(data.getDelayedDeliveryMaxDelayInMillis());
         topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate()));
         topicPolicies.getSubscriptionDispatchRate().updateTopicValue(
             DispatchRateImpl.normalize(data.getSubscriptionDispatchRate()));
@@ -287,6 +288,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getDelayedDeliveryTickTimeMillis().updateNamespaceValue(
                 Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
                         .map(DelayedDeliveryPolicies::getTickTime).orElse(null));
+        topicPolicies.getDelayedDeliveryMaxDelayInMillis().updateNamespaceValue(
+                Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
+                        .map(DelayedDeliveryPolicies::getMaxDeliveryDelayInMillis).orElse(null));
         topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
                 subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
         updateNamespaceReplicatorDispatchRate(namespacePolicies,
@@ -387,6 +391,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getPublishRate().updateBrokerValue(publishRateInBroker(config));
         topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
         topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
+        topicPolicies.getDelayedDeliveryMaxDelayInMillis()
+                .updateBrokerValue(config.getDelayedDeliveryMaxDelayInMillis());
         topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes());
         topicPolicies.getReplicationClusters().updateBrokerValue(Collections.emptyList());
         SchemaCompatibilityStrategy schemaCompatibilityStrategy = config.getSchemaCompatibilityStrategy();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f045492e67b..9baafcb2e9e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -560,6 +560,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             decrementPendingWriteOpsAndCheck();
             return;
         }
+        if (isExceedMaximumDeliveryDelay(headersAndPayload)) {
+            publishContext.completed(
+                    new NotAllowedException(
+                            String.format("Exceeds max allowed delivery delay of %s milliseconds",
+                                    getDelayedDeliveryMaxDelayInMillis())), -1, -1);
+            decrementPendingWriteOpsAndCheck();
+            return;
+        }
 
         MessageDeduplication.MessageDupStatus status =
                 messageDeduplication.isDuplicate(publishContext, headersAndPayload);
@@ -3876,6 +3884,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             decrementPendingWriteOpsAndCheck();
             return;
         }
+        if (isExceedMaximumDeliveryDelay(headersAndPayload)) {
+            publishContext.completed(
+                    new NotAllowedException(
+                            String.format("Exceeds max allowed delivery delay of %s milliseconds",
+                                    getDelayedDeliveryMaxDelayInMillis())), -1, -1);
+            decrementPendingWriteOpsAndCheck();
+            return;
+        }
 
         MessageDeduplication.MessageDupStatus status =
                 messageDeduplication.isDuplicate(publishContext, headersAndPayload);
@@ -3942,6 +3958,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return topicPolicies.getDelayedDeliveryEnabled().get();
     }
 
+    public long getDelayedDeliveryMaxDelayInMillis() {
+        return topicPolicies.getDelayedDeliveryMaxDelayInMillis().get();
+    }
+
     public int getMaxUnackedMessagesOnSubscription() {
         return topicPolicies.getMaxUnackedMessagesOnSubscription().get();
     }
@@ -4093,4 +4113,18 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public Optional<TopicName> getShadowSourceTopic() {
         return Optional.ofNullable(shadowSourceTopic);
     }
+
+    protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
+        if (isDelayedDeliveryEnabled()) {
+            long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
+            if (maxDeliveryDelayInMs > 0) {
+                headersAndPayload.markReaderIndex();
+                MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+                headersAndPayload.resetReaderIndex();
+                return msgMetadata.hasDeliverAtTime()
+                        && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
+            }
+        }
+        return false;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDeliveryTest.java
index 90af0e963fe..c9752750a8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDeliveryTest.java
@@ -66,6 +66,7 @@ public class AdminApiDelayedDeliveryTest extends MockedPulsarServiceBaseTest {
         DelayedDeliveryPolicies delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
                 .tickTime(2000)
                 .active(false)
+                .maxDeliveryDelayInMillis(10_000)
                 .build();
         admin.namespaces().setDelayedDeliveryMessages(namespace, delayedDeliveryPolicies);
         //zk update takes time
@@ -124,6 +125,7 @@ public class AdminApiDelayedDeliveryTest extends MockedPulsarServiceBaseTest {
         DelayedDeliveryPolicies delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
                 .tickTime(3)
                 .active(true)
+                .maxDeliveryDelayInMillis(5000)
                 .build();
         admin.namespaces().setDelayedDeliveryMessages(namespace, delayedDeliveryPolicies);
         Awaitility.await().untilAsserted(()
@@ -151,12 +153,14 @@ public class AdminApiDelayedDeliveryTest extends MockedPulsarServiceBaseTest {
                 DelayedDeliveryPolicies.builder()
                         .tickTime(conf.getDelayedDeliveryTickTimeMillis())
                         .active(conf.isDelayedDeliveryEnabled())
+                        .maxDeliveryDelayInMillis(conf.getDelayedDeliveryMaxDelayInMillis())
                         .build();
         assertEquals(admin.topics().getDelayedDeliveryPolicy(topic, true), brokerLevelPolicy);
         //set namespace-level policy
         DelayedDeliveryPolicies namespaceLevelPolicy = DelayedDeliveryPolicies.builder()
                 .tickTime(100)
                 .active(true)
+                .maxDeliveryDelayInMillis(4000)
                 .build();
         admin.namespaces().setDelayedDeliveryMessages(namespace, namespaceLevelPolicy);
         Awaitility.await().untilAsserted(()
@@ -164,10 +168,12 @@ public class AdminApiDelayedDeliveryTest extends MockedPulsarServiceBaseTest {
         DelayedDeliveryPolicies policyFromBroker = admin.topics().getDelayedDeliveryPolicy(topic, true);
         assertEquals(policyFromBroker.getTickTime(), 100);
         assertTrue(policyFromBroker.isActive());
+        assertEquals(policyFromBroker.getMaxDeliveryDelayInMillis(), 4000);
         // set topic-level policy
         DelayedDeliveryPolicies topicLevelPolicy = DelayedDeliveryPolicies.builder()
                 .tickTime(200)
                 .active(true)
+                .maxDeliveryDelayInMillis(5000)
                 .build();
         admin.topics().setDelayedDeliveryPolicy(topic, topicLevelPolicy);
         Awaitility.await().untilAsserted(()
@@ -175,6 +181,7 @@ public class AdminApiDelayedDeliveryTest extends MockedPulsarServiceBaseTest {
         policyFromBroker = admin.topics().getDelayedDeliveryPolicy(topic, true);
         assertEquals(policyFromBroker.getTickTime(), 200);
         assertTrue(policyFromBroker.isActive());
+        assertEquals(policyFromBroker.getMaxDeliveryDelayInMillis(), 5000);
         //remove topic-level policy
         admin.topics().removeDelayedDeliveryPolicy(topic);
         Awaitility.await().untilAsserted(()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
index cec30762194..c1e8dfa3099 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
@@ -196,4 +197,43 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest {
                 this.testTenant, this.testNamespace));
         assertTrue(Objects.isNull(dispatchRate));
     }
+
+    @Test
+    public void testOperationDelayedDelivery() throws Exception {
+        boolean isActive = true;
+        long tickTime = 1000;
+        long maxDeliveryDelayInMillis = 5000;
+        // 1. set delayed delivery policy
+        namespaces.setDelayedDeliveryPolicies(this.testTenant, this.testNamespace,
+                DelayedDeliveryPolicies.builder()
+                        .active(isActive)
+                        .tickTime(tickTime)
+                        .maxDeliveryDelayInMillis(maxDeliveryDelayInMillis)
+                        .build());
+
+        // 2. query delayed delivery policy & check
+        DelayedDeliveryPolicies policy =
+                (DelayedDeliveryPolicies) asyncRequests(response -> namespaces.getDelayedDeliveryPolicies(response,
+                        this.testTenant, this.testNamespace));
+        assertEquals(policy.isActive(), isActive);
+        assertEquals(policy.getTickTime(), tickTime);
+        assertEquals(policy.getMaxDeliveryDelayInMillis(), maxDeliveryDelayInMillis);
+
+        // 3. remove & check
+        namespaces.removeDelayedDeliveryPolicies(this.testTenant, this.testNamespace);
+        policy =
+                (DelayedDeliveryPolicies) asyncRequests(response -> namespaces.getDelayedDeliveryPolicies(response,
+                        this.testTenant, this.testNamespace));
+        assertTrue(Objects.isNull(policy));
+
+        // 4. invalid namespace check
+        String invalidNamespace = this.testNamespace + "/";
+        try {
+            namespaces.setDelayedDeliveryPolicies(this.testTenant, invalidNamespace,
+                    DelayedDeliveryPolicies.builder().build());
+            fail("should have failed");
+        } catch (RestException e) {
+            assertEquals(e.getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode());
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 023b77a3dc0..9f56acfb57f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
@@ -3163,6 +3164,31 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
                 });
     }
 
+    @Test
+    public void testDelayedDeliveryPolicy() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+
+        boolean isActive = true;
+        long tickTime = 1000;
+        long maxDeliveryDelayInMillis = 5000;
+        DelayedDeliveryPolicies policy = DelayedDeliveryPolicies.builder()
+                .active(isActive)
+                .tickTime(tickTime)
+                .maxDeliveryDelayInMillis(maxDeliveryDelayInMillis)
+                .build();
+
+        admin.topicPolicies().setDelayedDeliveryPolicy(topic, policy);
+        Awaitility.await()
+                .untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getDelayedDeliveryPolicy(topic), policy));
+
+        admin.topicPolicies().removeDelayedDeliveryPolicy(topic);
+        Awaitility.await()
+                .untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getDelayedDeliveryPolicy(topic)));
+
+        admin.topics().delete(topic, true);
+    }
+    
     @Test
     public void testUpdateRetentionWithPartialFailure() throws Exception {
         String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp");
@@ -3207,5 +3233,4 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         admin.namespaces().removeRetention(myNamespace);
         admin.topics().delete(tpName, false);
     }
-
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 086d434b81d..ae7edde4496 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -337,6 +338,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
         DelayedDeliveryPolicies delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
                 .tickTime(2000)
                 .active(false)
+                .maxDeliveryDelayInMillis(5000)
                 .build();
         admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
         //wait for update
@@ -349,6 +351,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
 
         assertFalse(admin.topics().getDelayedDeliveryPolicy(topicName).isActive());
         assertEquals(2000, admin.topics().getDelayedDeliveryPolicy(topicName).getTickTime());
+        assertEquals(5000, admin.topics().getDelayedDeliveryPolicy(topicName).getMaxDeliveryDelayInMillis());
 
         admin.topics().removeDelayedDeliveryPolicy(topicName);
         //wait for update
@@ -622,4 +625,42 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
         }
     }
 
+    @Test
+    public void testDelayedDeliveryExceedsMaxDelay() throws Exception {
+        long maxDeliveryDelayInMillis = 5000;
+        String topic = BrokerTestUtil.newUniqueName("testDelayedDeliveryExceedsMaxDelay");
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        admin.topicPolicies().setDelayedDeliveryPolicy(topic,
+                DelayedDeliveryPolicies.builder()
+                        .active(true)
+                        .tickTime(100L)
+                        .maxDeliveryDelayInMillis(maxDeliveryDelayInMillis)
+                        .build());
+
+        //wait for update
+        for (int i = 0; i < 50; i++) {
+            Thread.sleep(100);
+            if (admin.topics().getDelayedDeliveryPolicy(topic) != null) {
+                break;
+            }
+        }
+
+        try {
+            producer.newMessage()
+                    .value("msg")
+                    .deliverAfter(6, TimeUnit.SECONDS)
+                    .send();
+
+            producer.flush();
+            fail("Should have thrown NotAllowedException due to exceeding maxDeliveryDelayInMillis");
+        } catch (PulsarClientException.NotAllowedException ex) {
+            assertEquals(ex.getMessage(), "Exceeds max allowed delivery delay of "
+                    + maxDeliveryDelayInMillis + " milliseconds");
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 81ed4311374..a0a28262faa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -133,6 +133,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
@@ -1909,4 +1910,40 @@ public class TransactionTest extends TransactionTestBase {
         Assert.assertEquals(result, List.of("V4", "V5", "V6"));
     }
 
+    @Test
+    public void testDelayedDeliveryExceedsMaxDelay() throws Exception {
+        final long maxDeliveryDelayInMillis = 5000;
+        final String namespace = "tnx/ns-prechecks";
+        final String topic = "persistent://" + namespace + "/test_transaction_topic" + UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace);
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topicPolicies().setDelayedDeliveryPolicy(topic,
+                DelayedDeliveryPolicies.builder()
+                        .active(true)
+                        .tickTime(100L)
+                        .maxDeliveryDelayInMillis(maxDeliveryDelayInMillis)
+                        .build());
+
+        @Cleanup
+        Producer<byte[]> producer = this.pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .addEncryptionKey("my-app-key")
+                .defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem")
+                .create();
+
+        try {
+            Transaction txn = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
+            producer.newMessage(txn)
+                    .value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
+                    .deliverAfter(6, TimeUnit.SECONDS)
+                    .send();
+            txn.commit();
+            fail("Should have thrown NotAllowedException due to exceeding maxDeliveryDelayInMillis");
+        } catch (PulsarClientException.NotAllowedException ex) {
+            assertEquals(ex.getMessage(), "Exceeds max allowed delivery delay of "
+                    + maxDeliveryDelayInMillis + " milliseconds");
+        }
+    }
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java
index 555896ab3e5..f940ecd1b86 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java
@@ -26,10 +26,12 @@ import org.apache.pulsar.common.policies.data.impl.DelayedDeliveryPoliciesImpl;
 public interface DelayedDeliveryPolicies {
     long getTickTime();
     boolean isActive();
+    long getMaxDeliveryDelayInMillis();
 
     interface Builder {
         Builder tickTime(long tickTime);
         Builder active(boolean active);
+        Builder maxDeliveryDelayInMillis(long maxDeliveryDelayInMillis);
         DelayedDeliveryPolicies build();
     }
 
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/DelayedDeliveryPoliciesImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/DelayedDeliveryPoliciesImpl.java
index 408217f3637..580ac6c95fa 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/DelayedDeliveryPoliciesImpl.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/DelayedDeliveryPoliciesImpl.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 public final class DelayedDeliveryPoliciesImpl implements DelayedDeliveryPolicies {
     private long tickTime;
     private boolean active;
+    private long maxDeliveryDelayInMillis;
 
     public static DelayedDeliveryPoliciesImplBuilder builder() {
         return new DelayedDeliveryPoliciesImplBuilder();
@@ -40,6 +41,7 @@ public final class DelayedDeliveryPoliciesImpl implements DelayedDeliveryPolicie
     public static class DelayedDeliveryPoliciesImplBuilder implements DelayedDeliveryPolicies.Builder {
         private long tickTime;
         private boolean active;
+        private long maxDeliveryDelayInMillis;
 
         public DelayedDeliveryPoliciesImplBuilder tickTime(long tickTime) {
             this.tickTime = tickTime;
@@ -51,8 +53,13 @@ public final class DelayedDeliveryPoliciesImpl implements DelayedDeliveryPolicie
             return this;
         }
 
+        public DelayedDeliveryPoliciesImplBuilder maxDeliveryDelayInMillis(long maxDeliveryDelayInMillis) {
+            this.maxDeliveryDelayInMillis = maxDeliveryDelayInMillis;
+            return this;
+        }
+
         public DelayedDeliveryPoliciesImpl build() {
-            return new DelayedDeliveryPoliciesImpl(tickTime, active);
+            return new DelayedDeliveryPoliciesImpl(tickTime, active, maxDeliveryDelayInMillis);
         }
     }
 }
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 0ab634680cd..2d567a7528d 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -680,9 +680,9 @@ public class PulsarAdminToolTest {
         namespaces.run(split("remove-retention myprop/clust/ns1"));
         verify(mockNamespaces).removeRetention("myprop/clust/ns1");
 
-        namespaces.run(split("set-delayed-delivery myprop/clust/ns1 -e -t 1s"));
+        namespaces.run(split("set-delayed-delivery myprop/clust/ns1 -e -t 1s -md 5s"));
         verify(mockNamespaces).setDelayedDeliveryMessages("myprop/clust/ns1",
-                DelayedDeliveryPolicies.builder().tickTime(1000).active(true).build());
+                DelayedDeliveryPolicies.builder().tickTime(1000).active(true).maxDeliveryDelayInMillis(5000).build());
 
         namespaces.run(split("get-delayed-delivery myprop/clust/ns1"));
         verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1");
@@ -1210,9 +1210,9 @@ public class PulsarAdminToolTest {
 
         cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
-        cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable"));
+        cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable --maxDelay 5s"));
         verify(mockTopicsPolicies).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
-                DelayedDeliveryPolicies.builder().tickTime(10000).active(true).build());
+                DelayedDeliveryPolicies.builder().tickTime(10000).active(true).maxDeliveryDelayInMillis(5000).build());
         cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;
 
@@ -1377,9 +1377,9 @@ public class PulsarAdminToolTest {
 
         cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
-        cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable -g"));
+        cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable -md 5s -g"));
         verify(mockGlobalTopicsPolicies).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
-                DelayedDeliveryPolicies.builder().tickTime(10000).active(true).build());
+                DelayedDeliveryPolicies.builder().tickTime(10000).active(true).maxDeliveryDelayInMillis(5000).build());
         cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;
 
@@ -1806,9 +1806,9 @@ public class PulsarAdminToolTest {
 
         cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
-        cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable"));
+        cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s -md 5s --enable"));
         verify(mockTopics).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
-                DelayedDeliveryPolicies.builder().tickTime(10000).active(true).build());
+                DelayedDeliveryPolicies.builder().tickTime(10000).active(true).maxDeliveryDelayInMillis(5000).build());
         cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 4da7bd83154..4394fc00027 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1602,6 +1602,11 @@ public class CmdNamespaces extends CmdBase {
                 converter = TimeUnitToMillisConverter.class)
         private Long delayedDeliveryTimeInMills = 1000L;
 
+        @Parameter(names = { "--maxDelay", "-md" },
+                description = "The max allowed delay for delayed delivery. (eg: 1s, 10s, 1m, 5h, 3d)",
+                converter = TimeUnitToMillisConverter.class)
+        private Long delayedDeliveryMaxDelayInMillis = 0L;
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
@@ -1612,6 +1617,7 @@ public class CmdNamespaces extends CmdBase {
             getAdmin().namespaces().setDelayedDeliveryMessages(namespace, DelayedDeliveryPolicies.builder()
                     .tickTime(delayedDeliveryTimeInMills)
                     .active(enable)
+                    .maxDeliveryDelayInMillis(delayedDeliveryMaxDelayInMillis)
                     .build());
         }
     }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 421ccec1403..c27cbd06849 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -723,6 +723,11 @@ public class CmdTopicPolicies extends CmdBase {
                 + "If set to true, the policy will be replicate to other clusters asynchronously")
         private boolean isGlobal;
 
+        @Parameter(names = { "--maxDelay", "-md" },
+                description = "The max allowed delay for delayed delivery. (eg: 1s, 10s, 1m, 5h, 3d)",
+                converter = TimeUnitToMillisConverter.class)
+        private Long delayedDeliveryMaxDelayInMillis = 0L;
+
         @Override
         void run() throws PulsarAdminException {
             String topicName = validateTopicName(params);
@@ -733,6 +738,7 @@ public class CmdTopicPolicies extends CmdBase {
             getTopicPolicies(isGlobal).setDelayedDeliveryPolicy(topicName, DelayedDeliveryPolicies.builder()
                     .tickTime(delayedDeliveryTimeInMills)
                     .active(enable)
+                    .maxDeliveryDelayInMillis(delayedDeliveryMaxDelayInMillis)
                     .build());
         }
     }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 508642e63ae..3e2d9d1c13c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1813,6 +1813,11 @@ public class CmdTopics extends CmdBase {
                 converter = TimeUnitToMillisConverter.class)
         private Long delayedDeliveryTimeInMills = 1_000L;
 
+        @Parameter(names = { "--maxDelay", "-md" },
+                description = "The max allowed delay for delayed delivery. (eg: 1s, 10s, 1m, 5h, 3d)",
+                converter = TimeUnitToMillisConverter.class)
+        private Long delayedDeliveryMaxDelayInMillis = 0L;
+
         @Override
         void run() throws PulsarAdminException {
             String topicName = validateTopicName(params);
@@ -1823,6 +1828,7 @@ public class CmdTopics extends CmdBase {
             getTopics().setDelayedDeliveryPolicy(topicName, DelayedDeliveryPolicies.builder()
                     .tickTime(delayedDeliveryTimeInMills)
                     .active(enable)
+                    .maxDeliveryDelayInMillis(delayedDeliveryMaxDelayInMillis)
                     .build());
         }
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 7f841ec8975..4edb033498b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -51,6 +51,7 @@ public class HierarchyTopicPolicies {
     final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
     final PolicyHierarchyValue<Boolean> dispatcherPauseOnAckStatePersistentEnabled;
     final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
+    final PolicyHierarchyValue<Long> delayedDeliveryMaxDelayInMillis;
     final PolicyHierarchyValue<DispatchRateImpl> replicatorDispatchRate;
     final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
     final PolicyHierarchyValue<SubscribeRate> subscribeRate;
@@ -84,6 +85,7 @@ public class HierarchyTopicPolicies {
         delayedDeliveryEnabled = new PolicyHierarchyValue<>();
         dispatcherPauseOnAckStatePersistentEnabled = new PolicyHierarchyValue<>();
         delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
+        delayedDeliveryMaxDelayInMillis = new PolicyHierarchyValue<>();
         replicatorDispatchRate = new PolicyHierarchyValue<>();
         compactionThreshold = new PolicyHierarchyValue<>();
         subscribeRate = new PolicyHierarchyValue<>();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index eede0ab794d..5403b84a4f7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -62,6 +62,7 @@ public class TopicPolicies {
     private Integer maxUnackedMessagesOnSubscription;
     private Long delayedDeliveryTickTimeMillis;
     private Boolean delayedDeliveryEnabled;
+    private Long delayedDeliveryMaxDelayInMillis;
     private Boolean dispatcherPauseOnAckStatePersistentEnabled;
     private OffloadPoliciesImpl offloadPolicies;
     private InactiveTopicPolicies inactiveTopicPolicies;