You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/08/12 00:41:29 UTC
[pulsar] branch master updated: support topic level delayed
delivery policy (#7784)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e417d77 support topic level delayed delivery policy (#7784)
e417d77 is described below
commit e417d77a5035e93993ca2e1912282eddf315a6ca
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Wed Aug 12 08:41:10 2020 +0800
support topic level delayed delivery policy (#7784)
Master Issue: #2688
### Motivation
support topic level delayed delivery policy
### Modifications
Support set/get/remove delayed delivery policy on topic level.
### Verifying this change
Added unit tests to verify that set/get/remove of the delayed delivery policy at topic level works as expected when the topic-level policy is enabled/disabled
- org.apache.pulsar.broker.service.persistent.DelayedDeliveryTest#testEnableAndDisableTopicDelayedDelivery
- org.apache.pulsar.broker.service.persistent.DelayedDeliveryTest#testEnableTopicDelayedDelivery
---
.../broker/admin/impl/PersistentTopicsBase.java | 26 ++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 57 +++++++++
.../PersistentDispatcherMultipleConsumers.java | 9 +-
.../broker/service/persistent/PersistentTopic.java | 24 +++-
.../service/persistent/DelayedDeliveryTest.java | 135 +++++++++++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 48 ++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 80 ++++++++++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 56 +++++++++
.../pulsar/common/policies/data/TopicPolicies.java | 9 ++
9 files changed, 439 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 520a181..612c7a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -97,6 +97,7 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
@@ -579,6 +580,31 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ /**
+ * Persists the topic-level delayed delivery settings for {@code topicName} and resumes the REST response.
+ *
+ * <p>The current {@link TopicPolicies} are fetched (a new instance is created when none exist), both
+ * delayed-delivery fields are overwritten, and the policies are written back asynchronously.
+ *
+ * @param asyncResponse resumed with 204 No Content on success, or with a {@link RestException} on failure
+ * @param deliveryPolicies settings to store; {@code null} clears both fields, i.e. removes the topic-level policy
+ */
+ protected void internalSetDelayedDeliveryPolicies(AsyncResponse asyncResponse, DelayedDeliveryPolicies deliveryPolicies) {
+ TopicPolicies topicPolicies = null;
+ try {
+ topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.error("Topic {} policies cache have not init.", topicName);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ if (topicPolicies == null) {
+ topicPolicies = new TopicPolicies();
+ }
+ // A null argument means "remove": store null in both fields so they read as unset.
+ topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
+ topicPolicies.setDelayedDeliveryTickTimeMillis(deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
+ pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
+ .whenComplete((result, ex) -> {
+ if (ex != null) {
+ // Include the topic so the failure can be attributed; the trailing throwable keeps the stack trace.
+ log.error("Failed set delayed delivery policy for topic {}", topicName, ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
List<CompletableFuture<Void>> results = new ArrayList<>(clusters.size() -1);
clusters.forEach(cluster -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 3b5b07f..0d48c5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -45,11 +45,13 @@ import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -248,6 +250,61 @@ public class PersistentTopics extends PersistentTopicsBase {
internalCreateNonPartitionedTopic(authoritative);
}
+ /**
+ * REST endpoint returning the delayed delivery policy stored at topic level.
+ *
+ * <p>Responds with a {@link DelayedDeliveryPolicies} body only when both the enabled flag and the tick time
+ * are set in the topic policies; otherwise responds with 204 No Content.
+ *
+ * @param asyncResponse suspended response that is resumed with the policy or with No Content
+ * @param tenant tenant path segment
+ * @param namespace namespace path segment
+ * @param encodedTopic URL-encoded topic path segment
+ */
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
+ @ApiOperation(value = "Get delayed delivery messages config on a topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+ @ApiResponse(code = 500, message = "Internal server error"),})
+ public void getDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ // The endpoint documents a 403 for missing admin permission, so enforce the same
+ // tenant-admin check that the POST endpoint for this path performs.
+ validateAdminAccessForTenant(tenant);
+ // orElseGet avoids allocating a throw-away TopicPolicies when policies already exist.
+ TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
+ if (topicPolicies.isDelayedDeliveryEnabledSet() && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
+ asyncResponse.resume(new DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
+ , topicPolicies.getDelayedDeliveryEnabled()));
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ }
+
+ /**
+ * REST endpoint that sets the delayed delivery policy of a topic.
+ *
+ * <p>Runs the validation calls below in order and then delegates to
+ * {@code internalSetDelayedDeliveryPolicies}, which resumes {@code asyncResponse}.
+ *
+ * @param asyncResponse suspended response, resumed by the delegate
+ * @param tenant tenant path segment
+ * @param namespace namespace path segment
+ * @param encodedTopic URL-encoded topic path segment
+ * @param deliveryPolicies policy to store; the DELETE endpoint passes {@code null} to remove it
+ */
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
+ @ApiOperation(value = "Set delayed delivery messages config on a topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+ public void setDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Delayed delivery policies for the specified topic") DelayedDeliveryPolicies deliveryPolicies) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateAdminAccessForTenant(tenant);
+ validatePoliciesReadOnlyAccess();
+ checkTopicLevelPolicyEnable();
+ // Ownership of the namespace is only validated for global topics.
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ internalSetDelayedDeliveryPolicies(asyncResponse, deliveryPolicies);
+ }
+
+
+
+ /**
+ * REST endpoint that removes the topic-level delayed delivery policy.
+ *
+ * <p>Implemented by calling {@link #setDelayedDeliveryPolicies} with a {@code null} policy, so the same
+ * validations as the POST endpoint apply.
+ *
+ * @param asyncResponse suspended response, resumed by the delegate
+ * @param tenant tenant path segment
+ * @param namespace namespace path segment
+ * @param encodedTopic URL-encoded topic path segment
+ */
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
+ @ApiOperation(value = "Delete delayed delivery messages config on a topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+ public void deleteDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ // Passing null as the policy clears the topic-level delayed delivery settings.
+ setDelayedDeliveryPolicies(asyncResponse, tenant, namespace, encodedTopic, null);
+ }
+
/**
* It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be
* already exist and number of new partitions must be greater than existing number of partitions. Decrementing
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b8607f5..67c2df9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -336,7 +336,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
havePendingReplayRead = true;
- Set<? extends Position> deletedMessages = topic.delayedDeliveryEnabled ?
+ Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled() ?
asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket
@@ -771,7 +771,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
@Override
public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
- if (!topic.delayedDeliveryEnabled) {
+ if (!topic.isDelayedDeliveryEnabled()) {
// If broker has the feature disabled, always deliver messages immediately
return false;
}
@@ -783,7 +783,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
}
- delayedDeliveryTracker.get().resetTickTime(topic.delayedDeliveryTickTimeMillis);
+ delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());
}
}
@@ -793,13 +793,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
return messagesToRedeliver.items(maxMessagesToRead,
(ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
} else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
- delayedDeliveryTracker.get().resetTickTime(topic.delayedDeliveryTickTimeMillis);
+ delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
} else {
return Collections.emptySet();
}
}
+ /**
+ * Returns the number of delayed messages reported by the delayed delivery tracker,
+ * or {@code 0} when no tracker is present.
+ */
+ @Override
public synchronized long getNumberOfDelayedMessages() {
return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 804e5f5..5af4b30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2129,8 +2129,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
* @return TopicPolicies is exist else return null.
*/
private TopicPolicies getTopicPolicies(TopicName topicName) {
+ TopicName cloneTopicName = topicName;
+ if (topicName.isPartitioned()) {
+ cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
+ }
try {
- return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+ return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
return null;
@@ -2303,4 +2307,22 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
});
return completableFuture;
}
+
+ /**
+ * Resolves the delayed delivery tick time that applies to this topic.
+ *
+ * @return the tick time from the topic policies when one is set there, otherwise the value of this
+ * topic's {@code delayedDeliveryTickTimeMillis} field
+ */
+ public long getDelayedDeliveryTickTimeMillis() {
+ final TopicPolicies policies = getTopicPolicies(TopicName.get(topic));
+ // No topic-level override: fall back to the value held by this topic instance.
+ if (policies == null || !policies.isDelayedDeliveryTickTimeMillisSet()) {
+ return delayedDeliveryTickTimeMillis;
+ }
+ // A topic-level value takes precedence over the fallback.
+ return policies.getDelayedDeliveryTickTimeMillis();
+ }
+
+ /**
+ * Resolves whether delayed delivery is enabled for this topic.
+ *
+ * @return the flag from the topic policies when one is set there, otherwise the value of this
+ * topic's {@code delayedDeliveryEnabled} field
+ */
+ public boolean isDelayedDeliveryEnabled() {
+ final TopicPolicies policies = getTopicPolicies(TopicName.get(topic));
+ // No topic-level override: fall back to the value held by this topic instance.
+ if (policies == null || !policies.isDelayedDeliveryEnabledSet()) {
+ return delayedDeliveryEnabled;
+ }
+ // A topic-level value takes precedence over the fallback.
+ return policies.getDelayedDeliveryEnabled();
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 82f6241..5f6d2c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -19,15 +19,21 @@
package org.apache.pulsar.broker.service.persistent;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
@@ -37,6 +43,8 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -46,6 +54,9 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
+ /**
+ * Configures the test broker before it is started: enables the system topic and topic-level
+ * policies and sets the delayed delivery tick time to 1024 ms, then runs the base-class setup.
+ */
@Override
@BeforeClass
public void setup() throws Exception {
+ // These settings are applied to conf before super.internalSetup() is invoked.
+ conf.setSystemTopicEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setDelayedDeliveryTickTimeMillis(1024);
super.internalSetup();
super.producerBaseSetup();
}
@@ -315,4 +326,128 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
}
}
}
+
+ /**
+ * Verifies the set/get/remove round trip of the topic-level delayed delivery policy through the
+ * admin client on a 3-partition topic: initially null, readable after set, null again after remove.
+ */
+ @Test(timeOut = 20000)
+ public void testEnableAndDisableTopicDelayedDelivery() throws Exception {
+ String topicName = "persistent://public/default/topic-" + UUID.randomUUID().toString();
+
+ admin.topics().createPartitionedTopic(topicName, 3);
+ // No topic-level policy has been set yet.
+ assertNull(admin.topics().getDelayedDeliveryPolicy(topicName));
+ DelayedDeliveryPolicies delayedDeliveryPolicies = new DelayedDeliveryPolicies(2000, false);
+ admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
+ //wait for update
+ // Polls up to 50 times at 100 ms intervals until the policy becomes visible.
+ for (int i = 0; i < 50; i++) {
+ Thread.sleep(100);
+ if (admin.topics().getDelayedDeliveryPolicy(topicName) != null) {
+ break;
+ }
+ }
+
+ assertFalse(admin.topics().getDelayedDeliveryPolicy(topicName).isActive());
+ assertEquals(2000, admin.topics().getDelayedDeliveryPolicy(topicName).getTickTime());
+
+ admin.topics().removeDelayedDeliveryPolicy(topicName);
+ //wait for update
+ // Polls up to 50 times at 100 ms intervals until the policy is gone.
+ for (int i = 0; i < 50; i++) {
+ Thread.sleep(100);
+ if (admin.topics().getDelayedDeliveryPolicy(topicName) == null) {
+ break;
+ }
+ }
+ assertNull(admin.topics().getDelayedDeliveryPolicy(topicName));
+ }
+
+ /**
+ * End-to-end check that the topic-level delayed delivery policy drives dispatch on a shared
+ * subscription: messages are delayed while the policy is active, delivered immediately when it is
+ * disabled or when the tick time is very large, and delayed again once the topic-level policy is
+ * removed.
+ */
+ @Test(timeOut = 20000)
+ public void testEnableTopicDelayedDelivery() throws Exception {
+ final String topicName = "persistent://public/default/test" + UUID.randomUUID().toString();
+
+ admin.topics().createPartitionedTopic(topicName, 3);
+ assertNull(admin.topics().getDelayedDeliveryPolicy(topicName));
+ //1 Set topic policy
+ DelayedDeliveryPolicies delayedDeliveryPolicies = new DelayedDeliveryPolicies(2000, true);
+ admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
+ //wait for update
+ for (int i = 0; i < 50; i++) {
+ Thread.sleep(100);
+ if (admin.topics().getDelayedDeliveryPolicy(topicName) != null) {
+ break;
+ }
+ }
+ //2 Setup consumer and producer
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub" + System.currentTimeMillis())
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName).create();
+ //3 Send delay message
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage()
+ .value("delayed-msg-" + i)
+ .deliverAfter(5, TimeUnit.SECONDS)
+ .sendAsync();
+ }
+ producer.flush();
+
+ //4 There will be no message in the first 3 seconds
+ assertNull(consumer.receive(3, TimeUnit.SECONDS));
+
+ Set<String> delayedMessages = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ Message<String> msg = consumer.receive(4, TimeUnit.SECONDS);
+ // receive() returns null on timeout; fail with a clear assertion instead of an NPE below.
+ assertNotNull(msg);
+ delayedMessages.add(msg.getValue());
+ consumer.acknowledge(msg);
+ }
+ for (int i = 0; i < 10; i++) {
+ assertTrue(delayedMessages.contains("delayed-msg-" + i));
+ }
+ //5 Disable delayed delivery
+ delayedDeliveryPolicies.setActive(false);
+ admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
+ //wait for update
+ for (int i = 0; i < 50; i++) {
+ Thread.sleep(100);
+ if (!admin.topics().getDelayedDeliveryPolicy(topicName).isActive()) {
+ break;
+ }
+ }
+ producer.newMessage().value("disabled-msg").deliverAfter(5, TimeUnit.SECONDS).send();
+ //6 Delay deliver is disabled, so we can receive message immediately
+ Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ consumer.acknowledge(msg);
+ //7 Set a very long tick time, so that trackDelayedDelivery will fail. we can receive msg immediately.
+ delayedDeliveryPolicies.setActive(true);
+ delayedDeliveryPolicies.setTickTime(Integer.MAX_VALUE);
+ admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
+ //wait for update
+ for (int i = 0; i < 50; i++) {
+ Thread.sleep(100);
+ if (admin.topics().getDelayedDeliveryPolicy(topicName).isActive()) {
+ break;
+ }
+ }
+ producer.newMessage().value("long-tick-msg").deliverAfter(5, TimeUnit.SECONDS).send();
+ msg = consumer.receive(1, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ consumer.acknowledge(msg);
+ //8 remove topic policy, it will use namespace level policy
+ admin.topics().removeDelayedDeliveryPolicy(topicName);
+ //wait for update
+ for (int i = 0; i < 50; i++) {
+ Thread.sleep(100);
+ if (admin.topics().getDelayedDeliveryPolicy(topicName) == null) {
+ break;
+ }
+ }
+ producer.newMessage().value("long-tick-msg").deliverAfter(2, TimeUnit.SECONDS).send();
+ // With the topic-level policy gone the 2 s delay applies again: nothing within 1 s, then the message.
+ msg = consumer.receive(1, TimeUnit.SECONDS);
+ assertNull(msg);
+ msg = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ }
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 72098ee..a40dcc1 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -1436,6 +1437,53 @@ public interface Topics {
void removeBacklogQuota(String topic) throws PulsarAdminException;
/**
+ * Get the delayed delivery policy for a specified topic.
+ *
+ * @param topic topic name
+ * @return the topic-level delayed delivery policy, or {@code null} when none is set
+ * @throws PulsarAdminException if the request fails
+ */
+ DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) throws PulsarAdminException;
+
+ /**
+ * Get the delayed delivery policy for a specified topic asynchronously.
+ *
+ * @param topic topic name
+ * @return a future completed with the topic-level delayed delivery policy
+ */
+ CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic);
+
+ /**
+ * Set the delayed delivery policy for a specified topic.
+ *
+ * @param topic topic name
+ * @param delayedDeliveryPolicies policy (active flag and tick time) to apply to the topic
+ * @throws PulsarAdminException if the request fails
+ */
+ void setDelayedDeliveryPolicy(String topic
+ , DelayedDeliveryPolicies delayedDeliveryPolicies) throws PulsarAdminException;
+
+ /**
+ * Set the delayed delivery policy for a specified topic asynchronously.
+ *
+ * @param topic topic name
+ * @param delayedDeliveryPolicies policy (active flag and tick time) to apply to the topic
+ * @return a future completed when the request has finished
+ */
+ CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic
+ , DelayedDeliveryPolicies delayedDeliveryPolicies);
+
+ /**
+ * Remove the delayed delivery policy for a specified topic asynchronously.
+ *
+ * @param topic topic name
+ * @return a future completed when the request has finished
+ */
+ CompletableFuture<Void> removeDelayedDeliveryPolicyAsync(String topic);
+
+ /**
+ * Remove the delayed delivery policy for a specified topic.
+ *
+ * @param topic topic name
+ * @throws PulsarAdminException if the request fails
+ */
+ void removeDelayedDeliveryPolicy(String topic) throws PulsarAdminException;
+
+ /**
* Set message TTL for a topic.
*
* @param topic
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 4df515a..6b04e60 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -72,6 +72,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -1431,6 +1432,85 @@ public class TopicsImpl extends BaseResource implements Topics {
}
+ /**
+ * Blocking variant of {@link #getDelayedDeliveryPolicyAsync(String)}; waits at most
+ * {@code readTimeoutMs} milliseconds for the result.
+ */
@Override
+ public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) throws PulsarAdminException {
+ try {
+ return getDelayedDeliveryPolicyAsync(topic).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ // NOTE(review): assumes the async call only ever fails with a PulsarAdminException - confirm.
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag before reporting the failure.
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ /**
+ * Issues an asynchronous GET on the topic's {@code delayedDelivery} path and exposes the outcome
+ * as a future: completed with the returned policy, or exceptionally with the API exception
+ * derived from the failure's cause.
+ */
+ @Override
+ public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "delayedDelivery");
+ final CompletableFuture<DelayedDeliveryPolicies> future = new CompletableFuture<>();
+ asyncGetRequest(path, new InvocationCallback<DelayedDeliveryPolicies>() {
+ @Override
+ public void completed(DelayedDeliveryPolicies delayedDeliveryPolicies) {
+ future.complete(delayedDeliveryPolicies);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ /**
+ * Issues an asynchronous DELETE on the topic's {@code delayedDelivery} path.
+ */
+ @Override
+ public CompletableFuture<Void> removeDelayedDeliveryPolicyAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "delayedDelivery");
+ return asyncDeleteRequest(path);
+ }
+
+ /**
+ * Blocking variant of {@link #removeDelayedDeliveryPolicyAsync(String)}; waits at most
+ * {@code readTimeoutMs} milliseconds for completion.
+ */
+ @Override
+ public void removeDelayedDeliveryPolicy(String topic) throws PulsarAdminException {
+ try {
+ removeDelayedDeliveryPolicyAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ // NOTE(review): assumes the async call only ever fails with a PulsarAdminException - confirm.
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag before reporting the failure.
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ /**
+ * Issues an asynchronous POST of the given policy, serialized as JSON, to the topic's
+ * {@code delayedDelivery} path.
+ */
+ @Override
+ public CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic
+ , DelayedDeliveryPolicies delayedDeliveryPolicies) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "delayedDelivery");
+ return asyncPostRequest(path, Entity.entity(delayedDeliveryPolicies, MediaType.APPLICATION_JSON));
+ }
+
+ /**
+ * Blocking variant of {@link #setDelayedDeliveryPolicyAsync(String, DelayedDeliveryPolicies)};
+ * waits at most {@code readTimeoutMs} milliseconds for completion.
+ */
+ @Override
+ public void setDelayedDeliveryPolicy(String topic
+ , DelayedDeliveryPolicies delayedDeliveryPolicies) throws PulsarAdminException {
+ try {
+ setDelayedDeliveryPolicyAsync(topic, delayedDeliveryPolicies)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ // NOTE(review): assumes the async call only ever fails with a PulsarAdminException - confirm.
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag before reporting the failure.
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
public void setMessageTTL(String topic, int messageTTLInSecond) throws PulsarAdminException {
try {
TopicName topicName = validateTopic(topic);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 658233f..6791e05 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@@ -111,6 +112,9 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-retention", new GetRetention());
jcommander.addCommand("set-retention", new SetRetention());
jcommander.addCommand("remove-retention", new RemoveRetention());
+ jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
+ jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
+ jcommander.addCommand("remove-delayed-delivery", new RemoveDelayedDelivery());
}
@Parameters(commandDescription = "Get the list of topics under a namespace.")
@@ -888,6 +892,58 @@ public class CmdTopics extends CmdBase {
}
}
+ /**
+ * CLI command registered as {@code get-delayed-delivery}: prints the delayed delivery policy
+ * returned by the admin client for the given topic.
+ */
+ @Parameters(commandDescription = "Get the delayed delivery policy for a topic")
+ private class GetDelayedDelivery extends CliCommand {
+ @Parameter(description = "tenant/namespace/topic\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String topicName = validateTopicName(params);
+ print(admin.topics().getDelayedDeliveryPolicy(topicName));
+ }
+ }
+
+ /**
+ * CLI command registered as {@code set-delayed-delivery}: sets the delayed delivery policy of a topic.
+ *
+ * <p>Exactly one of {@code --enable} / {@code --disable} must be given; {@code --time} is a relative
+ * time string (default {@code 1s}) that is converted to milliseconds and used as the tick time.
+ */
+ @Parameters(commandDescription = "Set the delayed delivery policy on a topic")
+ private class SetDelayedDelivery extends CliCommand {
+ // The positional argument is a topic (it is passed to validateTopicName), not a namespace.
+ @Parameter(description = "tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--enable", "-e" }, description = "Enable delayed delivery messages")
+ private boolean enable = false;
+
+ @Parameter(names = { "--disable", "-d" }, description = "Disable delayed delivery messages")
+ private boolean disable = false;
+
+ @Parameter(names = { "--time", "-t" }, description = "The tick time for when retrying on delayed delivery messages, " +
+ "affecting the accuracy of the delivery time compared to the scheduled time. (eg: 1s, 10s, 1m, 5h, 3d)")
+ private String delayedDeliveryTimeStr = "1s";
+
+ @Override
+ void run() throws PulsarAdminException {
+ String topicName = validateTopicName(params);
+ long delayedDeliveryTimeInMills = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(delayedDeliveryTimeStr));
+
+ // Both flags default to false, so equality means neither or both were supplied.
+ if (enable == disable) {
+ throw new ParameterException("Need to specify either --enable or --disable");
+ }
+
+ admin.topics().setDelayedDeliveryPolicy(topicName, new DelayedDeliveryPolicies(delayedDeliveryTimeInMills, enable));
+ }
+ }
+
+ /**
+ * CLI command registered as {@code remove-delayed-delivery}: removes the delayed delivery policy
+ * of the given topic through the admin client.
+ */
+ @Parameters(commandDescription = "Remove the delayed delivery policy on a topic")
+ private class RemoveDelayedDelivery extends CliCommand {
+ // The positional argument is a topic (it is passed to validateTopicName), not a namespace.
+ @Parameter(description = "tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String topicName = validateTopicName(params);
+ admin.topics().removeDelayedDeliveryPolicy(topicName);
+ }
+ }
+
@Parameters(commandDescription = "Get the message TTL for a topic")
private class GetMessageTTL extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index f9fcaf8..796ff4a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -45,6 +45,15 @@ public class TopicPolicies {
private Integer maxProducerPerTopic = null;
private Integer maxConsumerPerTopic = null;
private Integer maxConsumersPerSubscription = null;
+ private Long delayedDeliveryTickTimeMillis = null;
+ private Boolean delayedDeliveryEnabled = null;
+
+ /**
+ * @return {@code true} when {@code delayedDeliveryTickTimeMillis} holds a value (is not {@code null})
+ */
+ public boolean isDelayedDeliveryTickTimeMillisSet(){
+ return delayedDeliveryTickTimeMillis != null;
+ }
+ /**
+ * @return {@code true} when {@code delayedDeliveryEnabled} holds a value (is not {@code null})
+ */
+ public boolean isDelayedDeliveryEnabledSet(){
+ return delayedDeliveryEnabled != null;
+ }
public boolean isBacklogQuotaSet() {
return !backLogQuotaMap.isEmpty();