You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/07/31 06:07:05 UTC

[pulsar] branch master updated: [Issue 2689] Support set backlog quota on topic level. (#7646)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 26c49a8  [Issue 2689] Support set backlog quota on topic level. (#7646)
26c49a8 is described below

commit 26c49a85dd89d4282a625f02d16959de595d282f
Author: jianyun <21...@qq.com>
AuthorDate: Fri Jul 31 14:06:47 2020 +0800

    [Issue 2689] Support set backlog quota on topic level. (#7646)
    
    ### Motivation
    Support set backlog quota on topic level.
    Based on the system topic function, refer to @codelipenghui  topic-level backlog quota based on zk implementation
    
    ###Modifications
    Support get-backlog-quotas on topic level.
    Support set-backlog-quota on topic level.
    Support remove-backlog-quota on topic level.
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  36 ++++
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  17 +-
 .../broker/admin/impl/PersistentTopicsBase.java    |  79 ++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  55 ++++++
 .../pulsar/broker/service/BacklogQuotaManager.java |  36 +++-
 .../broker/service/persistent/PersistentTopic.java |   8 +-
 .../broker/admin/TopicBacklogQuotaDisableTest.java |  97 ++++++++++
 .../pulsar/broker/admin/TopicBacklogQuotaTest.java | 205 +++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  79 ++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  38 ++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  62 +++++++
 .../pulsar/common/policies/data/TopicPolicies.java |   2 +-
 12 files changed, 685 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 9124ebb..590c032 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -28,6 +28,7 @@ import java.net.MalformedURLException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -64,8 +65,10 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -508,6 +511,39 @@ public abstract class AdminResource extends PulsarWebResource {
         return pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, namespacePath);
     }
 
+    protected Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
+        try {
+            checkTopicLevelPolicyEnable();
+            return Optional.ofNullable(pulsar().getTopicPoliciesService().getTopicPolicies(topicName));
+        } catch (RestException re) {
+            throw re;
+        } catch (Exception e) {
+            log.error("[{}] Failed to get topic policies {}", clientAppId(), topicName, e);
+            throw new RestException(e);
+        }
+    }
+
+    protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) {
+        if (retention == null || retention.getRetentionSizeInMB() == 0 ||
+                retention.getRetentionSizeInMB() == -1) {
+            return true;
+        }
+        if (quota == null) {
+            quota = pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
+        }
+        if (quota.getLimit() >= ( retention.getRetentionSizeInMB() * 1024 * 1024)) {
+            return false;
+        }
+        return true;
+    }
+
+    protected void checkTopicLevelPolicyEnable() {
+        if (!config().isTopicLevelPoliciesEnabled()) {
+            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Topic level policies is disabled, to enable the topic level policy and retry.");
+        }
+    }
+
     protected DispatchRate dispatchRate() {
         return new DispatchRate(
                 pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c76eeda..dc5b0af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2036,21 +2036,12 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     private boolean checkQuotas(Policies policies, RetentionPolicies retention) {
-        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlog_quota_map = policies.backlog_quota_map;
-        if (backlog_quota_map.isEmpty() || retention.getRetentionSizeInMB() == 0 || retention.getRetentionSizeInMB() == -1) {
+        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap = policies.backlog_quota_map;
+        if (backlogQuotaMap.isEmpty()) {
             return true;
         }
-        BacklogQuota quota = backlog_quota_map.get(BacklogQuotaType.destination_storage);
-        if (quota == null) {
-            quota = pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
-        }
-        if (quota.getLimit() < 0 && (retention.getRetentionSizeInMB() > 0 || retention.getRetentionTimeInMinutes() > 0)) {
-            return false;
-        }
-        if (quota.getLimit() >= (retention.getRetentionSizeInMB() * 1024 * 1024)) {
-            return false;
-        }
-        return true;
+        BacklogQuota quota = backlogQuotaMap.get(BacklogQuotaType.destination_storage);
+        return checkBacklogQuota(quota, retention);
     }
 
     private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index cdbca45..4066399 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -71,6 +71,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.admin.ZkAdminPaths;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -104,12 +105,15 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AuthPolicies;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -2000,6 +2004,81 @@ public class PersistentTopicsBase extends AdminResource {
         return offlineTopicStats;
     }
 
+    protected void internalSetBacklogQuota(AsyncResponse asyncResponse, BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        if (backlogQuotaType == null) {
+            backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage;
+        }
+        checkTopicLevelPolicyEnable();
+        TopicPolicies topicPolicies;
+        try {
+            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.warn("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+        if (topicPolicies == null){
+            topicPolicies = new TopicPolicies();
+        }
+
+        RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies);
+        if(!checkBacklogQuota(backlogQuota,retentionPolicies)){
+            log.warn(
+                    "[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota",
+                    clientAppId(), topicName);
+            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                    "Backlog Quota exceeds configured retention quota for topic. " +
+                            "Please increase retention quota and retry"));
+        }
+
+        if(backlogQuota != null){
+            topicPolicies.getBackLogQuotaMap().put(backlogQuotaType.name(), backlogQuota);
+        }else {
+            topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name());
+        }
+        Map<String, BacklogQuota> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
+        pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
+                .whenComplete((r, ex) -> {
+                    if (ex != null) {
+                        log.error("Failed updated backlog quota map",ex);
+                        asyncResponse.resume(new RestException(ex));
+                    } else {
+                        try {
+                            log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}",
+                                    clientAppId(),
+                                    namespaceName,
+                                    topicName.getLocalName(),
+                                    jsonMapper().writeValueAsString(backLogQuotaMap));
+                        } catch (JsonProcessingException ignore) { }
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                });
+    }
+
+    private RetentionPolicies getRetentionPolicies(TopicName topicName, TopicPolicies topicPolicies) {
+        RetentionPolicies retentionPolicies = topicPolicies.getRetentionPolicies();
+        if (retentionPolicies == null){
+            try {
+                retentionPolicies = getNamespacePoliciesAsync(topicName.getNamespaceObject())
+                        .thenApply(policies -> policies.retention_policies)
+                        .get(1L, TimeUnit.SECONDS);
+            } catch (Exception e) {
+               throw new RestException(e);
+            }
+        }
+        return retentionPolicies;
+    }
+
+    protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
+            BacklogQuota.BacklogQuotaType backlogQuotaType) {
+        internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
+    }
+
     protected MessageId internalTerminate(boolean authoritative) {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index fbfea19..b939bde 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.v2;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,6 +39,7 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import com.google.common.collect.Maps;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -46,8 +48,10 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 
 import io.swagger.annotations.Api;
@@ -977,6 +981,57 @@ public class PersistentTopics extends PersistentTopicsBase {
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/backlogQuotaMap")
+    @ApiOperation(value = "Get backlog quota map on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic policy does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")})
+    public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(@PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        return getTopicPolicies(topicName)
+                .map(TopicPolicies::getBackLogQuotaMap)
+                .map(map -> {
+                    HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> hashMap = Maps.newHashMap();
+                    map.forEach((key,value) -> {
+                        hashMap.put(BacklogQuota.BacklogQuotaType.valueOf(key),value);
+                    });
+                    return hashMap;
+                })
+                .orElse(Maps.newHashMap());
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/backlogQuota")
+    @ApiOperation(value = " Set a backlog quota for a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 412, message = "Specified backlog quota exceeds retention quota. Increase retention quota and retry request") })
+    public void setBacklogQuota(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetBacklogQuota(asyncResponse, backlogQuotaType, backlogQuota);
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/backlogQuota")
+    @ApiOperation(value = "Remove a backlog quota policy from a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/{topic}/terminate")
     @ApiOperation(value = "Terminate a topic. A topic that is terminated will not accept any more "
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index fd47425..8eafa5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -32,9 +33,8 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
-import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,12 +47,17 @@ public class BacklogQuotaManager {
     private static final Logger log = LoggerFactory.getLogger(BacklogQuotaManager.class);
     private final BacklogQuota defaultQuota;
     private final ZooKeeperDataCache<Policies> zkCache;
+    private final PulsarService pulsar;
+    private final boolean isTopicLevelPoliciesEnable;
+
 
     public BacklogQuotaManager(PulsarService pulsar) {
+        this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
         this.defaultQuota = new BacklogQuota(
                 pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024,
                 pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy());
         this.zkCache = pulsar.getConfigurationCache().policiesCache();
+        this.pulsar = pulsar;
     }
 
     public BacklogQuota getDefaultQuota() {
@@ -70,9 +75,25 @@ public class BacklogQuotaManager {
         }
     }
 
-    public long getBacklogQuotaLimit(String namespace) {
-        String policyPath = AdminResource.path(POLICIES, namespace);
-        return getBacklogQuota(namespace, policyPath).getLimit();
+    public BacklogQuota getBacklogQuota(TopicName topicName) {
+        String policyPath = AdminResource.path(POLICIES, topicName.getNamespace());
+        if (!isTopicLevelPoliciesEnable) {
+            return getBacklogQuota(topicName.getNamespace(),policyPath);
+        }
+
+        try {
+            return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName))
+                    .map(TopicPolicies::getBackLogQuotaMap)
+                    .map(map -> map.get(BacklogQuotaType.destination_storage.name()))
+                    .orElseGet(() -> getBacklogQuota(topicName.getNamespace(),policyPath));
+        } catch (Exception e) {
+            log.error("Failed to read policies data, will apply the default backlog quota: topicName={}", topicName, e);
+        }
+        return getBacklogQuota(topicName.getNamespace(),policyPath);
+    }
+
+    public long getBacklogQuotaLimit(TopicName topicName) {
+        return getBacklogQuota(topicName).getLimit();
     }
 
     /**
@@ -83,10 +104,7 @@ public class BacklogQuotaManager {
      */
     public void handleExceededBacklogQuota(PersistentTopic persistentTopic) {
         TopicName topicName = TopicName.get(persistentTopic.getName());
-        String namespace = topicName.getNamespace();
-        String policyPath = AdminResource.path(POLICIES, namespace);
-
-        BacklogQuota quota = getBacklogQuota(namespace, policyPath);
+        BacklogQuota quota = getBacklogQuota(topicName);
         log.info("Backlog quota exceeded for topic [{}]. Applying [{}] policy", persistentTopic.getName(),
                 quota.getPolicy());
         switch (quota.getPolicy()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 377dc98..eac2380 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1847,11 +1847,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     @Override
     public BacklogQuota getBacklogQuota() {
         TopicName topicName = TopicName.get(this.getName());
-        String namespace = topicName.getNamespace();
-        String policyPath = AdminResource.path(POLICIES, namespace);
-
-        BacklogQuota backlogQuota = brokerService.getBacklogQuotaManager().getBacklogQuota(namespace, policyPath);
-        return backlogQuota;
+        return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName);
     }
 
     /**
@@ -1882,7 +1878,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
      */
     public boolean isBacklogExceeded() {
         TopicName topicName = TopicName.get(getName());
-        long backlogQuotaLimitInBytes = brokerService.getBacklogQuotaManager().getBacklogQuotaLimit(topicName.getNamespace());
+        long backlogQuotaLimitInBytes = brokerService.getBacklogQuotaManager().getBacklogQuotaLimit(topicName);
         if (backlogQuotaLimitInBytes < 0) {
             return false;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaDisableTest.java
new file mode 100644
index 0000000..0f8b2cb
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaDisableTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class TopicBacklogQuotaDisableTest extends MockedPulsarServiceBaseTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopicBacklogQuotaDisableTest.class);
+
+    private final String testTenant = "my-tenant";
+
+    private final String testNamespace = "my-namespace";
+
+    private final String myNamespace = testTenant + "/" + testNamespace;
+
+    private final String backlogQuotaTopic = "persistent://" + myNamespace + "/test-set-backlog-quota";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        this.conf.setSystemTopicEnabled(true);
+        this.conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant(this.testTenant, tenantInfo);
+        admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
+        admin.topics().createPartitionedTopic(backlogQuotaTopic, 2);
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testBacklogQuotaDisabled() throws Exception {
+        BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+
+        try {
+            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().removeBacklogQuota(backlogQuotaTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().getBacklogQuotaMap(backlogQuotaTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java
new file mode 100644
index 0000000..1104ab6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class TopicBacklogQuotaTest extends MockedPulsarServiceBaseTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopicBacklogQuotaTest.class);
+
+    private final String testTenant = "my-tenant";
+
+    private final String testNamespace = "my-namespace";
+
+    private final String myNamespace = testTenant + "/" + testNamespace;
+
+    private final String backlogQuotaTopic = "persistent://" + myNamespace + "/test-set-backlog-quota";
+
+    public void disableTopicLevelPolicies() throws Exception {
+        this.conf.setSystemTopicEnabled(true);
+        this.conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant(this.testTenant, tenantInfo);
+        admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        this.conf.setSystemTopicEnabled(true);
+        this.conf.setTopicLevelPoliciesEnabled(true);
+        super.internalSetup();
+
+        admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant(this.testTenant, tenantInfo);
+        admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
+        admin.topics().createPartitionedTopic(backlogQuotaTopic, 2);
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSetBacklogQuota() throws Exception {
+
+        BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+
+        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+        log.info("Backlog quota set success on topic: {}", backlogQuotaTopic);
+
+        Thread.sleep(3000);
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+                .get(BacklogQuota.BacklogQuotaType.destination_storage);
+        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, backlogQuotaTopic);
+        Assert.assertEquals(getBacklogQuota, backlogQuota);
+
+        BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
+        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, backlogQuotaTopic);
+        Assert.assertEquals(backlogQuotaInManager, backlogQuota);
+
+        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+    }
+
+    @Test
+    public void testRemoveBacklogQuota() throws Exception {
+        BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+        log.info("Backlog quota set success on topic: {}", backlogQuotaTopic);
+
+        Thread.sleep(3000);
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+                .get(BacklogQuota.BacklogQuotaType.destination_storage);
+        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, backlogQuotaTopic);
+        Assert.assertEquals(backlogQuota, getBacklogQuota);
+
+        BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
+        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, backlogQuotaTopic);
+        Assert.assertEquals(backlogQuota, backlogQuotaInManager);
+
+        admin.topics().removeBacklogQuota(backlogQuotaTopic);
+        getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+                .get(BacklogQuota.BacklogQuotaType.destination_storage);
+        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, backlogQuotaTopic);
+        Assert.assertNull(getBacklogQuota);
+
+        backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
+        log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInManager,
+                backlogQuotaTopic);
+        Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager);
+
+        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+    }
+
+    @Test
+    public void testCheckQuota() throws Exception {
+        RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
+        String namespace = TopicName.get(backlogQuotaTopic).getNamespace();
+        admin.namespaces().setRetention(namespace, retentionPolicies);
+
+        BacklogQuota backlogQuota =
+                new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        try {
+            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+        Thread.sleep(3000);
+        backlogQuota =
+                new BacklogQuota(10 * 1024 * 1024 + 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        try {
+            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+        Thread.sleep(3000);
+        backlogQuota =
+                new BacklogQuota(10 * 1024 * 1024 - 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+        Thread.sleep(3000);
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+                .get(BacklogQuota.BacklogQuotaType.destination_storage);
+        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, backlogQuotaTopic);
+        Assert.assertEquals(getBacklogQuota, backlogQuota);
+
+        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+    }
+
+    @Test
+    public void testBacklogQuotaDisabled() throws Exception {
+        disableTopicLevelPolicies();
+        admin.topics().createPartitionedTopic(backlogQuotaTopic, 2);
+
+        BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+
+        try {
+            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().removeBacklogQuota(backlogQuotaTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().getBacklogQuotaMap(backlogQuotaTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+    }
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index f4e997d..bf6ac82 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -1354,4 +1355,82 @@ public interface Topics {
      * @return
      */
     CompletableFuture<MessageId> getLastMessageIdAsync(String topic);
+
+    /**
+     * Get backlog quota map for a topic.
+     * Response example:
+     *
+     * <pre>
+     * <code>
+     *  {
+     *      "namespace_memory" : {
+     *          "limit" : "134217728",
+     *          "policy" : "consumer_backlog_eviction"
+     *      },
+     *      "destination_storage" : {
+     *          "limit" : "-1",
+     *          "policy" : "producer_exception"
+     *      }
+     *  }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     *
+     * @throws NotAuthorizedException
+     *             Permission denied
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic) throws PulsarAdminException;
+
+    /**
+     * Set a backlog quota for a topic.
+     * The backlog quota can be set on this resource:
+     *
+     * <p>
+     * Request parameter example:
+     *</p>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "limit" : "134217728",
+     *     "policy" : "consumer_backlog_eviction"
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     * @param backlogQuota
+     *            the new BacklogQuota
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws PulsarAdminException;
+
+    /**
+     * Remove a backlog quota policy from a topic.
+     * The namespace backlog policy will fall back to the default.
+     *
+     * @param topic
+     *            Topic name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void removeBacklogQuota(String topic) throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index f6d4fae..b5ac144 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -45,6 +45,7 @@ import java.util.stream.Stream;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
@@ -69,6 +70,8 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -1391,5 +1394,40 @@ public class TopicsImpl extends BaseResource implements Topics {
         return future;
     }
 
+    @Override
+    public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic) throws PulsarAdminException {
+        try {
+            TopicName tn = validateTopic(topic);
+            WebTarget path = topicPath(tn, "backlogQuotaMap");
+            return request(path).get(new GenericType<Map<BacklogQuotaType, BacklogQuota>>() {
+            });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws PulsarAdminException {
+        try {
+            TopicName tn = validateTopic(topic);
+            WebTarget path = topicPath(tn, "backlogQuota");
+            request(path).post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void removeBacklogQuota(String topic) throws PulsarAdminException {
+        try {
+            TopicName tn = validateTopic(topic);
+            WebTarget path = topicPath(tn, "backlogQuota");
+            request(path.queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString()))
+                    .delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 733a31d..4f2e399 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.admin.cli;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
 import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.CommaParameterSplitter;
 import com.google.common.collect.Lists;
@@ -32,6 +33,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -45,6 +47,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 
@@ -98,6 +101,9 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("offload", new Offload());
         jcommander.addCommand("offload-status", new OffloadStatusCmd());
         jcommander.addCommand("last-message-id", new GetLastMessageId());
+        jcommander.addCommand("get-backlog-quotas", new GetBacklogQuotaMap());
+        jcommander.addCommand("set-backlog-quota", new SetBacklogQuota());
+        jcommander.addCommand("remove-backlog-quota", new RemoveBacklogQuota());
     }
 
     @Parameters(commandDescription = "Get the list of topics under a namespace.")
@@ -818,4 +824,60 @@ public class CmdTopics extends CmdBase {
             print(topics.getLastMessageId(persistentTopic));
         }
     }
+
+    @Parameters(commandDescription = "Get the backlog quota policies for a topic")
+    private class GetBacklogQuotaMap extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getBacklogQuotaMap(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set a backlog quota policy for a topic")
+    private class SetBacklogQuota extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)", required = true)
+        private String limitStr;
+
+        @Parameter(names = { "-p", "--policy" }, description = "Retention policy to enforce when the limit is reached. "
+                + "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]", required = true)
+        private String policyStr;
+
+        @Override
+        void run() throws PulsarAdminException {
+            BacklogQuota.RetentionPolicy policy;
+            long limit;
+
+            try {
+                policy = BacklogQuota.RetentionPolicy.valueOf(policyStr);
+            } catch (IllegalArgumentException e) {
+                throw new ParameterException(String.format("Invalid retention policy type '%s'. Valid options are: %s",
+                        policyStr, Arrays.toString(BacklogQuota.RetentionPolicy.values())));
+            }
+
+            limit = validateSizeString(limitStr);
+
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setBacklogQuota(persistentTopic, new BacklogQuota(limit, policy));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove a backlog quota policy from a topic")
+    private class RemoveBacklogQuota extends CliCommand {
+
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removeBacklogQuota(persistentTopic);
+        }
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 56fbf83..f9fcaf8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -37,7 +37,7 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor
 public class TopicPolicies {
 
-    private Map<String, BacklogQuota> backLogQuotaMap =  Maps.newHashMap();
+    private Map<String, BacklogQuota> backLogQuotaMap = Maps.newHashMap();
     private PersistencePolicies persistence = null;
     private RetentionPolicies retentionPolicies = null;
     private Boolean deduplicationEnabled = null;