You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/07/31 06:07:05 UTC
[pulsar] branch master updated: [Issue 2689] Support set backlog
quota on topic level. (#7646)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 26c49a8 [Issue 2689] Support set backlog quota on topic level. (#7646)
26c49a8 is described below
commit 26c49a85dd89d4282a625f02d16959de595d282f
Author: jianyun <21...@qq.com>
AuthorDate: Fri Jul 31 14:06:47 2020 +0800
[Issue 2689] Support set backlog quota on topic level. (#7646)
### Motivation
Support set backlog quota on topic level.
Built on the system topic feature; modeled on @codelipenghui's ZooKeeper-based implementation of topic-level backlog quota.
### Modifications
Support get-backlog-quotas on topic level.
Support set-backlog-quota on topic level.
Support remove-backlog-quota on topic level.
---
.../apache/pulsar/broker/admin/AdminResource.java | 36 ++++
.../pulsar/broker/admin/impl/NamespacesBase.java | 17 +-
.../broker/admin/impl/PersistentTopicsBase.java | 79 ++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 55 ++++++
.../pulsar/broker/service/BacklogQuotaManager.java | 36 +++-
.../broker/service/persistent/PersistentTopic.java | 8 +-
.../broker/admin/TopicBacklogQuotaDisableTest.java | 97 ++++++++++
.../pulsar/broker/admin/TopicBacklogQuotaTest.java | 205 +++++++++++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 79 ++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 38 ++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 62 +++++++
.../pulsar/common/policies/data/TopicPolicies.java | 2 +-
12 files changed, 685 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 9124ebb..590c032 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -28,6 +28,7 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -64,8 +65,10 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
@@ -508,6 +511,39 @@ public abstract class AdminResource extends PulsarWebResource {
return pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, namespacePath);
}
+ /**
+  * Looks up the topic-level policies for {@code topicName}.
+  *
+  * @param topicName topic whose policies are requested
+  * @return the policies returned by the topic policies service, or an empty
+  *         {@link Optional} when the service returns {@code null}
+  * @throws RestException propagated unchanged when raised by the feature check,
+  *         otherwise wrapping any other failure of the lookup
+  */
+ protected Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
+     TopicPolicies found;
+     try {
+         checkTopicLevelPolicyEnable();
+         found = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+     } catch (RestException alreadyMapped) {
+         // Already carries the HTTP status we want the client to see.
+         throw alreadyMapped;
+     } catch (Exception cause) {
+         log.error("[{}] Failed to get topic policies {}", clientAppId(), topicName, cause);
+         throw new RestException(cause);
+     }
+     return Optional.ofNullable(found);
+ }
+
+ protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) {
+ if (retention == null || retention.getRetentionSizeInMB() == 0 ||
+ retention.getRetentionSizeInMB() == -1) {
+ return true;
+ }
+ if (quota == null) {
+ quota = pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
+ }
+ if (quota.getLimit() >= ( retention.getRetentionSizeInMB() * 1024 * 1024)) {
+ return false;
+ }
+ return true;
+ }
+
+ protected void checkTopicLevelPolicyEnable() {
+ if (!config().isTopicLevelPoliciesEnabled()) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Topic level policies is disabled, to enable the topic level policy and retry.");
+ }
+ }
+
protected DispatchRate dispatchRate() {
return new DispatchRate(
pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c76eeda..dc5b0af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2036,21 +2036,12 @@ public abstract class NamespacesBase extends AdminResource {
}
private boolean checkQuotas(Policies policies, RetentionPolicies retention) {
- Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlog_quota_map = policies.backlog_quota_map;
- if (backlog_quota_map.isEmpty() || retention.getRetentionSizeInMB() == 0 || retention.getRetentionSizeInMB() == -1) {
+ Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap = policies.backlog_quota_map;
+ if (backlogQuotaMap.isEmpty()) {
return true;
}
- BacklogQuota quota = backlog_quota_map.get(BacklogQuotaType.destination_storage);
- if (quota == null) {
- quota = pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
- }
- if (quota.getLimit() < 0 && (retention.getRetentionSizeInMB() > 0 || retention.getRetentionTimeInMinutes() > 0)) {
- return false;
- }
- if (quota.getLimit() >= (retention.getRetentionSizeInMB() * 1024 * 1024)) {
- return false;
- }
- return true;
+ BacklogQuota quota = backlogQuotaMap.get(BacklogQuotaType.destination_storage);
+ return checkBacklogQuota(quota, retention);
}
private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index cdbca45..4066399 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -71,6 +71,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -104,12 +105,15 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
@@ -2000,6 +2004,81 @@ public class PersistentTopicsBase extends AdminResource {
return offlineTopicStats;
}
+ /**
+  * Sets the topic-level backlog quota of the given type, or removes it when
+  * {@code backlogQuota} is {@code null}, and completes {@code asyncResponse}.
+  *
+  * <p>The quota is validated against the effective retention policy first. On a
+  * conflict the response is resumed with 412 (Precondition Failed) and the stored
+  * policies are left untouched.
+  *
+  * @param asyncResponse    response to resume with 204 on success or an error otherwise
+  * @param backlogQuotaType quota type; {@code null} means {@code destination_storage}
+  * @param backlogQuota     quota to store, or {@code null} to remove the stored one
+  */
+ protected void internalSetBacklogQuota(AsyncResponse asyncResponse, BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+     validateAdminAccessForTenant(namespaceName.getTenant());
+     validatePoliciesReadOnlyAccess();
+     if (topicName.isGlobal()) {
+         validateGlobalNamespaceOwnership(namespaceName);
+     }
+     if (backlogQuotaType == null) {
+         backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage;
+     }
+     checkTopicLevelPolicyEnable();
+     TopicPolicies topicPolicies;
+     try {
+         topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+     } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+         log.warn("Topic {} policies cache have not init.", topicName);
+         asyncResponse.resume(new RestException(e));
+         return;
+     }
+     if (topicPolicies == null) {
+         topicPolicies = new TopicPolicies();
+     }
+
+     RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies);
+     if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
+         log.warn(
+                 "[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota",
+                 clientAppId(), topicName);
+         asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                 "Backlog Quota exceeds configured retention quota for topic. "
+                         + "Please increase retention quota and retry"));
+         // The request has been rejected: stop here so the conflicting quota is not
+         // persisted and the response is not resumed a second time.
+         return;
+     }
+
+     if (backlogQuota != null) {
+         topicPolicies.getBackLogQuotaMap().put(backlogQuotaType.name(), backlogQuota);
+     } else {
+         topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name());
+     }
+     Map<String, BacklogQuota> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
+     pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
+             .whenComplete((r, ex) -> {
+                 if (ex != null) {
+                     log.error("Failed updated backlog quota map", ex);
+                     asyncResponse.resume(new RestException(ex));
+                 } else {
+                     try {
+                         log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}",
+                                 clientAppId(),
+                                 namespaceName,
+                                 topicName.getLocalName(),
+                                 jsonMapper().writeValueAsString(backLogQuotaMap));
+                     } catch (JsonProcessingException ignored) {
+                         // Serialization is only needed for the log line; the update itself
+                         // succeeded, so the client must still get the success response.
+                     }
+                     asyncResponse.resume(Response.noContent().build());
+                 }
+             });
+ }
+
+ /**
+  * Resolves the retention policy that applies to a topic: the topic-level one when
+  * present, otherwise the namespace-level one.
+  *
+  * <p>The namespace lookup blocks the calling thread for at most one second.
+  *
+  * @param topicName     topic whose namespace is consulted as a fallback
+  * @param topicPolicies topic-level policies; must not be {@code null}
+  * @return the resolved retention policy, which may be {@code null} if the namespace has none
+  * @throws RestException wrapping any failure, timeout or interruption of the namespace lookup
+  */
+ private RetentionPolicies getRetentionPolicies(TopicName topicName, TopicPolicies topicPolicies) {
+     RetentionPolicies retentionPolicies = topicPolicies.getRetentionPolicies();
+     if (retentionPolicies != null) {
+         return retentionPolicies;
+     }
+     try {
+         return getNamespacePoliciesAsync(topicName.getNamespaceObject())
+                 .thenApply(policies -> policies.retention_policies)
+                 .get(1L, TimeUnit.SECONDS);
+     } catch (InterruptedException e) {
+         // Restore the interrupt flag so the thread's owner can still observe the interruption.
+         Thread.currentThread().interrupt();
+         throw new RestException(e);
+     } catch (Exception e) {
+         throw new RestException(e);
+     }
+ }
+
+ /**
+  * Removes the topic-level backlog quota of the given type by delegating to
+  * {@code internalSetBacklogQuota} with a {@code null} quota.
+  *
+  * @param asyncResponse    response handed through to the delegate
+  * @param backlogQuotaType quota type to remove
+  */
+ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
+ BacklogQuota.BacklogQuotaType backlogQuotaType) {
+ internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
+ }
+
protected MessageId internalTerminate(boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index fbfea19..b939bde 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.v2;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,6 +39,7 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import com.google.common.collect.Maps;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -46,8 +48,10 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import io.swagger.annotations.Api;
@@ -977,6 +981,57 @@ public class PersistentTopics extends PersistentTopicsBase {
return internalGetBacklog(authoritative);
}
+ /**
+  * Returns the backlog quotas stored in the topic-level policies, keyed by quota type.
+  * The result is an empty map when the topic has no policies or no quota map.
+  */
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/backlogQuotaMap")
+ @ApiOperation(value = "Get backlog quota map on a topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+         @ApiResponse(code = 404, message = "Topic policy does not exist"),
+         @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")})
+ public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(@PathParam("tenant") String tenant,
+         @PathParam("namespace") String namespace,
+         @PathParam("topic") @Encoded String encodedTopic) {
+     validateTopicName(tenant, namespace, encodedTopic);
+     HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> quotasByType = Maps.newHashMap();
+     getTopicPolicies(topicName)
+             .map(TopicPolicies::getBackLogQuotaMap)
+             .ifPresent(stored -> {
+                 // Policies store the quota type by enum name; convert back to the enum for the API.
+                 for (Map.Entry<String, BacklogQuota> entry : stored.entrySet()) {
+                     quotasByType.put(BacklogQuota.BacklogQuotaType.valueOf(entry.getKey()), entry.getValue());
+                 }
+             });
+     return quotasByType;
+ }
+
+ /**
+  * REST endpoint that sets a backlog quota on a topic. It validates the topic name
+  * and hands the request to {@code internalSetBacklogQuota}, which completes
+  * {@code asyncResponse}.
+  */
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/backlogQuota")
+ @ApiOperation(value = " Set a backlog quota for a topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+ @ApiResponse(code = 412, message = "Specified backlog quota exceeds retention quota. Increase retention quota and retry request") })
+ public void setBacklogQuota(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalSetBacklogQuota(asyncResponse, backlogQuotaType, backlogQuota);
+ }
+
+ /**
+  * REST endpoint that removes a backlog quota from a topic. It validates the topic
+  * name and hands the request to {@code internalRemoveBacklogQuota}, which completes
+  * {@code asyncResponse}.
+  */
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/backlogQuota")
+ @ApiOperation(value = "Remove a backlog quota policy from a topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification") })
+ public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
+ }
+
@POST
@Path("/{tenant}/{namespace}/{topic}/terminate")
@ApiOperation(value = "Terminate a topic. A topic that is terminated will not accept any more "
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index fd47425..8eafa5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -32,9 +33,8 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
-import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,12 +47,17 @@ public class BacklogQuotaManager {
private static final Logger log = LoggerFactory.getLogger(BacklogQuotaManager.class);
private final BacklogQuota defaultQuota;
private final ZooKeeperDataCache<Policies> zkCache;
+ private final PulsarService pulsar;
+ private final boolean isTopicLevelPoliciesEnable;
+
public BacklogQuotaManager(PulsarService pulsar) {
+ this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
this.defaultQuota = new BacklogQuota(
pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024,
pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy());
this.zkCache = pulsar.getConfigurationCache().policiesCache();
+ this.pulsar = pulsar;
}
public BacklogQuota getDefaultQuota() {
@@ -70,9 +75,25 @@ public class BacklogQuotaManager {
}
}
- public long getBacklogQuotaLimit(String namespace) {
- String policyPath = AdminResource.path(POLICIES, namespace);
- return getBacklogQuota(namespace, policyPath).getLimit();
+ /**
+  * Resolves the backlog quota that applies to a topic.
+  *
+  * <p>When topic-level policies are enabled and the topic has a
+  * {@code destination_storage} quota, that quota is returned. In every other case,
+  * including a failure to read the topic policies, the namespace-level lookup is used.
+  *
+  * @param topicName topic to resolve the quota for
+  * @return the topic-level quota if one is set, otherwise the namespace-level result
+  */
+ public BacklogQuota getBacklogQuota(TopicName topicName) {
+     String policyPath = AdminResource.path(POLICIES, topicName.getNamespace());
+     if (isTopicLevelPoliciesEnable) {
+         try {
+             BacklogQuota topicQuota =
+                     Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName))
+                             .map(TopicPolicies::getBackLogQuotaMap)
+                             .map(quotas -> quotas.get(BacklogQuotaType.destination_storage.name()))
+                             .orElse(null);
+             if (topicQuota != null) {
+                 return topicQuota;
+             }
+             // No topic-level quota: fall back to the namespace while still inside the try,
+             // so a failure here is logged and retried below.
+             return getBacklogQuota(topicName.getNamespace(), policyPath);
+         } catch (Exception e) {
+             log.error("Failed to read policies data, will apply the default backlog quota: topicName={}", topicName, e);
+         }
+     }
+     return getBacklogQuota(topicName.getNamespace(), policyPath);
+ }
+
+ /**
+  * Returns the limit of the backlog quota resolved for {@code topicName}
+  * by {@code getBacklogQuota(TopicName)}.
+  */
+ public long getBacklogQuotaLimit(TopicName topicName) {
+ return getBacklogQuota(topicName).getLimit();
+ }
/**
@@ -83,10 +104,7 @@ public class BacklogQuotaManager {
*/
public void handleExceededBacklogQuota(PersistentTopic persistentTopic) {
TopicName topicName = TopicName.get(persistentTopic.getName());
- String namespace = topicName.getNamespace();
- String policyPath = AdminResource.path(POLICIES, namespace);
-
- BacklogQuota quota = getBacklogQuota(namespace, policyPath);
+ BacklogQuota quota = getBacklogQuota(topicName);
log.info("Backlog quota exceeded for topic [{}]. Applying [{}] policy", persistentTopic.getName(),
quota.getPolicy());
switch (quota.getPolicy()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 377dc98..eac2380 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1847,11 +1847,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@Override
public BacklogQuota getBacklogQuota() {
TopicName topicName = TopicName.get(this.getName());
- String namespace = topicName.getNamespace();
- String policyPath = AdminResource.path(POLICIES, namespace);
-
- BacklogQuota backlogQuota = brokerService.getBacklogQuotaManager().getBacklogQuota(namespace, policyPath);
- return backlogQuota;
+ return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName);
}
/**
@@ -1882,7 +1878,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
*/
public boolean isBacklogExceeded() {
TopicName topicName = TopicName.get(getName());
- long backlogQuotaLimitInBytes = brokerService.getBacklogQuotaManager().getBacklogQuotaLimit(topicName.getNamespace());
+ long backlogQuotaLimitInBytes = brokerService.getBacklogQuotaManager().getBacklogQuotaLimit(topicName);
if (backlogQuotaLimitInBytes < 0) {
return false;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaDisableTest.java
new file mode 100644
index 0000000..0f8b2cb
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaDisableTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that the topic-level backlog quota admin operations are rejected with
+ * HTTP 405 when the broker runs with topic-level policies disabled.
+ */
+@Slf4j
+public class TopicBacklogQuotaDisableTest extends MockedPulsarServiceBaseTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TopicBacklogQuotaDisableTest.class);
+
+ private final String testTenant = "my-tenant";
+
+ private final String testNamespace = "my-namespace";
+
+ private final String myNamespace = testTenant + "/" + testNamespace;
+
+ private final String backlogQuotaTopic = "persistent://" + myNamespace + "/test-set-backlog-quota";
+
+ // Starts the mocked broker with system topics on but topic-level policies off, then
+ // creates the cluster, tenant, namespace and a 2-partition topic used by the test.
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ this.conf.setSystemTopicEnabled(true);
+ this.conf.setTopicLevelPoliciesEnabled(false);
+ super.internalSetup();
+
+ admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+ TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant(this.testTenant, tenantInfo);
+ admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
+ admin.topics().createPartitionedTopic(backlogQuotaTopic, 2);
+ }
+
+ @AfterMethod
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ // Each of set / remove / get must fail with status 405; reaching Assert.fail()
+ // means the call was wrongly accepted.
+ @Test
+ public void testBacklogQuotaDisabled() throws Exception {
+ BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+ log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+
+ try {
+ admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+
+ try {
+ admin.topics().removeBacklogQuota(backlogQuotaTopic);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+
+ try {
+ admin.topics().getBacklogQuotaMap(backlogQuotaTopic);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java
new file mode 100644
index 0000000..1104ab6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Admin API tests for topic-level backlog quota: set, remove, validation against the
+ * retention quota, and rejection when topic-level policies are disabled.
+ */
+@Slf4j
+public class TopicBacklogQuotaTest extends MockedPulsarServiceBaseTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopicBacklogQuotaTest.class);
+
+    private final String testTenant = "my-tenant";
+
+    private final String testNamespace = "my-namespace";
+
+    private final String myNamespace = testTenant + "/" + testNamespace;
+
+    private final String backlogQuotaTopic = "persistent://" + myNamespace + "/test-set-backlog-quota";
+
+    /**
+     * Restarts the mocked broker with topic-level policies disabled and recreates the
+     * cluster, tenant and namespace. The topic is not created here.
+     */
+    public void disableTopicLevelPolicies() throws Exception {
+        // setup() already started a broker for this test method. Shut it down before
+        // starting another one, otherwise the first broker, its clients and mock stores
+        // are leaked when internalSetup() overwrites the references to them.
+        super.internalCleanup();
+
+        this.conf.setSystemTopicEnabled(true);
+        this.conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant(this.testTenant, tenantInfo);
+        admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
+    }
+
+    /**
+     * Starts the mocked broker with topic-level policies enabled and creates the
+     * cluster, tenant, namespace and a 2-partition topic.
+     */
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        this.conf.setSystemTopicEnabled(true);
+        this.conf.setTopicLevelPoliciesEnabled(true);
+        super.internalSetup();
+
+        admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant(this.testTenant, tenantInfo);
+        admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
+        admin.topics().createPartitionedTopic(backlogQuotaTopic, 2);
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /** A quota set through the admin API is visible via the admin API and the quota manager. */
+    @Test
+    public void testSetBacklogQuota() throws Exception {
+
+        BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+
+        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+        log.info("Backlog quota set success on topic: {}", backlogQuotaTopic);
+
+        // Give the policy update time to become visible before reading it back.
+        Thread.sleep(3000);
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+                .get(BacklogQuota.BacklogQuotaType.destination_storage);
+        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, backlogQuotaTopic);
+        Assert.assertEquals(getBacklogQuota, backlogQuota);
+
+        BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
+        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, backlogQuotaTopic);
+        Assert.assertEquals(backlogQuotaInManager, backlogQuota);
+
+        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+    }
+
+    /** Removing a quota makes the admin API return none and the manager fall back to the default. */
+    @Test
+    public void testRemoveBacklogQuota() throws Exception {
+        BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+        log.info("Backlog quota set success on topic: {}", backlogQuotaTopic);
+
+        Thread.sleep(3000);
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+                .get(BacklogQuota.BacklogQuotaType.destination_storage);
+        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, backlogQuotaTopic);
+        Assert.assertEquals(backlogQuota, getBacklogQuota);
+
+        BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
+        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, backlogQuotaTopic);
+        Assert.assertEquals(backlogQuota, backlogQuotaInManager);
+
+        admin.topics().removeBacklogQuota(backlogQuotaTopic);
+        getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+                .get(BacklogQuota.BacklogQuotaType.destination_storage);
+        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, backlogQuotaTopic);
+        Assert.assertNull(getBacklogQuota);
+
+        backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
+        log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInManager,
+                backlogQuotaTopic);
+        Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager);
+
+        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+    }
+
+    /**
+     * With a 10 MB namespace retention size, a quota equal to or above 10 MB is rejected
+     * with 412 and a quota one byte below it is accepted.
+     */
+    @Test
+    public void testCheckQuota() throws Exception {
+        RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
+        String namespace = TopicName.get(backlogQuotaTopic).getNamespace();
+        admin.namespaces().setRetention(namespace, retentionPolicies);
+
+        BacklogQuota backlogQuota =
+                new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        try {
+            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+        Thread.sleep(3000);
+        backlogQuota =
+                new BacklogQuota(10 * 1024 * 1024 + 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        try {
+            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+        Thread.sleep(3000);
+        backlogQuota =
+                new BacklogQuota(10 * 1024 * 1024 - 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+        Thread.sleep(3000);
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+                .get(BacklogQuota.BacklogQuotaType.destination_storage);
+        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, backlogQuotaTopic);
+        Assert.assertEquals(getBacklogQuota, backlogQuota);
+
+        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+    }
+
+    /** With topic-level policies disabled, set / remove / get are each rejected with 405. */
+    @Test
+    public void testBacklogQuotaDisabled() throws Exception {
+        disableTopicLevelPolicies();
+        admin.topics().createPartitionedTopic(backlogQuotaTopic, 2);
+
+        BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+
+        try {
+            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().removeBacklogQuota(backlogQuotaTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().getBacklogQuotaMap(backlogQuotaTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+    }
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index f4e997d..bf6ac82 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -1354,4 +1355,82 @@ public interface Topics {
* @return
*/
CompletableFuture<MessageId> getLastMessageIdAsync(String topic);
+
+ /**
+ * Get backlog quota map for a topic.
+ * Response example:
+ *
+ * <pre>
+ * <code>
+ * {
+ * "namespace_memory" : {
+ * "limit" : "134217728",
+ * "policy" : "consumer_backlog_eviction"
+ * },
+ * "destination_storage" : {
+ * "limit" : "-1",
+ * "policy" : "producer_exception"
+ * }
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param topic
+ * Topic name
+ *
+ * @throws NotAuthorizedException
+ * Permission denied
+ * @throws NotFoundException
+ * Topic does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic) throws PulsarAdminException;
+
+ /**
+ * Set a backlog quota for a topic.
+ * The backlog quota is applied to the specified topic only.
+ *
+ * <p>
+ * Request parameter example:
+ *</p>
+ *
+ * <pre>
+ * <code>
+ * {
+ * "limit" : "134217728",
+ * "policy" : "consumer_backlog_eviction"
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param topic
+ * Topic name
+ * @param backlogQuota
+ * the new BacklogQuota
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Topic does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws PulsarAdminException;
+
+ /**
+ * Remove a backlog quota policy from a topic.
+ * The topic will fall back to the namespace-level backlog quota policy.
+ *
+ * @param topic
+ * Topic name
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Topic does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeBacklogQuota(String topic) throws PulsarAdminException;
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index f6d4fae..b5ac144 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -45,6 +45,7 @@ import java.util.stream.Stream;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
@@ -69,6 +70,8 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -1391,5 +1394,40 @@ public class TopicsImpl extends BaseResource implements Topics {
return future;
}
+ @Override
+ public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic) throws PulsarAdminException {
+ try {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "backlogQuotaMap");
+ return request(path).get(new GenericType<Map<BacklogQuotaType, BacklogQuota>>() {
+ });
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws PulsarAdminException {
+ try {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "backlogQuota");
+ request(path).post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void removeBacklogQuota(String topic) throws PulsarAdminException {
+ try {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "backlogQuota");
+ request(path.queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString()))
+ .delete(ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 733a31d..4f2e399 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.admin.cli;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.CommaParameterSplitter;
import com.google.common.collect.Lists;
@@ -32,6 +33,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -45,6 +47,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@@ -98,6 +101,9 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("offload", new Offload());
jcommander.addCommand("offload-status", new OffloadStatusCmd());
jcommander.addCommand("last-message-id", new GetLastMessageId());
+ jcommander.addCommand("get-backlog-quotas", new GetBacklogQuotaMap());
+ jcommander.addCommand("set-backlog-quota", new SetBacklogQuota());
+ jcommander.addCommand("remove-backlog-quota", new RemoveBacklogQuota());
}
@Parameters(commandDescription = "Get the list of topics under a namespace.")
@@ -818,4 +824,60 @@ public class CmdTopics extends CmdBase {
print(topics.getLastMessageId(persistentTopic));
}
}
+
+ @Parameters(commandDescription = "Get the backlog quota policies for a topic")
+ private class GetBacklogQuotaMap extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(admin.topics().getBacklogQuotaMap(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Set a backlog quota policy for a topic")
+ private class SetBacklogQuota extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)", required = true)
+ private String limitStr;
+
+ @Parameter(names = { "-p", "--policy" }, description = "Retention policy to enforce when the limit is reached. "
+ + "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]", required = true)
+ private String policyStr;
+
+ @Override
+ void run() throws PulsarAdminException {
+ BacklogQuota.RetentionPolicy policy;
+ long limit;
+
+ try {
+ policy = BacklogQuota.RetentionPolicy.valueOf(policyStr);
+ } catch (IllegalArgumentException e) {
+ throw new ParameterException(String.format("Invalid retention policy type '%s'. Valid options are: %s",
+ policyStr, Arrays.toString(BacklogQuota.RetentionPolicy.values())));
+ }
+
+ limit = validateSizeString(limitStr);
+
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().setBacklogQuota(persistentTopic, new BacklogQuota(limit, policy));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove a backlog quota policy from a topic")
+ private class RemoveBacklogQuota extends CliCommand {
+
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().removeBacklogQuota(persistentTopic);
+ }
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 56fbf83..f9fcaf8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -37,7 +37,7 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class TopicPolicies {
- private Map<String, BacklogQuota> backLogQuotaMap = Maps.newHashMap();
+ private Map<String, BacklogQuota> backLogQuotaMap = Maps.newHashMap();
private PersistencePolicies persistence = null;
private RetentionPolicies retentionPolicies = null;
private Boolean deduplicationEnabled = null;