You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/13 17:51:10 UTC

[pulsar] branch master updated: Always return the backlog quota for a namespace in rest response (#3164)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 355b754  Always return the backlog quota for a namespace in rest response (#3164)
355b754 is described below

commit 355b754e2cd3d1a054c786757c1affb117561ce8
Author: cckellogg <cc...@gmail.com>
AuthorDate: Thu Dec 13 09:51:05 2018 -0800

    Always return the backlog quota for a namespace in rest response (#3164)
    
    * Always return the backlog storage quota for a namespace in rest response.
    
    * Attach other defaults if not set to namespace policies.
    
    * Fix unit tests.
    
    * Fix broken test.
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 72 +++++++++++++++++++++-
 .../org/apache/pulsar/broker/ConfigHelper.java     | 67 ++++++++++++++++++++
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 22 ++++---
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    | 22 ++++---
 .../broker/service/BacklogQuotaManagerTest.java    | 28 ++++++---
 .../pulsar/common/policies/data/Policies.java      |  7 +++
 6 files changed, 194 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 79e21d4..a76d25e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -37,6 +37,7 @@ import javax.ws.rs.core.UriBuilder;
 
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
@@ -48,11 +49,14 @@ import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.Codec;
@@ -304,13 +308,19 @@ public abstract class AdminResource extends PulsarWebResource {
 
     protected Policies getNamespacePolicies(NamespaceName namespaceName) {
         try {
-            Policies policies = policiesCache().get(AdminResource.path(POLICIES, namespaceName.toString()))
+            final String namespace = namespaceName.toString();
+            final String policyPath = AdminResource.path(POLICIES, namespace);
+            Policies policies = policiesCache().get(policyPath)
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
             // fetch bundles from LocalZK-policies
             NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
                     .getBundles(namespaceName);
             BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
             policies.bundles = bundleData != null ? bundleData : policies.bundles;
+
+            // hydrate the namespace polices
+            mergeNamespaceWithDefaults(policies, namespace, policyPath);
+
             return policies;
         } catch (RestException re) {
             throw re;
@@ -320,6 +330,66 @@ public abstract class AdminResource extends PulsarWebResource {
         }
     }
 
+    protected void mergeNamespaceWithDefaults(Policies policies, String namespace, String namespacePath) {
+        if (policies.backlog_quota_map.isEmpty()) {
+            Policies.setStorageQuota(policies, namespaceBacklogQuota(namespace, namespacePath));
+        }
+
+        final ServiceConfiguration config = pulsar().getConfiguration();
+        if (policies.max_producers_per_topic < 1) {
+            policies.max_producers_per_topic = config.getMaxProducersPerTopic();
+        }
+
+        if (policies.max_consumers_per_topic < 1) {
+            policies.max_consumers_per_topic = config.getMaxConsumersPerTopic();
+        }
+
+        if (policies.max_consumers_per_subscription < 1) {
+            policies.max_consumers_per_subscription = config.getMaxConsumersPerSubscription();
+        }
+
+        final String cluster = config.getClusterName();
+        // attach default dispatch rate polices
+        if (policies.clusterDispatchRate.isEmpty()) {
+            policies.clusterDispatchRate.put(cluster, dispatchRate());
+        }
+
+        if (policies.subscriptionDispatchRate.isEmpty()) {
+            policies.subscriptionDispatchRate.put(cluster, subscriptionDispatchRate());
+        }
+
+        if (policies.clusterSubscribeRate.isEmpty()) {
+            policies.clusterSubscribeRate.put(cluster, subscribeRate());
+        }
+    }
+
+    protected BacklogQuota namespaceBacklogQuota(String namespace, String namespacePath) {
+        return pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, namespacePath);
+    }
+
+    protected DispatchRate dispatchRate() {
+        return new DispatchRate(
+                pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg(),
+                pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte(),
+                1
+        );
+    }
+
+    protected DispatchRate subscriptionDispatchRate() {
+        return new DispatchRate(
+                pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(),
+                pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte(),
+                1
+        );
+    }
+
+    protected SubscribeRate subscribeRate() {
+        return new SubscribeRate(
+                pulsar().getConfiguration().getSubscribeThrottlingRatePerConsumer(),
+                pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
+        );
+    }
+
     public static ObjectMapper jsonMapper() {
         return ObjectMapperFactory.getThreadLocal();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
new file mode 100644
index 0000000..020d7b3
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class ConfigHelper {
+    private ConfigHelper() {}
+
+
+    public static Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap(ServiceConfiguration configuration) {
+        return Collections.singletonMap(BacklogQuota.BacklogQuotaType.destination_storage,
+                backlogQuota(configuration));
+    }
+
+    public static BacklogQuota backlogQuota(ServiceConfiguration configuration) {
+        return new BacklogQuota(
+                configuration.getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024,
+                configuration.getBacklogQuotaDefaultRetentionPolicy()
+        );
+    }
+
+    public static DispatchRate dispatchRate(ServiceConfiguration configuration) {
+        return new DispatchRate(
+                configuration.getDispatchThrottlingRatePerTopicInMsg(),
+                configuration.getDispatchThrottlingRatePerTopicInByte(),
+                1
+        );
+    }
+
+    public static DispatchRate subscriptionDispatchRate(ServiceConfiguration configuration) {
+        return new DispatchRate(
+                configuration.getDispatchThrottlingRatePerSubscriptionInMsg(),
+                configuration.getDispatchThrottlingRatePerSubscribeInByte(),
+                1
+        );
+    }
+
+    public static SubscribeRate subscribeRate(ServiceConfiguration configuration) {
+        return new SubscribeRate(
+                configuration.getSubscribeThrottlingRatePerConsumer(),
+                configuration.getSubscribeRatePeriodPerConsumerInSecond()
+        );
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 01160fe..a6eccfc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -57,6 +57,7 @@ import javax.ws.rs.core.Response.Status;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.pulsar.broker.ConfigHelper;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -633,6 +634,12 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         policies.auth_policies.namespace_auth.put("spiffe://developer/passport-role", EnumSet.allOf(AuthAction.class));
         policies.auth_policies.namespace_auth.put("my-role", EnumSet.allOf(AuthAction.class));
 
+        // set default quotas on namespace
+        Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf));
+        policies.clusterDispatchRate.put("test", ConfigHelper.dispatchRate(conf));
+        policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
+        policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));
+
         assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1"), policies);
         assertEquals(admin.namespaces().getPermissions("prop-xyz/ns1"), policies.auth_policies.namespace_auth);
 
@@ -1332,24 +1339,25 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void backlogQuotas() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1"), Maps.newTreeMap());
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1"),
+                ConfigHelper.backlogQuotaMap(conf));
 
         Map<BacklogQuotaType, BacklogQuota> quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1");
-        assertEquals(quotaMap.size(), 0);
-        assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), null);
+        assertEquals(quotaMap.size(), 1);
+        assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), ConfigHelper.backlogQuota(conf));
 
         admin.namespaces().setBacklogQuota("prop-xyz/ns1",
-                new BacklogQuota(1 * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
+                new BacklogQuota(1 * 1024 * 1024, RetentionPolicy.producer_exception));
         quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1");
         assertEquals(quotaMap.size(), 1);
         assertEquals(quotaMap.get(BacklogQuotaType.destination_storage),
-                new BacklogQuota(1 * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
+                new BacklogQuota(1 * 1024 * 1024, RetentionPolicy.producer_exception));
 
         admin.namespaces().removeBacklogQuota("prop-xyz/ns1");
 
         quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1");
-        assertEquals(quotaMap.size(), 0);
-        assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), null);
+        assertEquals(quotaMap.size(), 1);
+        assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), ConfigHelper.backlogQuota(conf));
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 803f8af..744eb7b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 
+import org.apache.pulsar.broker.ConfigHelper;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -628,6 +629,12 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         policies.bundles = Policies.defaultBundle();
         policies.auth_policies.namespace_auth.put("my-role", EnumSet.allOf(AuthAction.class));
 
+        // set default quotas on namespace
+        Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf));
+        policies.clusterDispatchRate.put("test", ConfigHelper.dispatchRate(conf));
+        policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
+        policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));
+
         assertEquals(admin.namespaces().getPolicies("prop-xyz/use/ns1"), policies);
         assertEquals(admin.namespaces().getPermissions("prop-xyz/use/ns1"), policies.auth_policies.namespace_auth);
 
@@ -1324,24 +1331,25 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void backlogQuotas() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1"), Maps.newTreeMap());
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1"),
+                ConfigHelper.backlogQuotaMap(conf));
 
         Map<BacklogQuotaType, BacklogQuota> quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1");
-        assertEquals(quotaMap.size(), 0);
-        assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), null);
+        assertEquals(quotaMap.size(), 1);
+        assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), ConfigHelper.backlogQuota(conf));
 
         admin.namespaces().setBacklogQuota("prop-xyz/use/ns1",
-                new BacklogQuota(1 * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
+                new BacklogQuota(1 * 1024 * 1024, RetentionPolicy.producer_exception));
         quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1");
         assertEquals(quotaMap.size(), 1);
         assertEquals(quotaMap.get(BacklogQuotaType.destination_storage),
-                new BacklogQuota(1 * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
+                new BacklogQuota(1 * 1024 * 1024, RetentionPolicy.producer_exception));
 
         admin.namespaces().removeBacklogQuota("prop-xyz/use/ns1");
 
         quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1");
-        assertEquals(quotaMap.size(), 0);
-        assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), null);
+        assertEquals(quotaMap.size(), 1);
+        assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), ConfigHelper.backlogQuota(conf));
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 97155ee..a2fb3b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.ConfigHelper;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -126,7 +127,8 @@ public class BacklogQuotaManagerTest {
 
     @Test
     public void testConsumerBacklogEviction() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+                ConfigHelper.backlogQuotaMap(config));
         admin.namespaces().setBacklogQuota("prop/ns-quota",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
         PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
@@ -157,7 +159,8 @@ public class BacklogQuotaManagerTest {
 
     @Test
     public void testConsumerBacklogEvictionWithAck() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+                ConfigHelper.backlogQuotaMap(config));
         admin.namespaces().setBacklogQuota("prop/ns-quota",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
         PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build();
@@ -188,7 +191,8 @@ public class BacklogQuotaManagerTest {
 
     @Test
     public void testConcurrentAckAndEviction() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+                ConfigHelper.backlogQuotaMap(config));
         admin.namespaces().setBacklogQuota("prop/ns-quota",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
 
@@ -260,7 +264,8 @@ public class BacklogQuotaManagerTest {
 
     @Test
     public void testNoEviction() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+                ConfigHelper.backlogQuotaMap(config));
         admin.namespaces().setBacklogQuota("prop/ns-quota",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
 
@@ -325,7 +330,8 @@ public class BacklogQuotaManagerTest {
 
     @Test
     public void testEvictionMulti() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+                ConfigHelper.backlogQuotaMap(config));
         admin.namespaces().setBacklogQuota("prop/ns-quota",
                 new BacklogQuota(15 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
 
@@ -433,7 +439,8 @@ public class BacklogQuotaManagerTest {
 
     @Test
     public void testAheadProducerOnHold() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newTreeMap());
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
+                ConfigHelper.backlogQuotaMap(config));
         admin.namespaces().setBacklogQuota("prop/quotahold",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_request_hold));
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
@@ -471,7 +478,8 @@ public class BacklogQuotaManagerTest {
 
     @Test
     public void testAheadProducerOnHoldTimeout() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newTreeMap());
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
+                ConfigHelper.backlogQuotaMap(config));
         admin.namespaces().setBacklogQuota("prop/quotahold",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_request_hold));
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
@@ -505,7 +513,8 @@ public class BacklogQuotaManagerTest {
 
     @Test
     public void testProducerException() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newTreeMap());
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
+                ConfigHelper.backlogQuotaMap(config));
         admin.namespaces().setBacklogQuota("prop/quotahold",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
@@ -541,7 +550,8 @@ public class BacklogQuotaManagerTest {
 
     @Test
     public void testProducerExceptionAndThenUnblock() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newTreeMap());
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
+                ConfigHelper.backlogQuotaMap(config));
         admin.namespaces().setBacklogQuota("prop/quotahold",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 3d12545..e5b520a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -103,6 +103,13 @@ public class Policies {
         return bundle;
     }
 
+    public static void setStorageQuota(Policies polices, BacklogQuota quota) {
+        if (polices == null) {
+            return;
+        }
+        polices.backlog_quota_map.put(BacklogQuota.BacklogQuotaType.destination_storage, quota);
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this).add("auth_policies", auth_policies)