You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/01/26 13:04:56 UTC
(pulsar) branch master updated: [improve][broker] PIP-315: Configurable max delay limit for delayed delivery (#21798)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d37d31f6441 [improve][broker] PIP-315: Configurable max delay limit for delayed delivery (#21798)
d37d31f6441 is described below
commit d37d31f64413b896a051e6d9fa0acf07201e02a3
Author: Kevin Lu <ke...@gmail.com>
AuthorDate: Fri Jan 26 05:04:50 2024 -0800
[improve][broker] PIP-315: Configurable max delay limit for delayed delivery (#21798)
Co-authored-by: Lari Hotari <lh...@users.noreply.github.com>
Co-authored-by: JiangHaiting <ji...@apache.org>
---
conf/broker.conf | 5 +++
.../apache/pulsar/broker/ServiceConfiguration.java | 6 ++++
.../broker/admin/impl/PersistentTopicsBase.java | 5 +++
.../pulsar/broker/service/AbstractTopic.java | 6 ++++
.../broker/service/persistent/PersistentTopic.java | 34 ++++++++++++++++++
.../broker/admin/AdminApiDelayedDeliveryTest.java | 7 ++++
.../pulsar/broker/admin/NamespacesV2Test.java | 40 +++++++++++++++++++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 27 +++++++++++++-
.../service/persistent/DelayedDeliveryTest.java | 41 ++++++++++++++++++++++
.../pulsar/broker/transaction/TransactionTest.java | 37 +++++++++++++++++++
.../policies/data/DelayedDeliveryPolicies.java | 2 ++
.../data/impl/DelayedDeliveryPoliciesImpl.java | 9 ++++-
.../pulsar/admin/cli/PulsarAdminToolTest.java | 16 ++++-----
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 6 ++++
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 6 ++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 6 ++++
.../policies/data/HierarchyTopicPolicies.java | 2 ++
.../pulsar/common/policies/data/TopicPolicies.java | 1 +
18 files changed, 246 insertions(+), 10 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index dc76ef07c70..ea98ad4a9b5 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -588,6 +588,11 @@ delayedDeliveryMaxNumBuckets=-1
# fixed delays in messages in a different way.
delayedDeliveryFixedDelayDetectionLookahead=50000
+# The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message whose requested
+# delivery delay exceeds this limit, it rejects the message and returns an error to the producer.
+# The default value is 0, which means there is no limit on the max delivery delay.
+delayedDeliveryMaxDelayInMillis=0
+
# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 547858f5b71..e088f50a05c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -384,6 +384,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "logic to handle fixed delays in messages in a different way.")
private long delayedDeliveryFixedDelayDetectionLookahead = 50_000;
+ @FieldContext(category = CATEGORY_SERVER, doc = """
+ The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which \
+ exceeds this max delay, then it will return an error to the producer. \
+ The default value is 0 which means there is no limit on the max delivery delay.""")
+ private long delayedDeliveryMaxDelayInMillis = 0;
+
@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 06584344b9a..1731d4c73db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -516,6 +516,8 @@ public class PersistentTopicsBase extends AdminResource {
topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
topicPolicies.setDelayedDeliveryTickTimeMillis(
deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
+ topicPolicies.setDelayedDeliveryMaxDelayInMillis(
+ deliveryPolicies == null ? null : deliveryPolicies.getMaxDeliveryDelayInMillis());
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}
@@ -903,6 +905,7 @@ public class PersistentTopicsBase extends AdminResource {
delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
.tickTime(policies.getDelayedDeliveryTickTimeMillis())
.active(policies.getDelayedDeliveryEnabled())
+ .maxDeliveryDelayInMillis(policies.getDelayedDeliveryMaxDelayInMillis())
.build();
}
if (delayedDeliveryPolicies == null && applied) {
@@ -911,6 +914,8 @@ public class PersistentTopicsBase extends AdminResource {
delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
.tickTime(pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis())
.active(pulsar().getConfiguration().isDelayedDeliveryEnabled())
+ .maxDeliveryDelayInMillis(
+ pulsar().getConfiguration().getDelayedDeliveryMaxDelayInMillis())
.build();
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 65f2bc5a402..05defa60c05 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -244,6 +244,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
topicPolicies.getReplicatorDispatchRate().updateTopicValue(
DispatchRateImpl.normalize(data.getReplicatorDispatchRate()));
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
+ topicPolicies.getDelayedDeliveryMaxDelayInMillis().updateTopicValue(data.getDelayedDeliveryMaxDelayInMillis());
topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate()));
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(
DispatchRateImpl.normalize(data.getSubscriptionDispatchRate()));
@@ -287,6 +288,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
topicPolicies.getDelayedDeliveryTickTimeMillis().updateNamespaceValue(
Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
.map(DelayedDeliveryPolicies::getTickTime).orElse(null));
+ topicPolicies.getDelayedDeliveryMaxDelayInMillis().updateNamespaceValue(
+ Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
+ .map(DelayedDeliveryPolicies::getMaxDeliveryDelayInMillis).orElse(null));
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
updateNamespaceReplicatorDispatchRate(namespacePolicies,
@@ -387,6 +391,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
topicPolicies.getPublishRate().updateBrokerValue(publishRateInBroker(config));
topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
+ topicPolicies.getDelayedDeliveryMaxDelayInMillis()
+ .updateBrokerValue(config.getDelayedDeliveryMaxDelayInMillis());
topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes());
topicPolicies.getReplicationClusters().updateBrokerValue(Collections.emptyList());
SchemaCompatibilityStrategy schemaCompatibilityStrategy = config.getSchemaCompatibilityStrategy();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f045492e67b..9baafcb2e9e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -560,6 +560,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
decrementPendingWriteOpsAndCheck();
return;
}
+ if (isExceedMaximumDeliveryDelay(headersAndPayload)) {
+ publishContext.completed(
+ new NotAllowedException(
+ String.format("Exceeds max allowed delivery delay of %s milliseconds",
+ getDelayedDeliveryMaxDelayInMillis())), -1, -1);
+ decrementPendingWriteOpsAndCheck();
+ return;
+ }
MessageDeduplication.MessageDupStatus status =
messageDeduplication.isDuplicate(publishContext, headersAndPayload);
@@ -3876,6 +3884,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
decrementPendingWriteOpsAndCheck();
return;
}
+ if (isExceedMaximumDeliveryDelay(headersAndPayload)) {
+ publishContext.completed(
+ new NotAllowedException(
+ String.format("Exceeds max allowed delivery delay of %s milliseconds",
+ getDelayedDeliveryMaxDelayInMillis())), -1, -1);
+ decrementPendingWriteOpsAndCheck();
+ return;
+ }
MessageDeduplication.MessageDupStatus status =
messageDeduplication.isDuplicate(publishContext, headersAndPayload);
@@ -3942,6 +3958,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return topicPolicies.getDelayedDeliveryEnabled().get();
}
+ /**
+  * Returns the max delivery delay, in milliseconds, currently held by this topic's policies.
+  *
+  * @return the value read from {@code topicPolicies.getDelayedDeliveryMaxDelayInMillis()}
+  */
+ public long getDelayedDeliveryMaxDelayInMillis() {
+     final long maxDelayMillis = topicPolicies.getDelayedDeliveryMaxDelayInMillis().get();
+     return maxDelayMillis;
+ }
+
public int getMaxUnackedMessagesOnSubscription() {
return topicPolicies.getMaxUnackedMessagesOnSubscription().get();
}
@@ -4093,4 +4113,18 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
public Optional<TopicName> getShadowSourceTopic() {
return Optional.ofNullable(shadowSourceTopic);
}
+
+ protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
+ if (isDelayedDeliveryEnabled()) {
+ long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
+ if (maxDeliveryDelayInMs > 0) {
+ headersAndPayload.markReaderIndex();
+ MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+ headersAndPayload.resetReaderIndex();
+ return msgMetadata.hasDeliverAtTime()
+ && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
+ }
+ }
+ return false;
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDeliveryTest.java
index 90af0e963fe..c9752750a8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDeliveryTest.java
@@ -66,6 +66,7 @@ public class AdminApiDelayedDeliveryTest extends MockedPulsarServiceBaseTest {
DelayedDeliveryPolicies delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
.tickTime(2000)
.active(false)
+ .maxDeliveryDelayInMillis(10_000)
.build();
admin.namespaces().setDelayedDeliveryMessages(namespace, delayedDeliveryPolicies);
//zk update takes time
@@ -124,6 +125,7 @@ public class AdminApiDelayedDeliveryTest extends MockedPulsarServiceBaseTest {
DelayedDeliveryPolicies delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
.tickTime(3)
.active(true)
+ .maxDeliveryDelayInMillis(5000)
.build();
admin.namespaces().setDelayedDeliveryMessages(namespace, delayedDeliveryPolicies);
Awaitility.await().untilAsserted(()
@@ -151,12 +153,14 @@ public class AdminApiDelayedDeliveryTest extends MockedPulsarServiceBaseTest {
DelayedDeliveryPolicies.builder()
.tickTime(conf.getDelayedDeliveryTickTimeMillis())
.active(conf.isDelayedDeliveryEnabled())
+ .maxDeliveryDelayInMillis(conf.getDelayedDeliveryMaxDelayInMillis())
.build();
assertEquals(admin.topics().getDelayedDeliveryPolicy(topic, true), brokerLevelPolicy);
//set namespace-level policy
DelayedDeliveryPolicies namespaceLevelPolicy = DelayedDeliveryPolicies.builder()
.tickTime(100)
.active(true)
+ .maxDeliveryDelayInMillis(4000)
.build();
admin.namespaces().setDelayedDeliveryMessages(namespace, namespaceLevelPolicy);
Awaitility.await().untilAsserted(()
@@ -164,10 +168,12 @@ public class AdminApiDelayedDeliveryTest extends MockedPulsarServiceBaseTest {
DelayedDeliveryPolicies policyFromBroker = admin.topics().getDelayedDeliveryPolicy(topic, true);
assertEquals(policyFromBroker.getTickTime(), 100);
assertTrue(policyFromBroker.isActive());
+ assertEquals(policyFromBroker.getMaxDeliveryDelayInMillis(), 4000);
// set topic-level policy
DelayedDeliveryPolicies topicLevelPolicy = DelayedDeliveryPolicies.builder()
.tickTime(200)
.active(true)
+ .maxDeliveryDelayInMillis(5000)
.build();
admin.topics().setDelayedDeliveryPolicy(topic, topicLevelPolicy);
Awaitility.await().untilAsserted(()
@@ -175,6 +181,7 @@ public class AdminApiDelayedDeliveryTest extends MockedPulsarServiceBaseTest {
policyFromBroker = admin.topics().getDelayedDeliveryPolicy(topic, true);
assertEquals(policyFromBroker.getTickTime(), 200);
assertTrue(policyFromBroker.isActive());
+ assertEquals(policyFromBroker.getMaxDeliveryDelayInMillis(), 5000);
//remove topic-level policy
admin.topics().removeDelayedDeliveryPolicy(topic);
Awaitility.await().untilAsserted(()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
index cec30762194..c1e8dfa3099 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
@@ -196,4 +197,43 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest {
this.testTenant, this.testNamespace));
assertTrue(Objects.isNull(dispatchRate));
}
+
+ /**
+  * Exercises set / get / remove of the namespace-level delayed delivery policy, including the
+  * max delivery delay field, and checks that an invalid namespace name is rejected.
+  */
+ @Test
+ public void testOperationDelayedDelivery() throws Exception {
+     final boolean expectedActive = true;
+     final long expectedTickTime = 1000;
+     final long expectedMaxDelay = 5000;
+     final DelayedDeliveryPolicies requested = DelayedDeliveryPolicies.builder()
+             .active(expectedActive)
+             .tickTime(expectedTickTime)
+             .maxDeliveryDelayInMillis(expectedMaxDelay)
+             .build();
+
+     // 1. store the policy on the namespace
+     namespaces.setDelayedDeliveryPolicies(this.testTenant, this.testNamespace, requested);
+
+     // 2. read it back and compare every field
+     final DelayedDeliveryPolicies stored = (DelayedDeliveryPolicies) asyncRequests(
+             response -> namespaces.getDelayedDeliveryPolicies(response, this.testTenant, this.testNamespace));
+     assertEquals(stored.isActive(), expectedActive);
+     assertEquals(stored.getTickTime(), expectedTickTime);
+     assertEquals(stored.getMaxDeliveryDelayInMillis(), expectedMaxDelay);
+
+     // 3. remove the policy; a subsequent read must yield nothing
+     namespaces.removeDelayedDeliveryPolicies(this.testTenant, this.testNamespace);
+     final DelayedDeliveryPolicies afterRemoval = (DelayedDeliveryPolicies) asyncRequests(
+             response -> namespaces.getDelayedDeliveryPolicies(response, this.testTenant, this.testNamespace));
+     assertTrue(Objects.isNull(afterRemoval));
+
+     // 4. a namespace name with a trailing slash must be rejected with PRECONDITION_FAILED
+     final String invalidNamespace = this.testNamespace + "/";
+     try {
+         namespaces.setDelayedDeliveryPolicies(this.testTenant, invalidNamespace,
+                 DelayedDeliveryPolicies.builder().build());
+         fail("should have failed");
+     } catch (RestException e) {
+         assertEquals(e.getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode());
+     }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 023b77a3dc0..9f56acfb57f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
@@ -3163,6 +3164,31 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
});
}
+ /**
+  * Sets a topic-level delayed delivery policy (including the max delivery delay), waits until the
+  * admin API returns an equal policy, then removes it and waits until the admin API returns null.
+  */
+ @Test
+ public void testDelayedDeliveryPolicy() throws Exception {
+     final String topicName = testTopic + UUID.randomUUID();
+     admin.topics().createNonPartitionedTopic(topicName);
+
+     final DelayedDeliveryPolicies expectedPolicy = DelayedDeliveryPolicies.builder()
+             .active(true)
+             .tickTime(1000)
+             .maxDeliveryDelayInMillis(5000)
+             .build();
+
+     admin.topicPolicies().setDelayedDeliveryPolicy(topicName, expectedPolicy);
+     Awaitility.await().untilAsserted(() -> {
+         Assert.assertEquals(admin.topicPolicies().getDelayedDeliveryPolicy(topicName), expectedPolicy);
+     });
+
+     admin.topicPolicies().removeDelayedDeliveryPolicy(topicName);
+     Awaitility.await().untilAsserted(() -> {
+         Assert.assertNull(admin.topicPolicies().getDelayedDeliveryPolicy(topicName));
+     });
+
+     admin.topics().delete(topicName, true);
+ }
+
@Test
public void testUpdateRetentionWithPartialFailure() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp");
@@ -3207,5 +3233,4 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
admin.namespaces().removeRetention(myNamespace);
admin.topics().delete(tpName, false);
}
-
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 086d434b81d..ae7edde4496 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -337,6 +338,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
DelayedDeliveryPolicies delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
.tickTime(2000)
.active(false)
+ .maxDeliveryDelayInMillis(5000)
.build();
admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
//wait for update
@@ -349,6 +351,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
assertFalse(admin.topics().getDelayedDeliveryPolicy(topicName).isActive());
assertEquals(2000, admin.topics().getDelayedDeliveryPolicy(topicName).getTickTime());
+ assertEquals(5000, admin.topics().getDelayedDeliveryPolicy(topicName).getMaxDeliveryDelayInMillis());
admin.topics().removeDelayedDeliveryPolicy(topicName);
//wait for update
@@ -622,4 +625,42 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
}
}
+ /**
+  * Verifies that a message whose requested delivery delay (6s) is larger than the topic-level max
+  * delay (5s) is refused, and that the producer sees a {@code NotAllowedException} whose message
+  * names the configured limit.
+  */
+ @Test
+ public void testDelayedDeliveryExceedsMaxDelay() throws Exception {
+     long maxDeliveryDelayInMillis = 5000;
+     String topic = BrokerTestUtil.newUniqueName("testDelayedDeliveryExceedsMaxDelay");
+
+     @Cleanup
+     Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+             .topic(topic)
+             .create();
+
+     admin.topicPolicies().setDelayedDeliveryPolicy(topic,
+             DelayedDeliveryPolicies.builder()
+                     .active(true)
+                     .tickTime(100L)
+                     .maxDeliveryDelayInMillis(maxDeliveryDelayInMillis)
+                     .build());
+
+     // Wait (up to ~5s) for the policy to become visible. Keep the polled value so the test can
+     // fail right here, with a clear message, instead of silently continuing without a policy.
+     DelayedDeliveryPolicies appliedPolicy = null;
+     for (int i = 0; i < 50 && appliedPolicy == null; i++) {
+         Thread.sleep(100);
+         appliedPolicy = admin.topics().getDelayedDeliveryPolicy(topic);
+     }
+     assertNotNull(appliedPolicy, "Delayed delivery policy was not visible after waiting for the update");
+
+     try {
+         producer.newMessage()
+                 .value("msg")
+                 .deliverAfter(6, TimeUnit.SECONDS)
+                 .send();
+
+         producer.flush();
+         fail("Should have thrown NotAllowedException due to exceeding maxDeliveryDelayInMillis");
+     } catch (PulsarClientException.NotAllowedException ex) {
+         assertEquals(ex.getMessage(), "Exceeds max allowed delivery delay of "
+                 + maxDeliveryDelayInMillis + " milliseconds");
+     }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 81ed4311374..a0a28262faa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -133,6 +133,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
@@ -1909,4 +1910,40 @@ public class TransactionTest extends TransactionTestBase {
Assert.assertEquals(result, List.of("V4", "V5", "V6"));
}
+ /**
+  * Transactional variant of the max-delay check: a message sent inside a transaction with a 6s
+  * delivery delay must be refused when the topic-level max delay is 5s.
+  */
+ @Test
+ public void testDelayedDeliveryExceedsMaxDelay() throws Exception {
+     final long maxDeliveryDelayInMillis = 5000;
+     final String namespace = "tnx/ns-prechecks";
+     final String topic = "persistent://" + namespace + "/test_transaction_topic" + UUID.randomUUID();
+     final String expectedError = "Exceeds max allowed delivery delay of "
+             + maxDeliveryDelayInMillis + " milliseconds";
+
+     admin.namespaces().createNamespace(namespace);
+     admin.topics().createNonPartitionedTopic(topic);
+
+     final DelayedDeliveryPolicies topicPolicy = DelayedDeliveryPolicies.builder()
+             .active(true)
+             .tickTime(100L)
+             .maxDeliveryDelayInMillis(maxDeliveryDelayInMillis)
+             .build();
+     admin.topicPolicies().setDelayedDeliveryPolicy(topic, topicPolicy);
+
+     @Cleanup
+     Producer<byte[]> producer = this.pulsarClient.newProducer()
+             .topic(topic)
+             .sendTimeout(5, TimeUnit.SECONDS)
+             .addEncryptionKey("my-app-key")
+             .defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem")
+             .create();
+
+     try {
+         final Transaction transaction = pulsarClient.newTransaction()
+                 .withTransactionTimeout(5, TimeUnit.SECONDS)
+                 .build()
+                 .get();
+         final byte[] payload = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
+         // The 6s delay is above the 5s limit, so send() is expected to throw.
+         producer.newMessage(transaction)
+                 .value(payload)
+                 .deliverAfter(6, TimeUnit.SECONDS)
+                 .send();
+         transaction.commit();
+         fail("Should have thrown NotAllowedException due to exceeding maxDeliveryDelayInMillis");
+     } catch (PulsarClientException.NotAllowedException ex) {
+         assertEquals(ex.getMessage(), expectedError);
+     }
+ }
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java
index 555896ab3e5..f940ecd1b86 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java
@@ -26,10 +26,12 @@ import org.apache.pulsar.common.policies.data.impl.DelayedDeliveryPoliciesImpl;
public interface DelayedDeliveryPolicies {
long getTickTime();
boolean isActive();
+ long getMaxDeliveryDelayInMillis();
interface Builder {
Builder tickTime(long tickTime);
Builder active(boolean active);
+ Builder maxDeliveryDelayInMillis(long maxDeliveryDelayInMillis);
DelayedDeliveryPolicies build();
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/DelayedDeliveryPoliciesImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/DelayedDeliveryPoliciesImpl.java
index 408217f3637..580ac6c95fa 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/DelayedDeliveryPoliciesImpl.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/DelayedDeliveryPoliciesImpl.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
public final class DelayedDeliveryPoliciesImpl implements DelayedDeliveryPolicies {
private long tickTime;
private boolean active;
+ private long maxDeliveryDelayInMillis;
public static DelayedDeliveryPoliciesImplBuilder builder() {
return new DelayedDeliveryPoliciesImplBuilder();
@@ -40,6 +41,7 @@ public final class DelayedDeliveryPoliciesImpl implements DelayedDeliveryPolicie
public static class DelayedDeliveryPoliciesImplBuilder implements DelayedDeliveryPolicies.Builder {
private long tickTime;
private boolean active;
+ private long maxDeliveryDelayInMillis;
public DelayedDeliveryPoliciesImplBuilder tickTime(long tickTime) {
this.tickTime = tickTime;
@@ -51,8 +53,13 @@ public final class DelayedDeliveryPoliciesImpl implements DelayedDeliveryPolicie
return this;
}
+ /**
+  * Sets the max delivery delay, in milliseconds, that the built policy will carry.
+  *
+  * @param maxDeliveryDelayInMillis the max delivery delay in milliseconds
+  * @return this builder, to allow call chaining
+  */
+ public DelayedDeliveryPoliciesImplBuilder maxDeliveryDelayInMillis(long maxDeliveryDelayInMillis) {
+ this.maxDeliveryDelayInMillis = maxDeliveryDelayInMillis;
+ return this;
+ }
+
public DelayedDeliveryPoliciesImpl build() {
- return new DelayedDeliveryPoliciesImpl(tickTime, active);
+ return new DelayedDeliveryPoliciesImpl(tickTime, active, maxDeliveryDelayInMillis);
}
}
}
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 0ab634680cd..2d567a7528d 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -680,9 +680,9 @@ public class PulsarAdminToolTest {
namespaces.run(split("remove-retention myprop/clust/ns1"));
verify(mockNamespaces).removeRetention("myprop/clust/ns1");
- namespaces.run(split("set-delayed-delivery myprop/clust/ns1 -e -t 1s"));
+ namespaces.run(split("set-delayed-delivery myprop/clust/ns1 -e -t 1s -md 5s"));
verify(mockNamespaces).setDelayedDeliveryMessages("myprop/clust/ns1",
- DelayedDeliveryPolicies.builder().tickTime(1000).active(true).build());
+ DelayedDeliveryPolicies.builder().tickTime(1000).active(true).maxDeliveryDelayInMillis(5000).build());
namespaces.run(split("get-delayed-delivery myprop/clust/ns1"));
verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1");
@@ -1210,9 +1210,9 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
- cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable"));
+ cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable --maxDelay 5s"));
verify(mockTopicsPolicies).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
- DelayedDeliveryPolicies.builder().tickTime(10000).active(true).build());
+ DelayedDeliveryPolicies.builder().tickTime(10000).active(true).maxDeliveryDelayInMillis(5000).build());
cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;
@@ -1377,9 +1377,9 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
- cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable -g"));
+ cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable -md 5s -g"));
verify(mockGlobalTopicsPolicies).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
- DelayedDeliveryPolicies.builder().tickTime(10000).active(true).build());
+ DelayedDeliveryPolicies.builder().tickTime(10000).active(true).maxDeliveryDelayInMillis(5000).build());
cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;
@@ -1806,9 +1806,9 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
- cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable"));
+ cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s -md 5s --enable"));
verify(mockTopics).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
- DelayedDeliveryPolicies.builder().tickTime(10000).active(true).build());
+ DelayedDeliveryPolicies.builder().tickTime(10000).active(true).maxDeliveryDelayInMillis(5000).build());
cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 4da7bd83154..4394fc00027 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1602,6 +1602,11 @@ public class CmdNamespaces extends CmdBase {
converter = TimeUnitToMillisConverter.class)
private Long delayedDeliveryTimeInMills = 1000L;
+ @Parameter(names = { "--maxDelay", "-md" },
+ description = "The max allowed delay for delayed delivery. (eg: 1s, 10s, 1m, 5h, 3d)",
+ converter = TimeUnitToMillisConverter.class)
+ private Long delayedDeliveryMaxDelayInMillis = 0L;
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
@@ -1612,6 +1617,7 @@ public class CmdNamespaces extends CmdBase {
getAdmin().namespaces().setDelayedDeliveryMessages(namespace, DelayedDeliveryPolicies.builder()
.tickTime(delayedDeliveryTimeInMills)
.active(enable)
+ .maxDeliveryDelayInMillis(delayedDeliveryMaxDelayInMillis)
.build());
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 421ccec1403..c27cbd06849 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -723,6 +723,11 @@ public class CmdTopicPolicies extends CmdBase {
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal;
+ @Parameter(names = { "--maxDelay", "-md" },
+ description = "The max allowed delay for delayed delivery. (eg: 1s, 10s, 1m, 5h, 3d)",
+ converter = TimeUnitToMillisConverter.class)
+ private Long delayedDeliveryMaxDelayInMillis = 0L;
+
@Override
void run() throws PulsarAdminException {
String topicName = validateTopicName(params);
@@ -733,6 +738,7 @@ public class CmdTopicPolicies extends CmdBase {
getTopicPolicies(isGlobal).setDelayedDeliveryPolicy(topicName, DelayedDeliveryPolicies.builder()
.tickTime(delayedDeliveryTimeInMills)
.active(enable)
+ .maxDeliveryDelayInMillis(delayedDeliveryMaxDelayInMillis)
.build());
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 508642e63ae..3e2d9d1c13c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1813,6 +1813,11 @@ public class CmdTopics extends CmdBase {
converter = TimeUnitToMillisConverter.class)
private Long delayedDeliveryTimeInMills = 1_000L;
+ @Parameter(names = { "--maxDelay", "-md" },
+ description = "The max allowed delay for delayed delivery. (eg: 1s, 10s, 1m, 5h, 3d)",
+ converter = TimeUnitToMillisConverter.class)
+ private Long delayedDeliveryMaxDelayInMillis = 0L;
+
@Override
void run() throws PulsarAdminException {
String topicName = validateTopicName(params);
@@ -1823,6 +1828,7 @@ public class CmdTopics extends CmdBase {
getTopics().setDelayedDeliveryPolicy(topicName, DelayedDeliveryPolicies.builder()
.tickTime(delayedDeliveryTimeInMills)
.active(enable)
+ .maxDeliveryDelayInMillis(delayedDeliveryMaxDelayInMillis)
.build());
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 7f841ec8975..4edb033498b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -51,6 +51,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
final PolicyHierarchyValue<Boolean> dispatcherPauseOnAckStatePersistentEnabled;
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
+ final PolicyHierarchyValue<Long> delayedDeliveryMaxDelayInMillis;
final PolicyHierarchyValue<DispatchRateImpl> replicatorDispatchRate;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
final PolicyHierarchyValue<SubscribeRate> subscribeRate;
@@ -84,6 +85,7 @@ public class HierarchyTopicPolicies {
delayedDeliveryEnabled = new PolicyHierarchyValue<>();
dispatcherPauseOnAckStatePersistentEnabled = new PolicyHierarchyValue<>();
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
+ delayedDeliveryMaxDelayInMillis = new PolicyHierarchyValue<>();
replicatorDispatchRate = new PolicyHierarchyValue<>();
compactionThreshold = new PolicyHierarchyValue<>();
subscribeRate = new PolicyHierarchyValue<>();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index eede0ab794d..5403b84a4f7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -62,6 +62,7 @@ public class TopicPolicies {
private Integer maxUnackedMessagesOnSubscription;
private Long delayedDeliveryTickTimeMillis;
private Boolean delayedDeliveryEnabled;
+ private Long delayedDeliveryMaxDelayInMillis;
private Boolean dispatcherPauseOnAckStatePersistentEnabled;
private OffloadPoliciesImpl offloadPolicies;
private InactiveTopicPolicies inactiveTopicPolicies;