You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/06 02:09:48 UTC

[pulsar] branch master updated: Support set message TTL on topic level. (#7738)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a763ab  Support set message TTL on topic level. (#7738)
8a763ab is described below

commit 8a763abc4cff878f4765d5f0a806750b9e9998a5
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Wed Aug 5 19:09:27 2020 -0700

    Support set message TTL on topic level. (#7738)
    
    ### Motivation
    Master Issue: #2688
    Add Topic level policy support for message TTL.
    
    ### Modifications
    
    Support set/get/remove Message TTL on topic level.
    
    ### Verifying this change
    This change added tests and can be verified as follows:
    
      - *Added Unit test to verify set/get/remove message TTL at Topic level work as expected when Topic level policy is enabled/disabled*
      - *Added test case in PersistentTopicE2ETest to verify Topic level message TTL is used when set and will fall back to namespace message TTL if Topic level message TTL is removed.*
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  41 ++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  47 +++++++
 .../broker/service/persistent/PersistentTopic.java |  41 +++++-
 .../pulsar/broker/admin/TopicBacklogQuotaTest.java |   1 +
 .../pulsar/broker/admin/TopicMessageTTLTest.java   | 152 +++++++++++++++++++++
 .../broker/service/PersistentTopicE2ETest.java     |  95 +++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  43 ++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  34 +++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  47 +++++++
 9 files changed, 498 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 40d4268..004ca6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2096,6 +2096,47 @@ public class PersistentTopicsBase extends AdminResource {
                 });
     }
 
+    /**
+     * Store {@code ttlInSecond} as the topic level message TTL (null removes it) and resume {@code asyncResponse}.
+     */
+    protected void internalSetMessageTTL(AsyncResponse asyncResponse, Integer ttlInSecond) {
+        // Reject negative values; null is allowed and means "remove the topic level TTL".
+        if (ttlInSecond != null && ttlInSecond.intValue() < 0) {
+            throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL");
+        }
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        TopicPolicies topicPolicies;
+        // Update the existing topic policies, or create new ones if the topic has none yet.
+        try {
+            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.warn("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+        if (topicPolicies == null){
+            topicPolicies = new TopicPolicies();
+        }
+        topicPolicies.setMessageTTLInSeconds(ttlInSecond);
+        pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
+                .whenComplete((result, ex) -> {
+                    if (ex != null) {
+                        log.error("[{}] Failed to set message ttl for topic {}", clientAppId(), topicName, ex);
+                        asyncResponse.resume(new RestException(ex));
+                    } else {
+                        log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
+                                clientAppId(), namespaceName, topicName.getLocalName(), ttlInSecond);
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                });
+    }
+
+
     private RetentionPolicies getRetentionPolicies(TopicName topicName, TopicPolicies topicPolicies) {
         RetentionPolicies retentionPolicies = topicPolicies.getRetentionPolicies();
         if (retentionPolicies == null){
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index b939bde..cd0dae6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1032,6 +1032,53 @@ public class PersistentTopics extends PersistentTopicsBase {
         internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/messageTTL")
+    @ApiOperation(value = "Get message TTL in seconds for a topic")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Topic does not exist"),
+                            @ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry")})
+    public int getMessageTTL(@PathParam("tenant") String tenant,
+                             @PathParam("namespace") String namespace,
+                             @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        return getTopicPolicies(topicName)
+                .map(TopicPolicies::getMessageTTLInSeconds)
+                .orElse(0);  // 0 when no topic level TTL is stored (same as the default ttl at namespace level)
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/messageTTL")
+    @ApiOperation(value = "Set message TTL in seconds for a topic")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Not authenticate to perform the request or policy is read only"),
+                            @ApiResponse(code = 404, message = "Topic does not exist"),
+                            @ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry"),
+                            @ApiResponse(code = 412, message = "Invalid message TTL value") })
+    public void setMessageTTL(@Suspended final AsyncResponse asyncResponse,
+                              @PathParam("tenant") String tenant,
+                              @PathParam("namespace") String namespace,
+                              @PathParam("topic") @Encoded String encodedTopic,
+                              @ApiParam(value = "TTL in seconds for the specified topic", required = true)
+                              @QueryParam("messageTTL") int messageTTL) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetMessageTTL(asyncResponse, messageTTL);  // negative values are rejected there with 412
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/messageTTL")
+    @ApiOperation(value = "Remove message TTL in seconds for a topic")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Not authenticate to perform the request or policy is read only"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry"),
+            @ApiResponse(code = 412, message = "Invalid message TTL value") })
+    public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse,
+                              @PathParam("tenant") String tenant,
+                              @PathParam("namespace") String namespace,
+                              @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetMessageTTL(asyncResponse, null);  // a null TTL clears the topic level value
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/{topic}/terminate")
     @ApiOperation(value = "Terminate a topic. A topic that is terminated will not accept any more "
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 02d2f0b..767b236 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -112,6 +112,7 @@ import org.apache.pulsar.common.policies.data.PublisherStats;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -1065,7 +1066,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             return future;
         }
 
-        final int newMessageTTLinSeconds = policies.message_ttl_in_seconds;
+        //Ignore current broker's config for messageTTL for replication.
+        final int newMessageTTLinSeconds = getMessageTTL(getTopicPolicies(name), policies, -1);
 
         Set<String> configuredClusters;
         if (policies.replication_clusters != null) {
@@ -1122,8 +1124,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     .get(AdminResource.path(POLICIES, name.getNamespace()))
                     .orElseThrow(() -> new KeeperException.NoNodeException());
             int defaultTTL = brokerService.pulsar().getConfiguration().getTtlDurationDefaultInSeconds();
-            int message_ttl_in_seconds = (policies.message_ttl_in_seconds <= 0 && defaultTTL > 0) ? defaultTTL
-                    : policies.message_ttl_in_seconds;
+            TopicPolicies topicPolicies = getTopicPolicies(name);
+            //If topic level policy or message ttl is not set, fall back to namespace level config.
+            int message_ttl_in_seconds = getMessageTTL(topicPolicies, policies, defaultTTL);
+
             if (message_ttl_in_seconds != 0) {
                 subscriptions.forEach((subName, sub) -> sub.expireMessages(message_ttl_in_seconds));
                 replicators.forEach((region, replicator) -> ((PersistentReplicator)replicator).expireMessages(message_ttl_in_seconds));
@@ -2108,6 +2112,37 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
     }
 
+    /**
+     * Look up the {@link TopicPolicies} of the given topic through the topic policies service.
+     * @param topicName name of the topic whose policies are looked up
+     * @return whatever the topic policies service returns, or null if its policies cache is not initialized yet.
+     */
+    private TopicPolicies getTopicPolicies(TopicName topicName) {
+        try {
+            return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
+            return null;
+        }
+    }
+
+    /**
+     * Resolve the effective message TTL for this topic.
+     * @param topicPolicies topic level policies, may be null
+     * @param policies      namespace level policies
+     * @param brokerDefaultMessageTTL broker level default, only used when it is positive
+     * @return Message TTL in seconds.
+     */
+    private int getMessageTTL(TopicPolicies topicPolicies, Policies policies, int brokerDefaultMessageTTL) {
+        // A message TTL set at topic level always wins.
+        if (topicPolicies != null && topicPolicies.getMessageTTLInSeconds() != null) {
+            return topicPolicies.getMessageTTLInSeconds();
+        }
+        // Otherwise use the namespace level TTL, or the broker default when the namespace TTL is not positive.
+        boolean useBrokerDefault = policies.message_ttl_in_seconds <= 0 && brokerDefaultMessageTTL > 0;
+        return useBrokerDefault ? brokerDefaultMessageTTL : policies.message_ttl_in_seconds;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java
index c892eca..e847a7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java
@@ -74,6 +74,7 @@ public class TopicBacklogQuotaTest extends MockedPulsarServiceBaseTest {
         admin.topics().createPartitionedTopic(backlogQuotaTopic, 2);
         Producer producer = pulsarClient.newProducer().topic(testTenant + "/" + testNamespace + "/" + "lookup-topic").create();
         producer.close();
+        Thread.sleep(3000);
     }
 
     @AfterMethod
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
new file mode 100644
index 0000000..0ceb8f4
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for setting, getting and removing the topic level message TTL through the admin API.
+ */
+@Slf4j
+public class TopicMessageTTLTest extends MockedPulsarServiceBaseTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopicMessageTTLTest.class);
+
+    private final String testTenant = "my-tenant";
+    private final String testNamespace = "my-namespace";
+    private final String myNamespace = testTenant + "/" + testNamespace;
+    private final String testTopic = "persistent://" + myNamespace + "/test-topic-message-ttl";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        this.conf.setSystemTopicEnabled(true);
+        this.conf.setTopicLevelPoliciesEnabled(true);
+        super.internalSetup();
+
+        admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant(this.testTenant, tenantInfo);
+        admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
+        admin.topics().createPartitionedTopic(testTopic, 2);
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(myNamespace + "/dummy-topic").create();
+        producer.close();
+        Thread.sleep(3000);  // NOTE(review): presumably waits for the topic policies cache to initialize - confirm
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSetThenRemoveMessageTTL() throws Exception {
+        admin.topics().setMessageTTL(testTopic, 100);
+        log.info("Message TTL set success on topic: {}", testTopic);
+
+        Thread.sleep(3000);
+        Integer messageTTL = admin.topics().getMessageTTL(testTopic);
+        log.info("Message TTL {} get on topic: {}", messageTTL, testTopic);
+        Assert.assertEquals(messageTTL.intValue(), 100);
+
+        admin.topics().removeMessageTTL(testTopic);
+        Thread.sleep(3000);  // wait after the removal, as is done after the set above, before reading back
+        messageTTL = admin.topics().getMessageTTL(testTopic);
+        log.info("Message TTL {} get on topic: {}", messageTTL, testTopic);
+        Assert.assertEquals(messageTTL.intValue(), 0);
+    }
+
+    @Test
+    public void testSetInvalidMessageTTL() throws Exception {
+        try {
+            admin.topics().setMessageTTL(testTopic, -100);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+
+        try {
+            admin.topics().setMessageTTL(testTopic, (int)2147483650L);  // the cast overflows to a negative int
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+    }
+
+    @Test
+    public void testGetMessageTTL() throws Exception {
+        // Check default topic level message TTL.
+        Integer messageTTL = admin.topics().getMessageTTL(testTopic);
+        log.info("Message TTL {} get on topic: {}", messageTTL, testTopic);
+        Assert.assertEquals(messageTTL.intValue(), 0);
+
+        admin.topics().setMessageTTL(testTopic, 200);
+        log.info("Message TTL set success on topic: {}", testTopic);
+
+        Thread.sleep(3000);
+        messageTTL = admin.topics().getMessageTTL(testTopic);
+        log.info("Message TTL {} get on topic: {}", messageTTL, testTopic);
+        Assert.assertEquals(messageTTL.intValue(), 200);
+    }
+
+    @Test
+    public void testTopicPolicyDisabled() throws Exception {
+        super.internalCleanup();  // release the instance started by setup() before starting a reconfigured one
+        this.conf.setSystemTopicEnabled(true);
+        this.conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant(this.testTenant, tenantInfo);
+        admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
+        admin.topics().createPartitionedTopic(testTopic, 2);
+
+        try {
+            admin.topics().getMessageTTL(testTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().setMessageTTL(testTopic, 200);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 9c8667b..5dc3d2e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -906,6 +906,101 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
     }
 
     @Test
+    public void testMessageExpiryWithTopicMessageTTL() throws Exception {
+        int namespaceMessageTTLSecs = 10;
+        int topicMessageTTLSecs = 2;
+        String namespaceName = "prop/expiry-check-2";
+
+        this.conf.setSystemTopicEnabled(true);
+        this.conf.setTopicLevelPoliciesEnabled(true);
+        setup();
+
+        admin.namespaces().createNamespace(namespaceName);
+        admin.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet("test"));
+        admin.namespaces().setNamespaceMessageTTL(namespaceName, namespaceMessageTTLSecs);
+
+        final String topicName = "persistent://prop/expiry-check-2/topic2";
+        final String subName = "sub1";
+        final int numMsgs = 10;
+
+        admin.topics().createNonPartitionedTopic(topicName);
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+
+        // Set a topic level message ttl that is shorter than the namespace level one.
+        Thread.sleep(3000);
+        admin.topics().setMessageTTL(topicName, topicMessageTTLSecs);
+
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentSubscription subRef = topicRef.getSubscription(subName);
+
+        consumer.close();
+        assertFalse(subRef.getDispatcher().isConsumerConnected());
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        for (int i = 0; i < numMsgs; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        rolloverPerIntervalStats();
+        assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
+
+        Thread.sleep(TimeUnit.SECONDS.toMillis(topicMessageTTLSecs));
+        runMessageExpiryCheck();
+
+        // 1. check all messages expired for this unconnected subscription after the topic level ttl
+        assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+        producer.close();
+
+        // Remove the topic level message ttl so that the namespace level ttl applies again.
+        Thread.sleep(3000);
+        admin.topics().removeMessageTTL(topicName);
+
+        consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+
+        topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        subRef = topicRef.getSubscription(subName);
+
+        consumer.close();
+        assertFalse(subRef.getDispatcher().isConsumerConnected());
+
+        producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        for (int i = 0; i < numMsgs; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Thread.sleep(TimeUnit.SECONDS.toMillis(topicMessageTTLSecs));
+        rolloverPerIntervalStats();  // NOTE(review): no explicit expiry check is run before the next assertion
+        assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
+
+        Thread.sleep(TimeUnit.SECONDS.toMillis(namespaceMessageTTLSecs - topicMessageTTLSecs));
+        runMessageExpiryCheck();
+
+        // 2. check all messages expired for this unconnected subscription once the namespace level ttl has elapsed
+        assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+
+        // clean-up
+        try {
+            producer.close();
+            consumer.close();
+            admin.topics().deleteSubscription(topicName, subName);
+            admin.topics().delete(topicName);
+            admin.namespaces().deleteNamespace(namespaceName);
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 500);
+        }
+    }
+
+    @Test
     public void testMessageExpiryWithFewExpiredBacklog() throws Exception {
         int messageTTLSecs = 10;
         String namespaceName = "prop/expiry-check-1";
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index bf6ac82..8a1618e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1433,4 +1433,47 @@ public interface Topics {
      *             Unexpected error
      */
     void removeBacklogQuota(String topic) throws PulsarAdminException;
+
+    /**
+     * Set the topic level message TTL for a topic.
+     *
+     * @param topic
+     *          Topic name
+     * @param messageTTLInSecond
+     *          Message TTL in seconds.
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setMessageTTL(String topic, int messageTTLInSecond) throws PulsarAdminException;
+
+    /**
+     * Get the topic level message TTL for a topic.
+     *
+     * @param topic Topic name
+     * @return Message TTL in seconds.
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    int getMessageTTL(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove the topic level message TTL for a topic.
+     *
+     * @param topic Topic name
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void removeMessageTTL(String topic) throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index b5ac144..5831dee 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1429,5 +1429,39 @@ public class TopicsImpl extends BaseResource implements Topics {
         }
     }
 
+    @Override
+    public void setMessageTTL(String topic, int messageTTLInSecond) throws PulsarAdminException {
+        try {
+            // The TTL is sent as a query parameter; the POST entity is an empty string.
+            WebTarget ttlTarget = topicPath(validateTopic(topic), "messageTTL")
+                    .queryParam("messageTTL", messageTTLInSecond);
+            request(ttlTarget).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public int getMessageTTL(String topic) throws PulsarAdminException {
+        try {
+            // Build the messageTTL resource path for the validated topic and read it as an Integer.
+            WebTarget ttlTarget = topicPath(validateTopic(topic), "messageTTL");
+            return request(ttlTarget).get(new GenericType<Integer>() {});
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void removeMessageTTL(String topic) throws PulsarAdminException {
+        try {
+            TopicName topicName = validateTopic(topic);
+            WebTarget path = topicPath(topicName, "messageTTL");
+            request(path).delete(ErrorData.class);  // no query parameter is sent with the DELETE request
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 4f2e399..b3622d0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -104,6 +104,9 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-backlog-quotas", new GetBacklogQuotaMap());
         jcommander.addCommand("set-backlog-quota", new SetBacklogQuota());
         jcommander.addCommand("remove-backlog-quota", new RemoveBacklogQuota());
+        jcommander.addCommand("get-message-ttl", new GetMessageTTL());
+        jcommander.addCommand("set-message-ttl", new SetMessageTTL());
+        jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL());
     }
 
     @Parameters(commandDescription = "Get the list of topics under a namespace.")
@@ -880,4 +883,48 @@ public class CmdTopics extends CmdBase {
             admin.topics().removeBacklogQuota(persistentTopic);
         }
     }
+
+    @Parameters(commandDescription = "Get the message TTL for a topic")
+    private class GetMessageTTL extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // Validate the topic given as the positional argument, then print its message TTL.
+            print(admin.topics().getMessageTTL(validatePersistentTopic(params)));
+        }
+    }
+
+    @Parameters(commandDescription = "Set message TTL for a topic")
+    private class SetMessageTTL extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-t", "--ttl" }, description = "Message TTL for topic in seconds, allowed range from 0 to Integer.MAX_VALUE", required = true)
+        private int messageTTLInSecond;
+
+        @Override
+        void run() throws PulsarAdminException {
+            if (messageTTLInSecond < 0) {  // fail before any request is sent
+                throw new ParameterException(String.format("Invalid message TTL value '%d'. ", messageTTLInSecond));
+            }
+
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setMessageTTL(persistentTopic, messageTTLInSecond);
+        }
+    }
+
+    @Parameters(commandDescription = "Remove message TTL for a topic")
+    private class RemoveMessageTTL extends CliCommand {
+
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // Validate the topic given as the positional argument, then remove its message TTL.
+            admin.topics().removeMessageTTL(validatePersistentTopic(params));
+        }
+    }
 }