You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/13 17:51:10 UTC
[pulsar] branch master updated: Always return the backlog quota for
a namespace in rest response (#3164)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 355b754 Always return the backlog quota for a namespace in rest response (#3164)
355b754 is described below
commit 355b754e2cd3d1a054c786757c1affb117561ce8
Author: cckellogg <cc...@gmail.com>
AuthorDate: Thu Dec 13 09:51:05 2018 -0800
Always return the backlog quota for a namespace in rest response (#3164)
* Always return the backlog storage quota for a namespace in rest response.
* Attach other defaults if not set to namespace policies.
* Fix unit tests.
* Fix broken test.
---
.../apache/pulsar/broker/admin/AdminResource.java | 72 +++++++++++++++++++++-
.../org/apache/pulsar/broker/ConfigHelper.java | 67 ++++++++++++++++++++
.../apache/pulsar/broker/admin/AdminApiTest.java | 22 ++++---
.../pulsar/broker/admin/v1/V1_AdminApiTest.java | 22 ++++---
.../broker/service/BacklogQuotaManagerTest.java | 28 ++++++---
.../pulsar/common/policies/data/Policies.java | 7 +++
6 files changed, 194 insertions(+), 24 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 79e21d4..a76d25e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -37,6 +37,7 @@ import javax.ws.rs.core.UriBuilder;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
@@ -48,11 +49,14 @@ import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
@@ -304,13 +308,19 @@ public abstract class AdminResource extends PulsarWebResource {
protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
- Policies policies = policiesCache().get(AdminResource.path(POLICIES, namespaceName.toString()))
+ final String namespace = namespaceName.toString();
+ final String policyPath = AdminResource.path(POLICIES, namespace);
+ Policies policies = policiesCache().get(policyPath)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
policies.bundles = bundleData != null ? bundleData : policies.bundles;
+
+ // hydrate the namespace polices
+ mergeNamespaceWithDefaults(policies, namespace, policyPath);
+
return policies;
} catch (RestException re) {
throw re;
@@ -320,6 +330,66 @@ public abstract class AdminResource extends PulsarWebResource {
}
}
+ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, String namespacePath) {
+ if (policies.backlog_quota_map.isEmpty()) {
+ Policies.setStorageQuota(policies, namespaceBacklogQuota(namespace, namespacePath));
+ }
+
+ final ServiceConfiguration config = pulsar().getConfiguration();
+ if (policies.max_producers_per_topic < 1) {
+ policies.max_producers_per_topic = config.getMaxProducersPerTopic();
+ }
+
+ if (policies.max_consumers_per_topic < 1) {
+ policies.max_consumers_per_topic = config.getMaxConsumersPerTopic();
+ }
+
+ if (policies.max_consumers_per_subscription < 1) {
+ policies.max_consumers_per_subscription = config.getMaxConsumersPerSubscription();
+ }
+
+ final String cluster = config.getClusterName();
+ // attach default dispatch rate polices
+ if (policies.clusterDispatchRate.isEmpty()) {
+ policies.clusterDispatchRate.put(cluster, dispatchRate());
+ }
+
+ if (policies.subscriptionDispatchRate.isEmpty()) {
+ policies.subscriptionDispatchRate.put(cluster, subscriptionDispatchRate());
+ }
+
+ if (policies.clusterSubscribeRate.isEmpty()) {
+ policies.clusterSubscribeRate.put(cluster, subscribeRate());
+ }
+ }
+
+ protected BacklogQuota namespaceBacklogQuota(String namespace, String namespacePath) {
+ return pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, namespacePath);
+ }
+
+ protected DispatchRate dispatchRate() {
+ return new DispatchRate(
+ pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg(),
+ pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte(),
+ 1
+ );
+ }
+
+ protected DispatchRate subscriptionDispatchRate() {
+ return new DispatchRate(
+ pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(),
+ pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte(),
+ 1
+ );
+ }
+
+ protected SubscribeRate subscribeRate() {
+ return new SubscribeRate(
+ pulsar().getConfiguration().getSubscribeThrottlingRatePerConsumer(),
+ pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
+ );
+ }
+
public static ObjectMapper jsonMapper() {
return ObjectMapperFactory.getThreadLocal();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
new file mode 100644
index 0000000..020d7b3
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class ConfigHelper {
+ private ConfigHelper() {}
+
+
+ public static Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap(ServiceConfiguration configuration) {
+ return Collections.singletonMap(BacklogQuota.BacklogQuotaType.destination_storage,
+ backlogQuota(configuration));
+ }
+
+ public static BacklogQuota backlogQuota(ServiceConfiguration configuration) {
+ return new BacklogQuota(
+ configuration.getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024,
+ configuration.getBacklogQuotaDefaultRetentionPolicy()
+ );
+ }
+
+ public static DispatchRate dispatchRate(ServiceConfiguration configuration) {
+ return new DispatchRate(
+ configuration.getDispatchThrottlingRatePerTopicInMsg(),
+ configuration.getDispatchThrottlingRatePerTopicInByte(),
+ 1
+ );
+ }
+
+ public static DispatchRate subscriptionDispatchRate(ServiceConfiguration configuration) {
+ return new DispatchRate(
+ configuration.getDispatchThrottlingRatePerSubscriptionInMsg(),
+ configuration.getDispatchThrottlingRatePerSubscribeInByte(),
+ 1
+ );
+ }
+
+ public static SubscribeRate subscribeRate(ServiceConfiguration configuration) {
+ return new SubscribeRate(
+ configuration.getSubscribeThrottlingRatePerConsumer(),
+ configuration.getSubscribeRatePeriodPerConsumerInSecond()
+ );
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 01160fe..a6eccfc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -57,6 +57,7 @@ import javax.ws.rs.core.Response.Status;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -633,6 +634,12 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
policies.auth_policies.namespace_auth.put("spiffe://developer/passport-role", EnumSet.allOf(AuthAction.class));
policies.auth_policies.namespace_auth.put("my-role", EnumSet.allOf(AuthAction.class));
+ // set default quotas on namespace
+ Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf));
+ policies.clusterDispatchRate.put("test", ConfigHelper.dispatchRate(conf));
+ policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
+ policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));
+
assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1"), policies);
assertEquals(admin.namespaces().getPermissions("prop-xyz/ns1"), policies.auth_policies.namespace_auth);
@@ -1332,24 +1339,25 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@Test
public void backlogQuotas() throws Exception {
- assertEquals(admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1"), Maps.newTreeMap());
+ assertEquals(admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1"),
+ ConfigHelper.backlogQuotaMap(conf));
Map<BacklogQuotaType, BacklogQuota> quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1");
- assertEquals(quotaMap.size(), 0);
- assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), null);
+ assertEquals(quotaMap.size(), 1);
+ assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), ConfigHelper.backlogQuota(conf));
admin.namespaces().setBacklogQuota("prop-xyz/ns1",
- new BacklogQuota(1 * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
+ new BacklogQuota(1 * 1024 * 1024, RetentionPolicy.producer_exception));
quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1");
assertEquals(quotaMap.size(), 1);
assertEquals(quotaMap.get(BacklogQuotaType.destination_storage),
- new BacklogQuota(1 * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
+ new BacklogQuota(1 * 1024 * 1024, RetentionPolicy.producer_exception));
admin.namespaces().removeBacklogQuota("prop-xyz/ns1");
quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1");
- assertEquals(quotaMap.size(), 0);
- assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), null);
+ assertEquals(quotaMap.size(), 1);
+ assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), ConfigHelper.backlogQuota(conf));
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 803f8af..744eb7b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
+import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -628,6 +629,12 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
policies.bundles = Policies.defaultBundle();
policies.auth_policies.namespace_auth.put("my-role", EnumSet.allOf(AuthAction.class));
+ // set default quotas on namespace
+ Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf));
+ policies.clusterDispatchRate.put("test", ConfigHelper.dispatchRate(conf));
+ policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
+ policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));
+
assertEquals(admin.namespaces().getPolicies("prop-xyz/use/ns1"), policies);
assertEquals(admin.namespaces().getPermissions("prop-xyz/use/ns1"), policies.auth_policies.namespace_auth);
@@ -1324,24 +1331,25 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
@Test
public void backlogQuotas() throws Exception {
- assertEquals(admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1"), Maps.newTreeMap());
+ assertEquals(admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1"),
+ ConfigHelper.backlogQuotaMap(conf));
Map<BacklogQuotaType, BacklogQuota> quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1");
- assertEquals(quotaMap.size(), 0);
- assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), null);
+ assertEquals(quotaMap.size(), 1);
+ assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), ConfigHelper.backlogQuota(conf));
admin.namespaces().setBacklogQuota("prop-xyz/use/ns1",
- new BacklogQuota(1 * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
+ new BacklogQuota(1 * 1024 * 1024, RetentionPolicy.producer_exception));
quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1");
assertEquals(quotaMap.size(), 1);
assertEquals(quotaMap.get(BacklogQuotaType.destination_storage),
- new BacklogQuota(1 * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
+ new BacklogQuota(1 * 1024 * 1024, RetentionPolicy.producer_exception));
admin.namespaces().removeBacklogQuota("prop-xyz/use/ns1");
quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1");
- assertEquals(quotaMap.size(), 0);
- assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), null);
+ assertEquals(quotaMap.size(), 1);
+ assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), ConfigHelper.backlogQuota(conf));
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 97155ee..a2fb3b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -126,7 +127,8 @@ public class BacklogQuotaManagerTest {
@Test
public void testConsumerBacklogEviction() throws Exception {
- assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+ assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+ ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
@@ -157,7 +159,8 @@ public class BacklogQuotaManagerTest {
@Test
public void testConsumerBacklogEvictionWithAck() throws Exception {
- assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+ assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+ ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build();
@@ -188,7 +191,8 @@ public class BacklogQuotaManagerTest {
@Test
public void testConcurrentAckAndEviction() throws Exception {
- assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+ assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+ ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
@@ -260,7 +264,8 @@ public class BacklogQuotaManagerTest {
@Test
public void testNoEviction() throws Exception {
- assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+ assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+ ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
@@ -325,7 +330,8 @@ public class BacklogQuotaManagerTest {
@Test
public void testEvictionMulti() throws Exception {
- assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+ assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+ ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(15 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
@@ -433,7 +439,8 @@ public class BacklogQuotaManagerTest {
@Test
public void testAheadProducerOnHold() throws Exception {
- assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newTreeMap());
+ assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
+ ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/quotahold",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_request_hold));
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
@@ -471,7 +478,8 @@ public class BacklogQuotaManagerTest {
@Test
public void testAheadProducerOnHoldTimeout() throws Exception {
- assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newTreeMap());
+ assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
+ ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/quotahold",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_request_hold));
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
@@ -505,7 +513,8 @@ public class BacklogQuotaManagerTest {
@Test
public void testProducerException() throws Exception {
- assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newTreeMap());
+ assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
+ ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/quotahold",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
@@ -541,7 +550,8 @@ public class BacklogQuotaManagerTest {
@Test
public void testProducerExceptionAndThenUnblock() throws Exception {
- assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newTreeMap());
+ assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
+ ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/quotahold",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 3d12545..e5b520a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -103,6 +103,13 @@ public class Policies {
return bundle;
}
+ public static void setStorageQuota(Policies polices, BacklogQuota quota) {
+ if (polices == null) {
+ return;
+ }
+ polices.backlog_quota_map.put(BacklogQuota.BacklogQuotaType.destination_storage, quota);
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("auth_policies", auth_policies)