You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/06 02:09:48 UTC
[pulsar] branch master updated: Support set message TTL on topic
level. (#7738)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8a763ab Support set message TTL on topic level. (#7738)
8a763ab is described below
commit 8a763abc4cff878f4765d5f0a806750b9e9998a5
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Wed Aug 5 19:09:27 2020 -0700
Support set message TTL on topic level. (#7738)
### Motivation
Master Issue: #2688
Add Topic level policy support for message TTL.
### Modifications
Support set/get/remove Message TTL on topic level.
### Verifying this change
This change added tests and can be verified as follows:
- *Added Unit test to verify set/get/remove message TTL at Topic level work as expected when Topic level policy is enabled/disabled*
- *Added test case in PersistentTopicE2ETest to verify Topic level message TTL is used when set and will fall back to namespace message TTL if Topic level message TTL is removed.*
---
.../broker/admin/impl/PersistentTopicsBase.java | 41 ++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 47 +++++++
.../broker/service/persistent/PersistentTopic.java | 41 +++++-
.../pulsar/broker/admin/TopicBacklogQuotaTest.java | 1 +
.../pulsar/broker/admin/TopicMessageTTLTest.java | 152 +++++++++++++++++++++
.../broker/service/PersistentTopicE2ETest.java | 95 +++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 43 ++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 34 +++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 47 +++++++
9 files changed, 498 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 40d4268..004ca6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2096,6 +2096,47 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ protected void internalSetMessageTTL(AsyncResponse asyncResponse, Integer ttlInSecond) {
+ //Validate message ttl value.
+ if (ttlInSecond != null && ttlInSecond.intValue() < 0) {
+ throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL");
+ }
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ TopicPolicies topicPolicies;
+ //Update existing topic policy or create a new one if not exist.
+ try {
+ topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.warn("Topic {} policies cache have not init.", topicName);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ if (topicPolicies == null){
+ topicPolicies = new TopicPolicies();
+ }
+ topicPolicies.setMessageTTLInSeconds(ttlInSecond);
+ pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
+ .whenComplete((result, ex) -> {
+ if (ex != null) {
+ log.error("Failed set message ttl for topic",ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
+ clientAppId(),
+ namespaceName,
+ topicName.getLocalName(),
+ ttlInSecond);
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
+
private RetentionPolicies getRetentionPolicies(TopicName topicName, TopicPolicies topicPolicies) {
RetentionPolicies retentionPolicies = topicPolicies.getRetentionPolicies();
if (retentionPolicies == null){
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index b939bde..cd0dae6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1032,6 +1032,53 @@ public class PersistentTopics extends PersistentTopicsBase {
internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/messageTTL")
+ @ApiOperation(value = "Get message TTL in seconds for a topic")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry")})
+ public int getMessageTTL(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ return getTopicPolicies(topicName)
+ .map(TopicPolicies::getMessageTTLInSeconds)
+ .orElse(0); //same as default ttl at namespace level
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/messageTTL")
+ @ApiOperation(value = "Set message TTL in seconds for a topic")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Not authenticate to perform the request or policy is read only"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry"),
+ @ApiResponse(code = 412, message = "Invalid message TTL value") })
+ public void setMessageTTL(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "TTL in seconds for the specified namespace", required = true)
+ @QueryParam("messageTTL") int messageTTL) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalSetMessageTTL(asyncResponse, messageTTL);
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/messageTTL")
+ @ApiOperation(value = "Remove message TTL in seconds for a topic")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Not authenticate to perform the request or policy is read only"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry"),
+ @ApiResponse(code = 412, message = "Invalid message TTL value") })
+ public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalSetMessageTTL(asyncResponse, null);
+ }
+
@POST
@Path("/{tenant}/{namespace}/{topic}/terminate")
@ApiOperation(value = "Terminate a topic. A topic that is terminated will not accept any more "
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 02d2f0b..767b236 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -112,6 +112,7 @@ import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -1065,7 +1066,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return future;
}
- final int newMessageTTLinSeconds = policies.message_ttl_in_seconds;
+ //Ignore current broker's config for messageTTL for replication.
+ final int newMessageTTLinSeconds = getMessageTTL(getTopicPolicies(name), policies, -1);
Set<String> configuredClusters;
if (policies.replication_clusters != null) {
@@ -1122,8 +1124,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
int defaultTTL = brokerService.pulsar().getConfiguration().getTtlDurationDefaultInSeconds();
- int message_ttl_in_seconds = (policies.message_ttl_in_seconds <= 0 && defaultTTL > 0) ? defaultTTL
- : policies.message_ttl_in_seconds;
+ TopicPolicies topicPolicies = getTopicPolicies(name);
+ //If topic level policy or message ttl is not set, fall back to namespace level config.
+ int message_ttl_in_seconds = getMessageTTL(topicPolicies, policies, defaultTTL);
+
if (message_ttl_in_seconds != 0) {
subscriptions.forEach((subName, sub) -> sub.expireMessages(message_ttl_in_seconds));
replicators.forEach((region, replicator) -> ((PersistentReplicator)replicator).expireMessages(message_ttl_in_seconds));
@@ -2108,6 +2112,37 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
}
+ /**
+ * Get {@link TopicPolicies} for this topic.
+ * @param topicName
+ * @return TopicPolicies is exist else return null.
+ */
+ private TopicPolicies getTopicPolicies(TopicName topicName) {
+ try {
+ return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
+ return null;
+ }
+ }
+
+ /**
+ * Get message TTL for this topic.
+ * @param topicPolicies TopicPolicies
+ * @param policies NameSpace policy
+ * @param brokerDefaultMessageTTL
+ * @return Message TTL in second.
+ */
+ private int getMessageTTL(TopicPolicies topicPolicies, Policies policies, int brokerDefaultMessageTTL) {
+ //Return Topic level message TTL if exist. If topic level policy or message ttl is not set,
+ //fall back to namespace level message ttl then message ttl set for current broker.
+ return (topicPolicies == null || topicPolicies.getMessageTTLInSeconds() == null) ?
+ ((policies.message_ttl_in_seconds <= 0 && brokerDefaultMessageTTL > 0) ?
+ brokerDefaultMessageTTL :
+ policies.message_ttl_in_seconds) :
+ topicPolicies.getMessageTTLInSeconds();
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java
index c892eca..e847a7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java
@@ -74,6 +74,7 @@ public class TopicBacklogQuotaTest extends MockedPulsarServiceBaseTest {
admin.topics().createPartitionedTopic(backlogQuotaTopic, 2);
Producer producer = pulsarClient.newProducer().topic(testTenant + "/" + testNamespace + "/" + "lookup-topic").create();
producer.close();
+ Thread.sleep(3000);
}
@AfterMethod
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
new file mode 100644
index 0000000..0ceb8f4
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class TopicMessageTTLTest extends MockedPulsarServiceBaseTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TopicBacklogQuotaTest.class);
+
+ private final String testTenant = "my-tenant";
+
+ private final String testNamespace = "my-namespace";
+
+ private final String myNamespace = testTenant + "/" + testNamespace;
+
+ private final String testTopic = "persistent://" + myNamespace + "/test-topic-message-ttl";
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ this.conf.setSystemTopicEnabled(true);
+ this.conf.setTopicLevelPoliciesEnabled(true);
+ super.internalSetup();
+
+ admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+ TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant(this.testTenant, tenantInfo);
+ admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
+ admin.topics().createPartitionedTopic(testTopic, 2);
+ Producer producer = pulsarClient.newProducer().topic(testTenant + "/" + testNamespace + "/" + "dummy-topic").create();
+ producer.close();
+ Thread.sleep(3000);
+ }
+
+ @AfterMethod
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testSetThenRemoveMessageTTL() throws Exception {
+ admin.topics().setMessageTTL(testTopic, 100);
+ log.info("Message TTL set success on topic: {}", testTopic);
+
+ Thread.sleep(3000);
+ Integer messageTTL = admin.topics().getMessageTTL(testTopic);
+ log.info("Message TTL {} get on topic: {}", testTopic, messageTTL);
+ Assert.assertEquals(messageTTL.intValue(), 100);
+
+ Thread.sleep(3000);
+ admin.topics().removeMessageTTL(testTopic);
+ messageTTL = admin.topics().getMessageTTL(testTopic);
+ log.info("Message TTL {} get on topic: {}", testTopic, messageTTL);
+ Assert.assertEquals(messageTTL.intValue(), 0);
+ }
+
+ @Test
+ public void testSetInvalidMessageTTL() throws Exception {
+ try {
+ admin.topics().setMessageTTL(testTopic, -100);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 412);
+ }
+
+ try {
+ admin.topics().setMessageTTL(testTopic, (int)2147483650L);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 412);
+ }
+ }
+
+ @Test
+ public void testGetMessageTTL() throws Exception {
+ // Check default topic level message TTL.
+ Integer messageTTL = admin.topics().getMessageTTL(testTopic);
+ log.info("Message TTL {} get on topic: {}", testTopic, messageTTL);
+ Assert.assertEquals(messageTTL.intValue(), 0);
+
+ admin.topics().setMessageTTL(testTopic, 200);
+ log.info("Message TTL set success on topic: {}", testTopic);
+
+ Thread.sleep(3000);
+ messageTTL = admin.topics().getMessageTTL(testTopic);
+ log.info("Message TTL {} get on topic: {}", testTopic, messageTTL);
+ Assert.assertEquals(messageTTL.intValue(), 200);
+ }
+
+ @Test
+ public void testTopicPolicyDisabled() throws Exception {
+ this.conf.setSystemTopicEnabled(true);
+ this.conf.setTopicLevelPoliciesEnabled(false);
+ super.internalSetup();
+
+ admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+ TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant(this.testTenant, tenantInfo);
+ admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
+ admin.topics().createPartitionedTopic(testTopic, 2);
+
+ try {
+ admin.topics().getMessageTTL(testTopic);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+
+ try {
+ admin.topics().setMessageTTL(testTopic, 200);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 9c8667b..5dc3d2e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -906,6 +906,101 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
}
@Test
+ public void testMessageExpiryWithTopicMessageTTL() throws Exception {
+ int namespaceMessageTTLSecs = 10;
+ int topicMessageTTLSecs = 2;
+ String namespaceName = "prop/expiry-check-2";
+
+ this.conf.setSystemTopicEnabled(true);
+ this.conf.setTopicLevelPoliciesEnabled(true);
+ setup();
+
+ admin.namespaces().createNamespace(namespaceName);
+ admin.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet("test"));
+ admin.namespaces().setNamespaceMessageTTL(namespaceName, namespaceMessageTTLSecs);
+
+ final String topicName = "persistent://prop/expiry-check-2/topic2";
+ final String subName = "sub1";
+ final int numMsgs = 10;
+
+ admin.topics().createNonPartitionedTopic(topicName);
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+
+ // Set topic level message ttl.
+ Thread.sleep(3000);
+ admin.topics().setMessageTTL(topicName, topicMessageTTLSecs);
+
+ PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+ PersistentSubscription subRef = topicRef.getSubscription(subName);
+
+ consumer.close();
+ assertFalse(subRef.getDispatcher().isConsumerConnected());
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ for (int i = 0; i < numMsgs; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ rolloverPerIntervalStats();
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
+
+ Thread.sleep(TimeUnit.SECONDS.toMillis(topicMessageTTLSecs));
+ runMessageExpiryCheck();
+
+ // 1. check all messages expired for this unconnected subscription
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+ producer.close();
+
+ // Set topic level message ttl.
+ Thread.sleep(3000);
+ admin.topics().removeMessageTTL(topicName);
+
+ consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+
+ topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+ subRef = topicRef.getSubscription(subName);
+
+ consumer.close();
+ assertFalse(subRef.getDispatcher().isConsumerConnected());
+
+ producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ for (int i = 0; i < numMsgs; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ Thread.sleep(TimeUnit.SECONDS.toMillis(topicMessageTTLSecs));
+ rolloverPerIntervalStats();
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
+
+ Thread.sleep(TimeUnit.SECONDS.toMillis(namespaceMessageTTLSecs - topicMessageTTLSecs));
+ runMessageExpiryCheck();
+
+ // 1. check all messages expired for this unconnected subscription
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+
+ // clean-up
+ try {
+ producer.close();
+ consumer.close();
+ admin.topics().deleteSubscription(topicName, subName);
+ admin.topics().delete(topicName);
+ admin.namespaces().deleteNamespace(namespaceName);
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 500);
+ }
+ }
+
+ @Test
public void testMessageExpiryWithFewExpiredBacklog() throws Exception {
int messageTTLSecs = 10;
String namespaceName = "prop/expiry-check-1";
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index bf6ac82..8a1618e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1433,4 +1433,47 @@ public interface Topics {
* Unexpected error
*/
void removeBacklogQuota(String topic) throws PulsarAdminException;
+
+ /**
+ * Set message TTL for a topic.
+ *
+ * @param topic
+ * Topic name
+ * @param messageTTLInSecond
+ * Message TTL in second.
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Topic does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setMessageTTL(String topic, int messageTTLInSecond) throws PulsarAdminException;
+
+ /**
+ * Get message TTL for a topic.
+ *
+ * @param topic
+ * @return Message TTL in second.
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Topic does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ int getMessageTTL(String topic) throws PulsarAdminException;
+
+ /**
+ * Remove message TTL for a topic.
+ *
+ * @param topic
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Topic does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeMessageTTL(String topic) throws PulsarAdminException;
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index b5ac144..5831dee 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1429,5 +1429,39 @@ public class TopicsImpl extends BaseResource implements Topics {
}
}
+ @Override
+ public void setMessageTTL(String topic, int messageTTLInSecond) throws PulsarAdminException {
+ try {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "messageTTL");
+ request(path.queryParam("messageTTL", messageTTLInSecond)).
+ post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public int getMessageTTL(String topic) throws PulsarAdminException {
+ try {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "messageTTL");
+ return request(path).get(new GenericType<Integer>() {});
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void removeMessageTTL(String topic) throws PulsarAdminException {
+ try {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "messageTTL");
+ request(path.queryParam("messageTTL", 0)).delete(ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 4f2e399..b3622d0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -104,6 +104,9 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-backlog-quotas", new GetBacklogQuotaMap());
jcommander.addCommand("set-backlog-quota", new SetBacklogQuota());
jcommander.addCommand("remove-backlog-quota", new RemoveBacklogQuota());
+ jcommander.addCommand("get-message-ttl", new GetMessageTTL());
+ jcommander.addCommand("set-message-ttl", new SetMessageTTL());
+ jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL());
}
@Parameters(commandDescription = "Get the list of topics under a namespace.")
@@ -880,4 +883,48 @@ public class CmdTopics extends CmdBase {
admin.topics().removeBacklogQuota(persistentTopic);
}
}
+
+ @Parameters(commandDescription = "Get the message TTL for a topic")
+ private class GetMessageTTL extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(admin.topics().getMessageTTL(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Set message TTL for a topic")
+ private class SetMessageTTL extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-t", "--ttl" }, description = "Message TTL for topic in second, allowed range from 1 to Integer.MAX_VALUE", required = true)
+ private int messageTTLInSecond;
+
+ @Override
+ void run() throws PulsarAdminException {
+ if (messageTTLInSecond < 0) {
+ throw new ParameterException(String.format("Invalid retention policy type '%d'. ", messageTTLInSecond));
+ }
+
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().setMessageTTL(persistentTopic, messageTTLInSecond);
+ }
+ }
+
+ @Parameters(commandDescription = "Remove message TTL for a topic")
+ private class RemoveMessageTTL extends CliCommand {
+
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().removeMessageTTL(persistentTopic);
+ }
+ }
}